1 package org.jgroups.blocks.executor;
2 
3 import java.io.DataInputStream;
4 import java.io.DataOutputStream;
5 import java.io.Externalizable;
6 import java.io.IOException;
7 import java.io.NotSerializableException;
8 import java.io.Serializable;
9 import java.util.HashSet;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.concurrent.AbstractExecutorService;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.RunnableFuture;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 
27 import org.jgroups.JChannel;
28 import org.jgroups.protocols.Executing;
29 import org.jgroups.protocols.Locking;
30 import org.jgroups.util.FutureListener;
31 import org.jgroups.util.NotifyingFuture;
32 import org.jgroups.util.Streamable;
33 import org.jgroups.util.Util;
34 
/**
 * This is a jgroups implementation of an ExecutorService, where the consumers
 * are running on any number of nodes.  The nodes should run
 * {@link ExecutionRunner} to start picking up requests.
 * <p>
 * Every future object returned will be a {@link NotifyingFuture}, which
 * allows a callback to be registered instead of having to query the future.
 * This can then be used as a workflow to submit other tasks sequentially, or
 * the future can still be queried for the value at that time.
 * <p>
 * Every callable or runnable submitted must be either {@link Serializable},
 * {@link Externalizable}, or {@link Streamable}.  Also the value returned from
 * a callable must be {@link Serializable}, {@link Externalizable}, or
 * {@link Streamable}.  Unfortunately, if the value returned is not serializable
 * then a {@link NotSerializableException} will be thrown as the cause.
 * @author wburns
 * @since 2.12.0
 */
