1 package org.apache.continuum.taskqueueexecutor;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import edu.emory.mathcs.backport.java.util.concurrent.CancellationException;
23 import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
24 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
25 import edu.emory.mathcs.backport.java.util.concurrent.Executors;
26 import edu.emory.mathcs.backport.java.util.concurrent.Future;
27 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
28 import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
29 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
30 import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
31 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
32 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
33 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
34 import org.codehaus.plexus.taskqueue.Task;
35 import org.codehaus.plexus.taskqueue.TaskQueue;
36 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
37 import org.codehaus.plexus.taskqueue.execution.TaskExecutor;
38 import org.codehaus.plexus.taskqueue.execution.TaskQueueExecutor;
39 import org.codehaus.plexus.util.StringUtils;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43
44
45
46 public class ParallelBuildsThreadedTaskQueueExecutor
47 implements TaskQueueExecutor, Initializable, Startable
48 {
49 private static final Logger log = LoggerFactory.getLogger( ParallelBuildsThreadedTaskQueueExecutor.class );
50
51 private static final int SHUTDOWN = 1;
52
53 private static final int CANCEL_TASK = 2;
54
55
56
57
58 private TaskQueue queue;
59
60
61
62
63 private TaskExecutor executor;
64
65
66
67
68 private String name;
69
70
71
72
73
74 private ExecutorRunnable executorRunnable;
75
76 private ExecutorService executorService;
77
78 private Task currentTask;
79
80 private class ExecutorRunnable
81 extends Thread
82 {
83 private volatile int command;
84
85 private boolean done;
86
87 public void run()
88 {
89 while ( command != SHUTDOWN )
90 {
91 final Task task;
92
93 currentTask = null;
94
95 try
96 {
97 task = queue.poll( 100, TimeUnit.MILLISECONDS );
98 }
99 catch ( InterruptedException e )
100 {
101 log.info( "Executor thread interrupted, command: " + ( command == SHUTDOWN
102 ? "Shutdown"
103 : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
104 continue;
105 }
106
107 if ( task == null )
108 {
109 continue;
110 }
111
112 currentTask = task;
113
114 Future future = executorService.submit( new Runnable()
115 {
116 public void run()
117 {
118 try
119 {
120 executor.executeTask( task );
121 }
122 catch ( TaskExecutionException e )
123 {
124 log.error( "Error executing task", e );
125 }
126 }
127 } );
128
129 try
130 {
131 waitForTask( task, future );
132 }
133 catch ( ExecutionException e )
134 {
135 log.error( "Error executing task", e );
136 }
137 }
138
139 currentTask = null;
140
141 log.info( "Executor thread '" + name + "' exited." );
142
143 done = true;
144
145 synchronized ( this )
146 {
147 notifyAll();
148 }
149 }
150
151 private void waitForTask( Task task, Future future )
152 throws ExecutionException
153 {
154 boolean stop = false;
155
156 while ( !stop )
157 {
158 try
159 {
160 if ( task.getMaxExecutionTime() == 0 )
161 {
162 log.debug( "Waiting indefinitely for task to complete" );
163 future.get();
164 return;
165 }
166 else
167 {
168 log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" );
169 future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS );
170 log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" );
171 return;
172 }
173 }
174 catch ( InterruptedException e )
175 {
176 switch ( command )
177 {
178 case SHUTDOWN:
179 {
180 log.info( "Shutdown command received. Cancelling task." );
181 cancel( future );
182 return;
183 }
184
185 case CANCEL_TASK:
186 {
187 command = 0;
188 log.info( "Cancelling task" );
189 cancel( future );
190 return;
191 }
192
193 default:
194
195 log.warn( "Interrupted while waiting for task to complete; ignoring", e );
196 break;
197 }
198 }
199 catch ( TimeoutException e )
200 {
201 log.warn( "Task " + task + " didn't complete within time, cancelling it." );
202 cancel( future );
203 return;
204 }
205 catch ( CancellationException e )
206 {
207 log.warn( "The task was cancelled", e );
208 return;
209 }
210 }
211 }
212
213 private void cancel( Future future )
214 {
215 if ( !future.cancel( true ) )
216 {
217 if ( !future.isDone() && !future.isCancelled() )
218 {
219 log.warn( "Unable to cancel task" );
220 }
221 else
222 {
223 log.warn(
224 "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() +
225 ")" );
226 }
227 }
228 else
229 {
230 log.debug( "Task successfully cancelled" );
231 }
232 }
233
234 public synchronized void shutdown()
235 {
236 log.debug( "Signalling executor thread to shutdown" );
237
238 command = SHUTDOWN;
239
240 interrupt();
241 }
242
243 public synchronized boolean cancelTask( Task task )
244 {
245 if ( !task.equals( currentTask ) )
246 {
247 log.debug( "Not cancelling task - it is not running" );
248 return false;
249 }
250
251 if ( command != SHUTDOWN )
252 {
253 log.debug( "Signalling executor thread to cancel task" );
254
255 command = CANCEL_TASK;
256
257 interrupt();
258 }
259 else
260 {
261 log.debug( "Executor thread already stopping; task will be cancelled automatically" );
262 }
263
264 return true;
265 }
266
267 public boolean isDone()
268 {
269 return done;
270 }
271 }
272
273
274
275
276
277 public void initialize()
278 throws InitializationException
279 {
280 if ( StringUtils.isEmpty( name ) )
281 {
282 throw new IllegalArgumentException( "'name' must be set." );
283 }
284 }
285
286 public void start()
287 throws StartingException
288 {
289 log.info( "Starting task executor, thread name '" + name + "'." );
290
291 this.executorService = Executors.newCachedThreadPool();
292
293 executorRunnable = new ExecutorRunnable();
294
295 executorRunnable.setDaemon( true );
296
297 executorRunnable.start();
298 }
299
300 public void stop()
301 throws StoppingException
302 {
303 executorRunnable.shutdown();
304
305 int maxSleep = 10 * 1000;
306
307 int interval = 1000;
308
309 long endTime = System.currentTimeMillis() + maxSleep;
310
311 while ( !executorRunnable.isDone() && executorRunnable.isAlive() )
312 {
313 if ( System.currentTimeMillis() > endTime )
314 {
315 log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" );
316 break;
317 }
318
319 log.info( "Waiting until task executor '" + name + "' is idling..." );
320
321 try
322 {
323 synchronized ( executorRunnable )
324 {
325 executorRunnable.wait( interval );
326 }
327 }
328 catch ( InterruptedException ex )
329 {
330
331 }
332
333
334 executorRunnable.shutdown();
335 }
336 }
337
338 public Task getCurrentTask()
339 {
340 return currentTask;
341 }
342
343 public synchronized boolean cancelTask( Task task )
344 {
345 return executorRunnable.cancelTask( task );
346 }
347
348 public String getName()
349 {
350 return name;
351 }
352
353 public void setName( String name )
354 {
355 this.name = name;
356 }
357
358 public TaskQueue getQueue()
359 {
360 return queue;
361 }
362 }