View Javadoc

1   package org.apache.continuum.builder.distributed.executor;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import edu.emory.mathcs.backport.java.util.concurrent.CancellationException;
23  import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
24  import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
25  import edu.emory.mathcs.backport.java.util.concurrent.Executors;
26  import edu.emory.mathcs.backport.java.util.concurrent.Future;
27  import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
28  import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
29  import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
30  import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
31  import org.codehaus.plexus.personality.plexus.lifecycle.phase.Startable;
32  import org.codehaus.plexus.personality.plexus.lifecycle.phase.StartingException;
33  import org.codehaus.plexus.personality.plexus.lifecycle.phase.StoppingException;
34  import org.codehaus.plexus.taskqueue.Task;
35  import org.codehaus.plexus.taskqueue.TaskQueue;
36  import org.codehaus.plexus.taskqueue.execution.TaskExecutionException;
37  import org.codehaus.plexus.util.StringUtils;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  /**
42   * Codes were taken from Plexus' ThreadedTaskQueueExecutor
43   */
44  public class ThreadedDistributedBuildTaskQueueExecutor
45      implements DistributedBuildTaskQueueExecutor, Initializable, Startable
46  {
47      private static final int SHUTDOWN = 1;
48  
49      private static final int CANCEL_TASK = 2;
50  
51      private static final Logger log = LoggerFactory.getLogger( ThreadedDistributedBuildTaskQueueExecutor.class );
52  
53      /**
54       * @requirement
55       */
56      private TaskQueue queue;
57  
58      /**
59       * @requirement
60       */
61      private DistributedBuildTaskExecutor executor;
62  
63      /**
64       * @configuration
65       */
66      private String name;
67  
68      // ----------------------------------------------------------------------
69      //
70      // ----------------------------------------------------------------------
71  
72      private ExecutorRunnable executorRunnable;
73  
74      private ExecutorService executorService;
75  
76      private Task currentTask;
77  
78      private class ExecutorRunnable
79          extends Thread
80      {
81          private volatile int command;
82  
83          private boolean done;
84  
85          public void run()
86          {
87              while ( command != SHUTDOWN )
88              {
89                  final Task task;
90  
91                  currentTask = null;
92  
93                  try
94                  {
95                      task = queue.poll( 100, TimeUnit.MILLISECONDS );
96                  }
97                  catch ( InterruptedException e )
98                  {
99                      log.info( "Executor thread interrupted, command: " + ( command == SHUTDOWN
100                         ? "Shutdown"
101                         : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
102                     continue;
103                 }
104 
105                 if ( task == null )
106                 {
107                     continue;
108                 }
109 
110                 currentTask = task;
111 
112                 Future future = executorService.submit( new Runnable()
113                 {
114                     public void run()
115                     {
116                         try
117                         {
118                             executor.executeTask( task );
119                         }
120                         catch ( TaskExecutionException e )
121                         {
122                             log.error( "Error executing task", e );
123                         }
124                     }
125                 } );
126 
127                 try
128                 {
129                     waitForTask( task, future );
130                 }
131                 catch ( ExecutionException e )
132                 {
133                     log.error( "Error executing task", e );
134                 }
135             }
136 
137             currentTask = null;
138 
139             log.info( "Executor thread '" + name + "' exited." );
140 
141             done = true;
142 
143             synchronized ( this )
144             {
145                 notifyAll();
146             }
147         }
148 
149         private void waitForTask( Task task, Future future )
150             throws ExecutionException
151         {
152             boolean stop = false;
153 
154             while ( !stop )
155             {
156                 try
157                 {
158                     if ( task.getMaxExecutionTime() == 0 )
159                     {
160                         log.debug( "Waiting indefinitely for task to complete" );
161                         future.get();
162                         return;
163                     }
164                     else
165                     {
166                         log.debug( "Waiting at most " + task.getMaxExecutionTime() + "ms for task completion" );
167                         future.get( task.getMaxExecutionTime(), TimeUnit.MILLISECONDS );
168                         log.debug( "Task completed within " + task.getMaxExecutionTime() + "ms" );
169                         return;
170                     }
171                 }
172                 catch ( InterruptedException e )
173                 {
174                     switch ( command )
175                     {
176                         case SHUTDOWN:
177                         {
178                             log.info( "Shutdown command received. Cancelling task." );
179                             cancel( future );
180                             return;
181                         }
182 
183                         case CANCEL_TASK:
184                         {
185                             command = 0;
186                             log.info( "Cancelling task" );
187                             cancel( future );
188                             return;
189                         }
190 
191                         default:
192                             // when can this thread be interrupted, and should we ignore it if shutdown = false?
193                             log.warn( "Interrupted while waiting for task to complete; ignoring", e );
194                             break;
195                     }
196                 }
197                 catch ( TimeoutException e )
198                 {
199                     log.warn( "Task " + task + " didn't complete within time, cancelling it." );
200                     cancel( future );
201                     return;
202                 }
203                 catch ( CancellationException e )
204                 {
205                     log.warn( "The task was cancelled", e );
206                     return;
207                 }
208             }
209         }
210 
211         private void cancel( Future future )
212         {
213             if ( !future.cancel( true ) )
214             {
215                 if ( !future.isDone() && !future.isCancelled() )
216                 {
217                     log.warn( "Unable to cancel task" );
218                 }
219                 else
220                 {
221                     log.warn(
222                         "Task not cancelled (Flags: done: " + future.isDone() + " cancelled: " + future.isCancelled() +
223                             ")" );
224                 }
225             }
226             else
227             {
228                 log.debug( "Task successfully cancelled" );
229             }
230         }
231 
232         public synchronized void shutdown()
233         {
234             log.debug( "Signalling executor thread to shutdown" );
235 
236             command = SHUTDOWN;
237 
238             interrupt();
239         }
240 
241         public synchronized boolean cancelTask( Task task )
242         {
243             if ( !task.equals( currentTask ) )
244             {
245                 log.debug( "Not cancelling task - it is not running" );
246                 return false;
247             }
248 
249             if ( command != SHUTDOWN )
250             {
251                 log.debug( "Signalling executor thread to cancel task" );
252 
253                 command = CANCEL_TASK;
254 
255                 interrupt();
256             }
257             else
258             {
259                 log.debug( "Executor thread already stopping; task will be cancelled automatically" );
260             }
261 
262             return true;
263         }
264 
265         public boolean isDone()
266         {
267             return done;
268         }
269     }
270 
271     // ----------------------------------------------------------------------
272     // Component lifecycle
273     // ----------------------------------------------------------------------
274 
275     public void initialize()
276         throws InitializationException
277     {
278         if ( StringUtils.isEmpty( name ) )
279         {
280             throw new IllegalArgumentException( "'name' must be set." );
281         }
282     }
283 
284     public void start()
285         throws StartingException
286     {
287         log.info( "Starting task executor, thread name '" + name + "'." );
288 
289         this.executorService = Executors.newSingleThreadExecutor();
290 
291         executorRunnable = new ExecutorRunnable();
292 
293         executorRunnable.setDaemon( true );
294 
295         executorRunnable.start();
296     }
297 
298     public void stop()
299         throws StoppingException
300     {
301         executorRunnable.shutdown();
302 
303         int maxSleep = 10 * 1000; // 10 seconds
304 
305         int interval = 1000;
306 
307         long endTime = System.currentTimeMillis() + maxSleep;
308 
309         while ( !executorRunnable.isDone() && executorRunnable.isAlive() )
310         {
311             if ( System.currentTimeMillis() > endTime )
312             {
313                 log.warn( "Timeout waiting for executor thread '" + name + "' to stop, aborting" );
314                 break;
315             }
316 
317             log.info( "Waiting until task executor '" + name + "' is idling..." );
318 
319             try
320             {
321                 synchronized ( executorRunnable )
322                 {
323                     executorRunnable.wait( interval );
324                 }
325             }
326             catch ( InterruptedException ex )
327             {
328                 // ignore
329             }
330 
331             // notify again, just in case.
332             executorRunnable.shutdown();
333         }
334     }
335 
336     public Task getCurrentTask()
337     {
338         return currentTask;
339     }
340 
341     public synchronized boolean cancelTask( Task task )
342     {
343         return executorRunnable.cancelTask( task );
344     }
345 
346     public void setBuildAgentUrl( String buildAgentUrl )
347     {
348         executor.setBuildAgentUrl( buildAgentUrl );
349     }
350 
351     public String getBuildAgentUrl()
352     {
353         return executor.getBuildAgentUrl();
354     }
355 
356     public TaskQueue getQueue()
357     {
358         return queue;
359     }
360 }