53 public class ExecutionService extends AbstractExecutorService {
54     protected JChannel ch;
55     protected Executing _execProt;
56 
57     protected Lock _unfinishedLock = new ReentrantLock();
58     protected Condition _unfinishedCondition = _unfinishedLock.newCondition();
59 
60     protected Set<Future<?>> _unfinishedFutures = new HashSet<Future<?>>();
61 
62     protected AtomicBoolean _shutdown = new AtomicBoolean(false);
63 
ExecutionService()64     public ExecutionService() {
65 
66     }
67 
ExecutionService(JChannel ch)68     public ExecutionService(JChannel ch) {
69         setChannel(ch);
70     }
71 
setChannel(JChannel ch)72     public void setChannel(JChannel ch) {
73         this.ch=ch;
74         _execProt=(Executing)ch.getProtocolStack().findProtocol(Executing.class);
75         if(_execProt == null)
76             throw new IllegalStateException("Channel configuration must include a executing protocol " +
77                                               "(subclass of " + Executing.class.getName() + ")");
78     }
79 
80     // @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, java.lang.Object)
81     @Override
submit(Runnable task, T result)82     public <T> NotifyingFuture<T> submit(Runnable task, T result) {
83         // This cast is okay cause we control creation of the task
84         return (NotifyingFuture<T>)super.submit(task, result);
85     }
86 
87     // @see java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable)
88     @Override
submit(Callable<T> task)89     public <T> NotifyingFuture<T> submit(Callable<T> task) {
90         // This cast is okay cause we control creation of the task
91         return (NotifyingFuture<T>)super.submit(task);
92     }
93 
94     /**
95      * This is basically a copy of the FutureTask in java.util.concurrent but
96      * added serializable to it.  Also added in the NotifyingFuture
97      * so that the channel can update the future when the value is calculated.
98      *
99      * @param <V>
100      * @author wburns
101      */
102     public static class DistributedFuture<V> implements RunnableFuture<V>,
103             ExecutorNotification, NotifyingFuture<V> {
104         // @see java.lang.Object#toString()
105         @Override
toString()106         public String toString() {
107             return "DistributedFuture [callable=" + sync.callable + "]";
108         }
109 
110         /** Synchronization control for FutureTask */
111         protected final Sync<V> sync;
112 
113         /** The following values are only used on the client side */
114         private final JChannel channel;
115         private final Set<Future<?>> _unfinishedFutures;
116         private final Lock _unfinishedLock;
117         private final Condition _unfinishedCondition;
118         private volatile FutureListener<V> _listener;
119 
120         /**
121          * Creates a <tt>FutureTask</tt> that will upon running, execute the
122          * given <tt>Callable</tt>.
123          *
124          * @param channel The channel that messages are sent down
125          * @param unfinishedLock The lock which protects the futuresToFinish
126          *        set object.
127          * @param condition The condition to signal when this future finishes
128          * @param futuresToFinish The set to remove this future from when
129          *        it is finished.
130          * @param callable The callable to actually run on the server side
131          */
DistributedFuture(JChannel channel, Lock unfinishedLock, Condition condition, Set<Future<?>> futuresToFinish, Callable<V> callable)132         public DistributedFuture(JChannel channel, Lock unfinishedLock,
133                           Condition condition,
134                           Set<Future<?>> futuresToFinish,
135                           Callable<V> callable) {
136             if (callable == null)
137                 throw new NullPointerException();
138             sync = new Sync<V>(this, callable);
139             this.channel = channel;
140             // We keep the real copy to update the outside
141             _unfinishedFutures = futuresToFinish;
142             _unfinishedLock = unfinishedLock;
143             _unfinishedCondition = condition;
144         }
145 
146         /**
147          * Creates a <tt>FutureTask</tt> that will upon running, execute the
148          * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
149          * given result on successful completion.
150          *
151          * @param channel The channel that messages are sent down
152          * @param unfinishedLock The lock which protects the futuresToFinish
153          *        set object.
154          * @param condition The condition to signal when this future finishes
155          * @param futuresToFinish The set to remove this future from when
156          *        it is finished.
157          * @param runnable the runnable task
158          * @param result the result to return on successful completion. If
159          * you don't need a particular result, consider using
160          * constructions of the form:
161          * <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
162          * @throws NullPointerException if runnable is null
163          */
DistributedFuture(JChannel channel, Lock unfinishedLock, Condition condition, Set<Future<?>> futuresToFinish, Runnable runnable, V result)164         public DistributedFuture(JChannel channel, Lock unfinishedLock,
165                           Condition condition, Set<Future<?>> futuresToFinish,
166                           Runnable runnable, V result) {
167             sync = new Sync<V>(this, new RunnableAdapter<V>(runnable, result));
168             this.channel = channel;
169             // We keep the real copy to update the outside
170             _unfinishedFutures = futuresToFinish;
171             _unfinishedLock = unfinishedLock;
172             _unfinishedCondition = condition;
173         }
174 
getCallable()175         public Callable<V> getCallable() {
176             return sync.callable;
177         }
178 
isCancelled()179         public boolean isCancelled() {
180             return sync.innerIsCancelled();
181         }
182 
isDone()183         public boolean isDone() {
184             return sync.innerIsDone();
185         }
186 
cancel(boolean mayInterruptIfRunning)187         public boolean cancel(boolean mayInterruptIfRunning) {
188             if (sync.innerIsDone()) {
189                 return false;
190             }
191             // This will only happen on calling side since it is transient
192             if (channel != null) {
193                 return (Boolean)channel.downcall(new ExecutorEvent(
194                     ExecutorEvent.TASK_CANCEL, new Object[] {this, mayInterruptIfRunning}));
195             }
196             return sync.innerCancel(mayInterruptIfRunning);
197         }
198 
199         /**
200          * @throws CancellationException {@inheritDoc}
201          */
get()202         public V get() throws InterruptedException, ExecutionException {
203             return sync.innerGet();
204         }
205 
206         /**
207          * @throws CancellationException {@inheritDoc}
208          */
get(long timeout, TimeUnit unit)209         public V get(long timeout, TimeUnit unit)
210             throws InterruptedException, ExecutionException, TimeoutException {
211             return sync.innerGet(unit.toNanos(timeout));
212         }
213 
214         /**
215          * Protected method invoked when this task transitions to state
216          * <tt>isDone</tt> (whether normally or via cancellation). The
217          * default implementation does nothing.  Subclasses may override
218          * this method to invoke completion callbacks or perform
219          * bookkeeping. Note that you can query status inside the
220          * implementation of this method to determine whether this task
221          * has been cancelled.
222          */
done()223         protected void done() {
224             _unfinishedLock.lock();
225             try {
226                 _unfinishedFutures.remove(this);
227                 _unfinishedCondition.signalAll();
228             }
229             finally {
230                 _unfinishedLock.unlock();
231             }
232             // We assign the listener to a local variable so we don't have to
233             // worry about it becoming null inside the if
234             FutureListener<V> listener = _listener;
235             // We don't want this to run on server
236             if (listener != null) {
237                 listener.futureDone(this);
238             }
239         }
240 
241         @Override
setListener(FutureListener<V> listener)242         public NotifyingFuture<V> setListener(FutureListener<V> listener) {
243             _listener = listener;
244             if (sync.innerIsDone()) {
245                 _listener.futureDone(this);
246             }
247             return this;
248         }
249 
250         /**
251          * Sets the result of this Future to the given value unless
252          * this future has already been set or has been cancelled.
253          * This method is invoked internally by the <tt>run</tt> method
254          * upon successful completion of the computation.
255          * @param v the value
256          */
set(V v)257         protected void set(V v) {
258             sync.innerSet(v);
259         }
260 
261         /**
262          * Causes this future to report an <tt>ExecutionException</tt>
263          * with the given throwable as its cause, unless this Future has
264          * already been set or has been cancelled.
265          * This method is invoked internally by the <tt>run</tt> method
266          * upon failure of the computation.
267          * @param t the cause of failure
268          */
setException(Throwable t)269         protected void setException(Throwable t) {
270             sync.innerSetException(t);
271         }
272 
273         // The following (duplicated) doc comment can be removed once
274         //
275         // 6270645: Javadoc comments should be inherited from most derived
276         //          superinterface or superclass
277         // is fixed.
278         /**
279          * Sets this Future to the result of its computation
280          * unless it has been cancelled.
281          */
run()282         public void run() {
283             sync.innerRun();
284         }
285 
286         /**
287          * Executes the computation without setting its result, and then
288          * resets this Future to initial state, failing to do so if the
289          * computation encounters an exception or is cancelled.  This is
290          * designed for use with tasks that intrinsically execute more
291          * than once.
292          * @return true if successfully run and reset
293          */
runAndReset()294         protected boolean runAndReset() {
295             return sync.innerRunAndReset();
296         }
297 
298         /**
299          * Synchronization control for FutureTask. Note that this must be
300          * a non-static inner class in order to invoke the protected
301          * <tt>done</tt> method. For clarity, all inner class support
302          * methods are same as outer, prefixed with "inner".
303          *
304          * Uses AQS sync state to represent run status
305          */
306         protected static final class Sync<V> extends AbstractQueuedSynchronizer {
307             private static final long serialVersionUID = -7828117401763700385L;
308 
309             /** State value representing that task is running */
310             protected static final int RUNNING   = 1;
311             /** State value representing that task ran */
312             protected static final int RAN       = 2;
313             /** State value representing that task was cancelled */
314             protected static final int CANCELLED = 4;
315 
316             /** The containing future */
317             protected final DistributedFuture<V> future;
318             /** The underlying callable */
319             protected final Callable<V> callable;
320             /** The result to return from get() */
321             protected V result;
322             /** The exception to throw from get() */
323             protected Throwable exception;
324 
325             /**
326              * The thread running task. When nulled after set/cancel, this
327              * indicates that the results are accessible.  Must be
328              * volatile, to ensure visibility upon completion.
329              */
330             protected transient volatile Thread runner;
331 
Sync(DistributedFuture<V> future, Callable<V> callable)332             public Sync(DistributedFuture<V> future, Callable<V> callable) {
333                 this.future = future;
334                 this.callable = callable;
335             }
336 
ranOrCancelled(int state)337             private static boolean ranOrCancelled(int state) {
338                 return (state & (RAN | CANCELLED)) != 0;
339             }
340 
341             /**
342              * Implements AQS base acquire to succeed if ran or cancelled
343              */
tryAcquireShared(int ignore)344             protected int tryAcquireShared(int ignore) {
345                 return innerIsDone()? 1 : -1;
346             }
347 
348             /**
349              * Implements AQS base release to always signal after setting
350              * final done status by nulling runner thread.
351              */
tryReleaseShared(int ignore)352             protected boolean tryReleaseShared(int ignore) {
353                 runner = null;
354                 return true;
355             }
356 
innerIsCancelled()357             boolean innerIsCancelled() {
358                 return getState() == CANCELLED;
359             }
360 
innerIsDone()361             boolean innerIsDone() {
362                 return ranOrCancelled(getState()) && runner == null;
363             }
364 
innerGet()365             V innerGet() throws InterruptedException, ExecutionException {
366                 acquireSharedInterruptibly(0);
367                 if (getState() == CANCELLED)
368                     throw new CancellationException();
369                 if (exception != null)
370                     throw new ExecutionException(exception);
371                 return result;
372             }
373 
innerGet(long nanosTimeout)374             V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
375                 if (!tryAcquireSharedNanos(0, nanosTimeout))
376                     throw new TimeoutException();
377                 if (getState() == CANCELLED)
378                     throw new CancellationException();
379                 if (exception != null)
380                     throw new ExecutionException(exception);
381                 return result;
382             }
383 
innerSet(V v)384             void innerSet(V v) {
385                 for (;;) {
386                     int s = getState();
387                     if (s == RAN)
388                         return;
389                     if (s == CANCELLED) {
390                         // aggressively release to set runner to null,
391                         // in case we are racing with a cancel request
392                         // that will try to interrupt runner
393                         releaseShared(0);
394                         return;
395                     }
396                     if (compareAndSetState(s, RAN)) {
397                         result = v;
398                         releaseShared(0);
399                         future.done();
400                         return;
401                     }
402                 }
403             }
404 
innerSetException(Throwable t)405             void innerSetException(Throwable t) {
406                 for (;;) {
407                     int s = getState();
408                     if (s == RAN)
409                         return;
410                     if (s == CANCELLED) {
411                         // aggressively release to set runner to null,
412                         // in case we are racing with a cancel request
413                         // that will try to interrupt runner
414                         releaseShared(0);
415                         return;
416                     }
417                     if (compareAndSetState(s, RAN)) {
418                         exception = t;
419                         result = null;
420                         releaseShared(0);
421                         future.done();
422                         return;
423                     }
424                 }
425             }
426 
innerCancel(boolean mayInterruptIfRunning)427             boolean innerCancel(boolean mayInterruptIfRunning) {
428                 for (;;) {
429                     int s = getState();
430                     if (ranOrCancelled(s))
431                         return false;
432                     if (compareAndSetState(s, CANCELLED))
433                         break;
434                 }
435                 if (mayInterruptIfRunning) {
436                     Thread r = runner;
437                     if (r != null)
438                         r.interrupt();
439                 }
440                 releaseShared(0);
441                 future.done();
442                 return true;
443             }
444 
innerRun()445             void innerRun() {
446                 if (!compareAndSetState(0, RUNNING))
447                     return;
448                 try {
449                     runner = Thread.currentThread();
450                     if (getState() == RUNNING) // recheck after setting thread
451                         innerSet(callable.call());
452                     else
453                         releaseShared(0); // cancel
454                 } catch (Throwable ex) {
455                     innerSetException(ex);
456                 }
457             }
458 
innerRunAndReset()459             boolean innerRunAndReset() {
460                 if (!compareAndSetState(0, RUNNING))
461                     return false;
462                 try {
463                     runner = Thread.currentThread();
464                     if (getState() == RUNNING)
465                         callable.call(); // don't set result
466                     runner = null;
467                     return compareAndSetState(RUNNING, 0);
468                 } catch (Throwable ex) {
469                     innerSetException(ex);
470                     return false;
471                 }
472             }
473         }
474 
475         // @see org.jgroups.blocks.executor.ExecutorNotification#resultReturned(java.lang.Object)
476         @SuppressWarnings("unchecked")
477         @Override
resultReturned(Object obj)478         public void resultReturned(Object obj) {
479             set((V)obj);
480         }
481 
482         // @see org.jgroups.blocks.executor.ExecutorNotification#throwableEncountered(java.lang.Throwable)
483         @Override
throwableEncountered(Throwable t)484         public void throwableEncountered(Throwable t) {
485             setException(t);
486         }
487 
488         @Override
interrupted(Runnable runnable)489         public void interrupted(Runnable runnable) {
490             _unfinishedLock.lock();
491             try {
492                 _unfinishedFutures.remove(this);
493                 _unfinishedCondition.signalAll();
494             }
495             finally {
496                 _unfinishedLock.unlock();
497             }
498 
499             // We assign the listener to a local variable so we don't have to
500             // worry about it becoming null inside the if
501             FutureListener<V> listener = _listener;
502             // We don't want this to run on server
503             if (listener != null) {
504                 listener.futureDone(this);
505             }
506         }
507     }
508 
509     // @see java.util.concurrent.ExecutorService#shutdown()
510     @Override
shutdown()511     public void shutdown() {
512         _realShutdown(false);
513     }
514 
515     @SuppressWarnings("unchecked")
_realShutdown(boolean interrupt)516     private List<Runnable> _realShutdown(boolean interrupt) {
517         _shutdown.set(true);
518         _unfinishedLock.lock();
519         Set<Future<?>> futures;
520         try {
521              futures = new HashSet<Future<?>>(_unfinishedFutures);
522         }
523         finally {
524             _unfinishedLock.unlock();
525         }
526         return (List<Runnable>)ch.downcall(new ExecutorEvent(
527             ExecutorEvent.ALL_TASK_CANCEL, new Object[]{futures, interrupt}));
528     }
529 
530     // @see java.util.concurrent.ExecutorService#shutdownNow()
531     @Override
shutdownNow()532     public List<Runnable> shutdownNow() {
533         return _realShutdown(true);
534     }
535 
536     // @see java.util.concurrent.ExecutorService#isShutdown()
537     @Override
isShutdown()538     public boolean isShutdown() {
539         return _shutdown.get();
540     }
541 
542     // @see java.util.concurrent.ExecutorService#isTerminated()
543     @Override
isTerminated()544     public boolean isTerminated() {
545         if (_shutdown.get()) {
546             _unfinishedLock.lock();
547             try {
548                 return _unfinishedFutures.isEmpty();
549             }
550             finally {
551                 _unfinishedLock.unlock();
552             }
553         }
554         return false;
555     }
556 
557     // @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)
558     @Override
awaitTermination(long timeout, TimeUnit unit)559     public boolean awaitTermination(long timeout, TimeUnit unit)
560             throws InterruptedException {
561         long nanoTimeWait = unit.toNanos(timeout);
562         _unfinishedLock.lock();
563         try {
564             while (!_unfinishedFutures.isEmpty()) {
565                 if ((nanoTimeWait = _unfinishedCondition.awaitNanos(
566                     nanoTimeWait)) <= 0) {
567                     return false;
568                 }
569             }
570         }
571         finally {
572             _unfinishedLock.unlock();
573         }
574 
575         return true;
576     }
577 
578     // @see java.util.concurrent.Executor#execute(java.lang.Runnable)
579     @Override
execute(Runnable command)580     public void execute(Runnable command) {
581         if (!_shutdown.get()) {
582             Object serializeCheck;
583             // If it is wrapped by our future, then we have to make sure to
584             // check the actual callable/runnable given to us for serialization
585             if (command instanceof DistributedFuture) {
586                 serializeCheck = ((DistributedFuture<?>)command).getCallable();
587                 if (serializeCheck instanceof RunnableAdapter) {
588                     serializeCheck = ((RunnableAdapter<?>)serializeCheck).task;
589                 }
590             }
591             else {
592                  serializeCheck = command;
593             }
594 
595             if (serializeCheck instanceof Serializable ||
596                     serializeCheck instanceof Streamable) {
597                 ch.down(new ExecutorEvent(ExecutorEvent.TASK_SUBMIT, command));
598             }
599             else {
600                 throw new IllegalArgumentException(
601                     "Command was not Serializable, Externalizable, or Streamable - "
602                             + serializeCheck);
603             }
604         }
605         else {
606             throw new RejectedExecutionException();
607         }
608     }
609 
610     /**
611      * This is copied from {@see java.util.concurrent.Executors} class which
612      * contains RunnableAdapter.  However that adapter isn't serializable, and
613      * is final and package level so we can' reference.
614      */
615     protected static final class RunnableAdapter<T> implements Callable<T>, Streamable {
616         protected Runnable task;
617         protected T result;
618 
RunnableAdapter()619         protected RunnableAdapter() {
620 
621         }
RunnableAdapter(Runnable task, T result)622         protected RunnableAdapter(Runnable  task, T result) {
623             this.task = task;
624             this.result = result;
625         }
call()626         public T call() {
627             task.run();
628             return result;
629         }
630         @Override
writeTo(DataOutputStream out)631         public void writeTo(DataOutputStream out) throws IOException {
632             try {
633                 Util.writeObject(task, out);
634             }
635             catch (IOException e) {
636                 throw e;
637             }
638             catch (Exception e) {
639                 throw new IOException("Exception encountered while writing execution runnable", e);
640             }
641 
642             try {
643                 Util.writeObject(result, out);
644             }
645             catch (IOException e) {
646                 throw e;
647             }
648             catch (Exception e) {
649                 throw new IOException("Exception encountered while writing execution result", e);
650             }
651         }
652         @SuppressWarnings("unchecked")
653         @Override
readFrom(DataInputStream in)654         public void readFrom(DataInputStream in) throws IOException,
655                 IllegalAccessException, InstantiationException {
656             // We can't use Util.readObject since it's size is limited to 2^15-1
657             // The runner could be larger than that possibly
658             try {
659                 task = (Runnable)Util.readObject(in);
660             }
661             catch (IOException e) {
662                 throw e;
663             }
664             catch (Exception e) {
665                 throw new IOException("Exception encountered while reading execution runnable", e);
666             }
667 
668             try {
669                 result = (T)Util.readObject(in);
670             }
671             catch (IOException e) {
672                 throw e;
673             }
674             catch (Exception e) {
675                 throw new IOException("Exception encountered while reading execution result", e);
676             }
677         }
678     }
679 
680     // @see java.util.concurrent.AbstractExecutorService#newTaskFor(java.lang.Runnable, java.lang.Object)
681     @Override
newTaskFor(Runnable runnable, T value)682     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
683         DistributedFuture<T> future = new DistributedFuture<T>(ch, _unfinishedLock,
684                 _unfinishedCondition, _unfinishedFutures, runnable, value);
685         _execProt.addExecutorListener(future, future);
686         _unfinishedLock.lock();
687         try {
688             _unfinishedFutures.add(future);
689         }
690         finally {
691             _unfinishedLock.unlock();
692         }
693         return future;
694     }
695 
696     // @see java.util.concurrent.AbstractExecutorService#newTaskFor(java.util.concurrent.Callable)
697     @Override
newTaskFor(Callable<T> callable)698     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
699         DistributedFuture<T> future = new DistributedFuture<T>(ch, _unfinishedLock,
700                 _unfinishedCondition, _unfinishedFutures, callable);
701         _execProt.addExecutorListener(future, future);
702         _unfinishedLock.lock();
703         try {
704             _unfinishedFutures.add(future);
705         }
706         finally {
707             _unfinishedLock.unlock();
708         }
709         return future;
710     }
711 }
712