package org.sonatype.aether.util.concurrency;

/*******************************************************************************
 * Copyright (c) 2010-2011 Sonatype, Inc.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 *   http://www.eclipse.org/legal/epl-v10.html
 *******************************************************************************/

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A utility class to forward any uncaught {@link Error} or {@link RuntimeException} from a {@link Runnable} executed in
 * a worker thread back to the parent thread. The simplified usage pattern looks like this:
 * 
 * <pre>
 * RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
 * for ( Runnable task : tasks )
 * {
 *     executor.execute( errorForwarder.wrap( task ) );
 * }
 * errorForwarder.await();
 * </pre>
 * 
 * <p>
 * An instance is bound to the thread that created it: finished runnables wake up only that thread, so
 * {@link #await()} has to be called from the thread that constructed the forwarder.
 * </p>
 * 
 * @author Benjamin Bentmann
 */
public class RunnableErrorForwarder
{

    /** The thread that created this forwarder; it is the only thread woken up by finishing runnables. */
    private final Thread thread = Thread.currentThread();

    /** Number of wrapped runnables that have not finished running yet. */
    private final AtomicInteger counter = new AtomicInteger();

    /** The first uncaught error recorded by any wrapped runnable, {@code null} if none so far. */
    private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

    /**
     * Wraps the specified runnable into an equivalent runnable that will allow forwarding of uncaught errors. The
     * runnable is counted as outstanding from the moment it is wrapped until the returned wrapper has finished
     * running, so every wrapper must eventually be executed or {@link #await()} will not return.
     * 
     * @param runnable The runnable from which to forward errors, must not be {@code null}.
     * @return The error-forwarding runnable to eventually execute, never {@code null}.
     * @throws IllegalArgumentException If the specified runnable is {@code null}.
     */
    public Runnable wrap( final Runnable runnable )
    {
        if ( runnable == null )
        {
            throw new IllegalArgumentException( "runnable missing" );
        }

        // Count the task at wrap time (not at start time) so await() also waits for tasks that are still queued.
        counter.incrementAndGet();

        return new Runnable()
        {
            public void run()
            {
                try
                {
                    runnable.run();
                }
                catch ( RuntimeException e )
                {
                    // Only the first failure is kept; the error is still re-thrown to the executing thread.
                    error.compareAndSet( null, e );
                    throw e;
                }
                catch ( Error e )
                {
                    error.compareAndSet( null, e );
                    throw e;
                }
                finally
                {
                    // Decrement before unparking so the woken thread observes the updated count.
                    counter.decrementAndGet();
                    LockSupport.unpark( thread );
                }
            }
        };
    }

    /**
     * Causes the current thread to wait until all previously {@link #wrap(Runnable) wrapped} runnables have terminated
     * and potentially re-throws an uncaught {@link RuntimeException} or {@link Error} from any of the runnables. In
     * case multiple runnables encountered uncaught errors, one error is arbitrarily selected. A forwarded
     * {@link ThreadDeath} is not re-thrown as is but wrapped into an {@link IllegalStateException}.
     * 
     * @throws IllegalStateException If runnables are still outstanding and the calling thread is not the thread that
     *             created this forwarder, or if a runnable died from {@link ThreadDeath}.
     */
    public void await()
    {
        awaitTerminationOfAllRunnables();

        Throwable error = this.error.get();
        if ( error != null )
        {
            if ( error instanceof RuntimeException )
            {
                throw (RuntimeException) error;
            }
            else if ( error instanceof ThreadDeath )
            {
                // Do not kill the waiting thread as well, report the dead worker as a failure instead.
                throw new IllegalStateException( error );
            }
            else if ( error instanceof Error )
            {
                throw (Error) error;
            }
            throw new IllegalStateException( error );
        }
    }

    /**
     * Blocks until the count of outstanding runnables has dropped to zero. Interrupts received while waiting do not
     * abort the wait; the interrupt status of the calling thread is restored before returning.
     */
    private void awaitTerminationOfAllRunnables()
    {
        // Workers only unpark the creating thread, any other thread parked here would never be woken up.
        final boolean owner = Thread.currentThread() == thread;

        boolean interrupted = false;

        while ( counter.get() > 0 )
        {
            if ( !owner )
            {
                throw new IllegalStateException( "wrong caller thread, expected " + thread + " and not "
                    + Thread.currentThread() );
            }

            LockSupport.park();

            // park() returns immediately while the interrupt flag is set, so clear it and remember it for later.
            if ( Thread.interrupted() )
            {
                interrupted = true;
            }
        }

        if ( interrupted )
        {
            Thread.currentThread().interrupt();
        }
    }

}
128 
129 }