1 package org.apache.continuum.builder.distributed.executor;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import edu.emory.mathcs.backport.java.util.concurrent.CancellationException;
23 import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
24 import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
25 import edu.emory.mathcs.backport.java.util.concurrent.Executors;
26 import edu.emory.mathcs.backport.java.util.concurrent.Future;
27 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
28 import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
29 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
30 import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
31 import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
32 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
33 import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
34 import org.codehaus.plexus.taskqueue.Task;
35 import org.codehaus.plexus.taskqueue.TaskQueue;
36 import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
37 import org.codehaus.plexus.util.StringUtils;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41
42
43
44 public class ThreadedDistributedBuildTaskQueueExecutor
45 implements DistributedBuildTaskQueueExecutor, Initializable, Startable
46 {
47 private static final int SHUTDOWN = 1;
48
49 private static final int CANCEL_TASK = 2;
50
51 private static final Logger log = LoggerFactory.getLogger( ThreadedDistributedBuildTaskQueueExecutor.class );
52
53
54
55
56 private TaskQueue queue;
57
58
59
60
61 private DistributedBuildTaskExecutor executor;
62
63
64
65
66 private String name;
67
68
69
70
71
72 private ExecutorRunnable executorRunnable;
73
74 private ExecutorService executorService;
75
76 private Task currentTask;
77
78 private class ExecutorRunnable
79 extends Thread
80 {
81 private volatile int command;
82
83 private boolean done;
84
85 public void run()
86 {
87 while ( command != SHUTDOWN )
88 {
89 final Task task;
90
91 currentTask = null;
92
93 try
94 {
95 task = queue.poll( 100, TimeUnit.MILLISECONDS );
96 }
97 catch ( InterruptedException e )
98 {
99 log.info( "Executor thread interrupted, command: " + ( command == SHUTDOWN
100 ? "Shutdown"
101 : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
102 continue;
103 }
104
105 if ( task == null )
106 {
107 continue;
108 }
109
110 currentTask = task;
111
112 Future future = executorService.submit( new Runnable()
113 {
114 public void run()
115 {
116 try
117 {
118 executor.executeTask( task );
119 }
120 catch ( TaskExecutionException e )
121 {
122 log.error( "Error executing task", e );
123 }
124 }
125 } );
126
127 try
128 {
129 waitForTask( task, future );
130 }
131 catch ( ExecutionException e )
132 {
133 log.error( "Error executing task", e );
134 }
135 }
136
137 currentTask = null;
138
139 log.info( "Executor thread '" + name + "' exited." );
140
141 done = true;
142
143 synchronized ( this )
144 {
145 notifyAll();
146 }
147 }
148
149 private void waitForTask( Task task, Future future )
150 throws ExecutionException
151 {
152 boolean stop = false;
153
154 while ( !stop )
155 {
156 try
157 {
158 if ( task.getMaxExecutionTime() == 0 )
159 {
160 log.debug( "Waiting indefinitely for task to complete" );
161 future.get();
162 return;
163 }
164 else
165 {
166 log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" );
167 future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS );
168 log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" );
169 return;
170 }
171 }
172 catch ( InterruptedException e )
173 {
174 switch ( command )
175 {
176 case SHUTDOWN:
177 {
178 log.info( "Shutdown command received. Cancelling task." );
179 cancel( future );
180 return;
181 }
182
183 case CANCEL_TASK:
184 {
185 command = 0;
186 log.info( "Cancelling task" );
187 cancel( future );
188 return;
189 }
190
191 default:
192
193 log.warn( "Interrupted while waiting for task to complete; ignoring", e );
194 break;
195 }
196 }
197 catch ( TimeoutException e )
198 {
199 log.warn( "Task " + task + " didn't complete within time, cancelling it." );
200 cancel( future );
201 return;
202 }
203 catch ( CancellationException e )
204 {
205 log.warn( "The task was cancelled", e );
206 return;
207 }
208 }
209 }
210
211 private void cancel( Future future )
212 {
213 if ( !future.cancel( true ) )
214 {
215 if ( !future.isDone() && !future.isCancelled() )
216 {
217 log.warn( "Unable to cancel task" );
218 }
219 else
220 {
221 log.warn(
222 "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() +
223 ")" );
224 }
225 }
226 else
227 {
228 log.debug( "Task successfully cancelled" );
229 }
230 }
231
232 public synchronized void shutdown()
233 {
234 log.debug( "Signalling executor thread to shutdown" );
235
236 command = SHUTDOWN;
237
238 interrupt();
239 }
240
241 public synchronized boolean cancelTask( Task task )
242 {
243 if ( !task.equals( currentTask ) )
244 {
245 log.debug( "Not cancelling task - it is not running" );
246 return false;
247 }
248
249 if ( command != SHUTDOWN )
250 {
251 log.debug( "Signalling executor thread to cancel task" );
252
253 command = CANCEL_TASK;
254
255 interrupt();
256 }
257 else
258 {
259 log.debug( "Executor thread already stopping; task will be cancelled automatically" );
260 }
261
262 return true;
263 }
264
265 public boolean isDone()
266 {
267 return done;
268 }
269 }
270
271
272
273
274
275 public void initialize()
276 throws InitializationException
277 {
278 if ( StringUtils.isEmpty( name ) )
279 {
280 throw new IllegalArgumentException( "'name' must be set." );
281 }
282 }
283
284 public void start()
285 throws StartingException
286 {
287 log.info( "Starting task executor, thread name '" + name + "'." );
288
289 this.executorService = Executors.newSingleThreadExecutor();
290
291 executorRunnable = new ExecutorRunnable();
292
293 executorRunnable.setDaemon( true );
294
295 executorRunnable.start();
296 }
297
298 public void stop()
299 throws StoppingException
300 {
301 executorRunnable.shutdown();
302
303 int maxSleep = 10 * 1000;
304
305 int interval = 1000;
306
307 long endTime = System.currentTimeMillis() + maxSleep;
308
309 while ( !executorRunnable.isDone() && executorRunnable.isAlive() )
310 {
311 if ( System.currentTimeMillis() > endTime )
312 {
313 log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" );
314 break;
315 }
316
317 log.info( "Waiting until task executor '" + name + "' is idling..." );
318
319 try
320 {
321 synchronized ( executorRunnable )
322 {
323 executorRunnable.wait( interval );
324 }
325 }
326 catch ( InterruptedException ex )
327 {
328
329 }
330
331
332 executorRunnable.shutdown();
333 }
334 }
335
336 public Task getCurrentTask()
337 {
338 return currentTask;
339 }
340
341 public synchronized boolean cancelTask( Task task )
342 {
343 return executorRunnable.cancelTask( task );
344 }
345
346 public void setBuildAgentUrl( String buildAgentUrl )
347 {
348 executor.setBuildAgentUrl( buildAgentUrl );
349 }
350
351 public String getBuildAgentUrl()
352 {
353 return executor.getBuildAgentUrl();
354 }
355
356 public TaskQueue getQueue()
357 {
358 return queue;
359 }
360 }