1 package org.sonatype.aether.util.concurrency;
2
3 /*******************************************************************************
4 * Copyright (c) 2010-2011 Sonatype, Inc.
5 * All rights reserved. This program and the accompanying materials
6 * are made available under the terms of the Eclipse Public License v1.0
7 * which accompanies this distribution, and is available at
8 * http://www.eclipse.org/legal/epl-v10.html
9 *******************************************************************************/
10
11 import java.util.concurrent.atomic.AtomicInteger;
12 import java.util.concurrent.atomic.AtomicReference;
13 import java.util.concurrent.locks.LockSupport;
14
15 /**
16 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
17 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
18 *
19 * <pre>
20 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
21 * for ( Runnable task : tasks )
22 * {
23 * executor.execute( errorForwarder.wrap( task ) );
24 * }
25 * errorForwarder.await();
26 * </pre>
27 *
28 * @author Benjamin Bentmann
29 */
30 public class RunnableErrorForwarder
31 {
32
33 private final Thread thread = Thread.currentThread();
34
35 private final AtomicInteger counter = new AtomicInteger();
36
37 private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
38
39 /**
40 * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors.
41 *
42 * @param runnable The runnable from which to forward errors, must not be {@code null}.
43 * @return The error-forwarding runnable to eventually execute, never {@code null}.
44 */
45 public Runnable wrap( final Runnable runnable )
46 {
47 if ( runnable == null )
48 {
49 throw new IllegalArgumentException( "runnable missing" );
50 }
51
52 counter.incrementAndGet();
53
54 return new Runnable()
55 {
56 public void run()
57 {
58 try
59 {
60 runnable.run();
61 }
62 catch ( RuntimeException e )
63 {
64 error.compareAndSet( null, e );
65 throw e;
66 }
67 catch ( Error e )
68 {
69 error.compareAndSet( null, e );
70 throw e;
71 }
72 finally
73 {
74 counter.decrementAndGet();
75 LockSupport.unpark( thread );
76 }
77 }
78 };
79 }
80
81 /**
82 * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
83 * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
84 * case multiple runnables encountered uncaught errors, one error is arbitrarily selected.
85 */
86 public void await()
87 {
88 awaitTerminationOfAllRunnables();
89
90 Throwable error = this.error.get();
91 if ( error != null )
92 {
93 if ( error instanceof RuntimeException )
94 {
95 throw (RuntimeException) error;
96 }
97 else if ( error instanceof ThreadDeath )
98 {
99 throw new IllegalStateException( error );
100 }
101 else if ( error instanceof Error )
102 {
103 throw (Error) error;
104 }
105 throw new IllegalStateException( error );
106 }
107 }
108
109 private void awaitTerminationOfAllRunnables()
110 {
111 boolean interrupted = false;
112
113 while ( counter.get() > 0 )
114 {
115 LockSupport.park();
116
117 if ( Thread.interrupted() )
118 {
119 interrupted = true;
120 }
121 }
122
123 if ( interrupted )
124 {
125 Thread.currentThread().interrupt();
126 }
127 }
128
129 }