View Javadoc

1   package org.apache.continuum.taskqueueexecutor;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import edu.emory.mathcs.backport.java.util.concurrent.CancellationException;
23  import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
24  import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
25  import edu.emory.mathcs.backport.java.util.concurrent.Executors;
26  import edu.emory.mathcs.backport.java.util.concurrent.Future;
27  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
28  import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
29  import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
30  import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
31  import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
32  import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
33  import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
34  import org.codehaus.plexus.taskqueue.Task;
35  import org.codehaus.plexus.taskqueue.TaskQueue;
36  import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
37  import org.codehaus.plexus.taskqueue.execution.TaskExecutor;
38  import org.codehaus.plexus.taskqueue.execution.TaskQueueExecutor;
39  import org.codehaus.plexus.util.StringUtils;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  /**
44   * Modified plexus ThreadedTaskQueueExecutor
45   */
46  public class ParallelBuildsThreadedTaskQueueExecutor
47      implements TaskQueueExecutor, Initializable, Startable
48  {
49      private static final Logger log = LoggerFactory.getLogger( ParallelBuildsThreadedTaskQueueExecutor.class );
50  
51      private static final int SHUTDOWN = 1;
52  
53      private static final int CANCEL_TASK = 2;
54  
55      /**
56       * @requirement
57       */
58      private TaskQueue queue;
59  
60      /**
61       * @requirement
62       */
63      private TaskExecutor executor;
64  
65      /**
66       * @configuration
67       */
68      private String name;
69  
70      // ----------------------------------------------------------------------
71      //
72      // ----------------------------------------------------------------------
73  
74      private ExecutorRunnable executorRunnable;
75  
76      private ExecutorService executorService;
77  
78      private Task currentTask;
79  
80      private class ExecutorRunnable
81          extends Thread
82      {
83          private volatile int command;
84  
85          private boolean done;
86  
87          public void run()
88          {
89              while ( command != SHUTDOWN )
90              {
91                  final Task task;
92  
93                  currentTask = null;
94  
95                  try
96                  {
97                      task = queue.poll( 100, TimeUnit.MILLISECONDS );
98                  }
99                  catch ( InterruptedException e )
100                 {
101                     log.info( "Executor thread interrupted, command: " + ( command == SHUTDOWN
102                         ? "Shutdown"
103                         : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
104                     continue;
105                 }
106 
107                 if ( task == null )
108                 {
109                     continue;
110                 }
111 
112                 currentTask = task;
113 
114                 Future future = executorService.submit( new Runnable()
115                 {
116                     public void run()
117                     {
118                         try
119                         {
120                             executor.executeTask( task );
121                         }
122                         catch ( TaskExecutionException e )
123                         {
124                             log.error( "Error executing task", e );
125                         }
126                     }
127                 } );
128 
129                 try
130                 {
131                     waitForTask( task, future );
132                 }
133                 catch ( ExecutionException e )
134                 {
135                     log.error( "Error executing task", e );
136                 }
137             }
138 
139             currentTask = null;
140 
141             log.info( "Executor thread '" + name + "' exited." );
142 
143             done = true;
144 
145             synchronized ( this )
146             {
147                 notifyAll();
148             }
149         }
150 
151         private void waitForTask( Task task, Future future )
152             throws ExecutionException
153         {
154             boolean stop = false;
155 
156             while ( !stop )
157             {
158                 try
159                 {
160                     if ( task.getMaxExecutionTime() == 0 )
161                     {
162                         log.debug( "Waiting indefinitely for task to complete" );
163                         future.get();
164                         return;
165                     }
166                     else
167                     {
168                         log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" );
169                         future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS );
170                         log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" );
171                         return;
172                     }
173                 }
174                 catch ( InterruptedException e )
175                 {
176                     switch ( command )
177                     {
178                         case SHUTDOWN:
179                         {
180                             log.info( "Shutdown command received. Cancelling task." );
181                             cancel( future );
182                             return;
183                         }
184 
185                         case CANCEL_TASK:
186                         {
187                             command = 0;
188                             log.info( "Cancelling task" );
189                             cancel( future );
190                             return;
191                         }
192 
193                         default:
194                             // when can this thread be interrupted, and should we ignore it if shutdown = false?
195                             log.warn( "Interrupted while waiting for task to complete; ignoring", e );
196                             break;
197                     }
198                 }
199                 catch ( TimeoutException e )
200                 {
201                     log.warn( "Task " + task + " didn't complete within time, cancelling it." );
202                     cancel( future );
203                     return;
204                 }
205                 catch ( CancellationException e )
206                 {
207                     log.warn( "The task was cancelled", e );
208                     return;
209                 }
210             }
211         }
212 
213         private void cancel( Future future )
214         {
215             if ( !future.cancel( true ) )
216             {
217                 if ( !future.isDone() && !future.isCancelled() )
218                 {
219                     log.warn( "Unable to cancel task" );
220                 }
221                 else
222                 {
223                     log.warn(
224                         "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() +
225                             ")" );
226                 }
227             }
228             else
229             {
230                 log.debug( "Task successfully cancelled" );
231             }
232         }
233 
234         public synchronized void shutdown()
235         {
236             log.debug( "Signalling executor thread to shutdown" );
237 
238             command = SHUTDOWN;
239 
240             interrupt();
241         }
242 
243         public synchronized boolean cancelTask( Task task )
244         {
245             if ( !task.equals( currentTask ) )
246             {
247                 log.debug( "Not cancelling task - it is not running" );
248                 return false;
249             }
250 
251             if ( command != SHUTDOWN )
252             {
253                 log.debug( "Signalling executor thread to cancel task" );
254 
255                 command = CANCEL_TASK;
256 
257                 interrupt();
258             }
259             else
260             {
261                 log.debug( "Executor thread already stopping; task will be cancelled automatically" );
262             }
263 
264             return true;
265         }
266 
267         public boolean isDone()
268         {
269             return done;
270         }
271     }
272 
273     // ----------------------------------------------------------------------
274     // Component lifecycle
275     // ----------------------------------------------------------------------
276 
277     public void initialize()
278         throws InitializationException
279     {
280         if ( StringUtils.isEmpty( name ) )
281         {
282             throw new IllegalArgumentException( "'name' must be set." );
283         }
284     }
285 
286     public void start()
287         throws StartingException
288     {
289         log.info( "Starting task executor, thread name '" + name + "'." );
290 
291         this.executorService = Executors.newCachedThreadPool();
292 
293         executorRunnable = new ExecutorRunnable();
294 
295         executorRunnable.setDaemon( true );
296 
297         executorRunnable.start();
298     }
299 
300     public void stop()
301         throws StoppingException
302     {
303         executorRunnable.shutdown();
304 
305         int maxSleep = 10 * 1000; // 10 seconds
306 
307         int interval = 1000;
308 
309         long endTime = System.currentTimeMillis() + maxSleep;
310 
311         while ( !executorRunnable.isDone() && executorRunnable.isAlive() )
312         {
313             if ( System.currentTimeMillis() > endTime )
314             {
315                 log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" );
316                 break;
317             }
318 
319             log.info( "Waiting until task executor '" + name + "' is idling..." );
320 
321             try
322             {
323                 synchronized ( executorRunnable )
324                 {
325                     executorRunnable.wait( interval );
326                 }
327             }
328             catch ( InterruptedException ex )
329             {
330                 // ignore
331             }
332 
333             // notify again, just in case.
334             executorRunnable.shutdown();
335         }
336     }
337 
338     public Task getCurrentTask()
339     {
340         return currentTask;
341     }
342 
343     public synchronized boolean cancelTask( Task task )
344     {
345         return executorRunnable.cancelTask( task );
346     }
347 
348     public String getName()
349     {
350         return name;
351     }
352 
353     public void setName( String name )
354     {
355         this.name = name;
356     }
357 
358     public TaskQueue getQueue()
359     {
360         return queue;
361     }
362 }