1 package org.jgroups.blocks.executor; 2 3 import java.io.DataInputStream; 4 import java.io.DataOutputStream; 5 import java.io.Externalizable; 6 import java.io.IOException; 7 import java.io.NotSerializableException; 8 import java.io.Serializable; 9 import java.util.HashSet; 10 import java.util.List; 11 import java.util.Set; 12 import java.util.concurrent.AbstractExecutorService; 13 import java.util.concurrent.Callable; 14 import java.util.concurrent.CancellationException; 15 import java.util.concurrent.ExecutionException; 16 import java.util.concurrent.Future; 17 import java.util.concurrent.RejectedExecutionException; 18 import java.util.concurrent.RunnableFuture; 19 import java.util.concurrent.TimeUnit; 20 import java.util.concurrent.TimeoutException; 21 import java.util.concurrent.atomic.AtomicBoolean; 22 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 23 import java.util.concurrent.locks.Condition; 24 import java.util.concurrent.locks.Lock; 25 import java.util.concurrent.locks.ReentrantLock; 26 27 import org.jgroups.JChannel; 28 import org.jgroups.protocols.Executing; 29 import org.jgroups.protocols.Locking; 30 import org.jgroups.util.FutureListener; 31 import org.jgroups.util.NotifyingFuture; 32 import org.jgroups.util.Streamable; 33 import org.jgroups.util.Util; 34 35 /** 36 * This is a jgroups implementation of an ExecutorService, where the consumers 37 * are running on any number of nodes. The nodes should run 38 * {@link ExecutionRunner} to start picking up requests. 39 * <p> 40 * Every future object returned will be a {@link NotifyingFuture} which 41 * allows for not having to query the future and have a callback instead. This 42 * can then be used as a workflow to submit other tasks sequentially or also to 43 * query the future for the value at that time. 44 * <p> 45 * Every callable or runnable submitted must be either {@link Serializable}, 46 * {@link Externalizable}, or {@link Streamable}. Also the value returned from 47 * a callable must {@link Serializable}, {@link Externalizable}, or 48 * {@link Streamable}. Unfortunately if the value returned is not serializable 49 * then a {@link NotSerializableException} will be thrown as the cause. 50 * @author wburns 51 * @since 2.12.0 52 */ 53 public class ExecutionService extends AbstractExecutorService { 54 protected JChannel ch; 55 protected Executing _execProt; 56 57 protected Lock _unfinishedLock = new ReentrantLock(); 58 protected Condition _unfinishedCondition = _unfinishedLock.newCondition(); 59 60 protected Set<Future<?>> _unfinishedFutures = new HashSet<Future<?>>(); 61 62 protected AtomicBoolean _shutdown = new AtomicBoolean(false); 63 ExecutionService()64 public ExecutionService() { 65 66 } 67 ExecutionService(JChannel ch)68 public ExecutionService(JChannel ch) { 69 setChannel(ch); 70 } 71 setChannel(JChannel ch)72 public void setChannel(JChannel ch) { 73 this.ch=ch; 74 _execProt=(Executing)ch.getProtocolStack().findProtocol(Executing.class); 75 if(_execProt == null) 76 throw new IllegalStateException("Channel configuration must include a executing protocol " + 77 "(subclass of " + Executing.class.getName() + ")"); 78 } 79 80 // @see java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, java.lang.Object) 81 @Override submit(Runnable task, T result)82 public <T> NotifyingFuture<T> submit(Runnable task, T result) { 83 // This cast is okay cause we control creation of the task 84 return (NotifyingFuture<T>)super.submit(task, result); 85 } 86 87 // @see java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable) 88 @Override submit(Callable<T> task)89 public <T> NotifyingFuture<T> submit(Callable<T> task) { 90 // This cast is okay cause we control creation of the task 91 return (NotifyingFuture<T>)super.submit(task); 92 } 93 94 /** 95 * This is basically a copy of the FutureTask in java.util.concurrent but 96 * added serializable to it. Also added in the NotifyingFuture 97 * so that the channel can update the future when the value is calculated. 98 * 99 * @param <V> 100 * @author wburns 101 */ 102 public static class DistributedFuture<V> implements RunnableFuture<V>, 103 ExecutorNotification, NotifyingFuture<V> { 104 // @see java.lang.Object#toString() 105 @Override toString()106 public String toString() { 107 return "DistributedFuture [callable=" + sync.callable + "]"; 108 } 109 110 /** Synchronization control for FutureTask */ 111 protected final Sync<V> sync; 112 113 /** The following values are only used on the client side */ 114 private final JChannel channel; 115 private final Set<Future<?>> _unfinishedFutures; 116 private final Lock _unfinishedLock; 117 private final Condition _unfinishedCondition; 118 private volatile FutureListener<V> _listener; 119 120 /** 121 * Creates a <tt>FutureTask</tt> that will upon running, execute the 122 * given <tt>Callable</tt>. 123 * 124 * @param channel The channel that messages are sent down 125 * @param unfinishedLock The lock which protects the futuresToFinish 126 * set object. 127 * @param condition The condition to signal when this future finishes 128 * @param futuresToFinish The set to remove this future from when 129 * it is finished. 130 * @param callable The callable to actually run on the server side 131 */ DistributedFuture(JChannel channel, Lock unfinishedLock, Condition condition, Set<Future<?>> futuresToFinish, Callable<V> callable)132 public DistributedFuture(JChannel channel, Lock unfinishedLock, 133 Condition condition, 134 Set<Future<?>> futuresToFinish, 135 Callable<V> callable) { 136 if (callable == null) 137 throw new NullPointerException(); 138 sync = new Sync<V>(this, callable); 139 this.channel = channel; 140 // We keep the real copy to update the outside 141 _unfinishedFutures = futuresToFinish; 142 _unfinishedLock = unfinishedLock; 143 _unfinishedCondition = condition; 144 } 145 146 /** 147 * Creates a <tt>FutureTask</tt> that will upon running, execute the 148 * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the 149 * given result on successful completion. 150 * 151 * @param channel The channel that messages are sent down 152 * @param unfinishedLock The lock which protects the futuresToFinish 153 * set object. 154 * @param condition The condition to signal when this future finishes 155 * @param futuresToFinish The set to remove this future from when 156 * it is finished. 157 * @param runnable the runnable task 158 * @param result the result to return on successful completion. If 159 * you don't need a particular result, consider using 160 * constructions of the form: 161 * <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt> 162 * @throws NullPointerException if runnable is null 163 */ DistributedFuture(JChannel channel, Lock unfinishedLock, Condition condition, Set<Future<?>> futuresToFinish, Runnable runnable, V result)164 public DistributedFuture(JChannel channel, Lock unfinishedLock, 165 Condition condition, Set<Future<?>> futuresToFinish, 166 Runnable runnable, V result) { 167 sync = new Sync<V>(this, new RunnableAdapter<V>(runnable, result)); 168 this.channel = channel; 169 // We keep the real copy to update the outside 170 _unfinishedFutures = futuresToFinish; 171 _unfinishedLock = unfinishedLock; 172 _unfinishedCondition = condition; 173 } 174 getCallable()175 public Callable<V> getCallable() { 176 return sync.callable; 177 } 178 isCancelled()179 public boolean isCancelled() { 180 return sync.innerIsCancelled(); 181 } 182 isDone()183 public boolean isDone() { 184 return sync.innerIsDone(); 185 } 186 cancel(boolean mayInterruptIfRunning)187 public boolean cancel(boolean mayInterruptIfRunning) { 188 if (sync.innerIsDone()) { 189 return false; 190 } 191 // This will only happen on calling side since it is transient 192 if (channel != null) { 193 return (Boolean)channel.downcall(new ExecutorEvent( 194 ExecutorEvent.TASK_CANCEL, new Object[] {this, mayInterruptIfRunning})); 195 } 196 return sync.innerCancel(mayInterruptIfRunning); 197 } 198 199 /** 200 * @throws CancellationException {@inheritDoc} 201 */ get()202 public V get() throws InterruptedException, ExecutionException { 203 return sync.innerGet(); 204 } 205 206 /** 207 * @throws CancellationException {@inheritDoc} 208 */ get(long timeout, TimeUnit unit)209 public V get(long timeout, TimeUnit unit) 210 throws InterruptedException, ExecutionException, TimeoutException { 211 return sync.innerGet(unit.toNanos(timeout)); 212 } 213 214 /** 215 * Protected method invoked when this task transitions to state 216 * <tt>isDone</tt> (whether normally or via cancellation). The 217 * default implementation does nothing. Subclasses may override 218 * this method to invoke completion callbacks or perform 219 * bookkeeping. Note that you can query status inside the 220 * implementation of this method to determine whether this task 221 * has been cancelled. 222 */ done()223 protected void done() { 224 _unfinishedLock.lock(); 225 try { 226 _unfinishedFutures.remove(this); 227 _unfinishedCondition.signalAll(); 228 } 229 finally { 230 _unfinishedLock.unlock(); 231 } 232 // We assign the listener to a local variable so we don't have to 233 // worry about it becoming null inside the if 234 FutureListener<V> listener = _listener; 235 // We don't want this to run on server 236 if (listener != null) { 237 listener.futureDone(this); 238 } 239 } 240 241 @Override setListener(FutureListener<V> listener)242 public NotifyingFuture<V> setListener(FutureListener<V> listener) { 243 _listener = listener; 244 if (sync.innerIsDone()) { 245 _listener.futureDone(this); 246 } 247 return this; 248 } 249 250 /** 251 * Sets the result of this Future to the given value unless 252 * this future has already been set or has been cancelled. 253 * This method is invoked internally by the <tt>run</tt> method 254 * upon successful completion of the computation. 255 * @param v the value 256 */ set(V v)257 protected void set(V v) { 258 sync.innerSet(v); 259 } 260 261 /** 262 * Causes this future to report an <tt>ExecutionException</tt> 263 * with the given throwable as its cause, unless this Future has 264 * already been set or has been cancelled. 265 * This method is invoked internally by the <tt>run</tt> method 266 * upon failure of the computation. 267 * @param t the cause of failure 268 */ setException(Throwable t)269 protected void setException(Throwable t) { 270 sync.innerSetException(t); 271 } 272 273 // The following (duplicated) doc comment can be removed once 274 // 275 // 6270645: Javadoc comments should be inherited from most derived 276 // superinterface or superclass 277 // is fixed. 278 /** 279 * Sets this Future to the result of its computation 280 * unless it has been cancelled. 281 */ run()282 public void run() { 283 sync.innerRun(); 284 } 285 286 /** 287 * Executes the computation without setting its result, and then 288 * resets this Future to initial state, failing to do so if the 289 * computation encounters an exception or is cancelled. This is 290 * designed for use with tasks that intrinsically execute more 291 * than once. 292 * @return true if successfully run and reset 293 */ runAndReset()294 protected boolean runAndReset() { 295 return sync.innerRunAndReset(); 296 } 297 298 /** 299 * Synchronization control for FutureTask. Note that this must be 300 * a non-static inner class in order to invoke the protected 301 * <tt>done</tt> method. For clarity, all inner class support 302 * methods are same as outer, prefixed with "inner". 303 * 304 * Uses AQS sync state to represent run status 305 */ 306 protected static final class Sync<V> extends AbstractQueuedSynchronizer { 307 private static final long serialVersionUID = -7828117401763700385L; 308 309 /** State value representing that task is running */ 310 protected static final int RUNNING = 1; 311 /** State value representing that task ran */ 312 protected static final int RAN = 2; 313 /** State value representing that task was cancelled */ 314 protected static final int CANCELLED = 4; 315 316 /** The containing future */ 317 protected final DistributedFuture<V> future; 318 /** The underlying callable */ 319 protected final Callable<V> callable; 320 /** The result to return from get() */ 321 protected V result; 322 /** The exception to throw from get() */ 323 protected Throwable exception; 324 325 /** 326 * The thread running task. When nulled after set/cancel, this 327 * indicates that the results are accessible. Must be 328 * volatile, to ensure visibility upon completion. 329 */ 330 protected transient volatile Thread runner; 331 Sync(DistributedFuture<V> future, Callable<V> callable)332 public Sync(DistributedFuture<V> future, Callable<V> callable) { 333 this.future = future; 334 this.callable = callable; 335 } 336 ranOrCancelled(int state)337 private static boolean ranOrCancelled(int state) { 338 return (state & (RAN | CANCELLED)) != 0; 339 } 340 341 /** 342 * Implements AQS base acquire to succeed if ran or cancelled 343 */ tryAcquireShared(int ignore)344 protected int tryAcquireShared(int ignore) { 345 return innerIsDone()? 1 : -1; 346 } 347 348 /** 349 * Implements AQS base release to always signal after setting 350 * final done status by nulling runner thread. 351 */ tryReleaseShared(int ignore)352 protected boolean tryReleaseShared(int ignore) { 353 runner = null; 354 return true; 355 } 356 innerIsCancelled()357 boolean innerIsCancelled() { 358 return getState() == CANCELLED; 359 } 360 innerIsDone()361 boolean innerIsDone() { 362 return ranOrCancelled(getState()) && runner == null; 363 } 364 innerGet()365 V innerGet() throws InterruptedException, ExecutionException { 366 acquireSharedInterruptibly(0); 367 if (getState() == CANCELLED) 368 throw new CancellationException(); 369 if (exception != null) 370 throw new ExecutionException(exception); 371 return result; 372 } 373 innerGet(long nanosTimeout)374 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { 375 if (!tryAcquireSharedNanos(0, nanosTimeout)) 376 throw new TimeoutException(); 377 if (getState() == CANCELLED) 378 throw new CancellationException(); 379 if (exception != null) 380 throw new ExecutionException(exception); 381 return result; 382 } 383 innerSet(V v)384 void innerSet(V v) { 385 for (;;) { 386 int s = getState(); 387 if (s == RAN) 388 return; 389 if (s == CANCELLED) { 390 // aggressively release to set runner to null, 391 // in case we are racing with a cancel request 392 // that will try to interrupt runner 393 releaseShared(0); 394 return; 395 } 396 if (compareAndSetState(s, RAN)) { 397 result = v; 398 releaseShared(0); 399 future.done(); 400 return; 401 } 402 } 403 } 404 innerSetException(Throwable t)405 void innerSetException(Throwable t) { 406 for (;;) { 407 int s = getState(); 408 if (s == RAN) 409 return; 410 if (s == CANCELLED) { 411 // aggressively release to set runner to null, 412 // in case we are racing with a cancel request 413 // that will try to interrupt runner 414 releaseShared(0); 415 return; 416 } 417 if (compareAndSetState(s, RAN)) { 418 exception = t; 419 result = null; 420 releaseShared(0); 421 future.done(); 422 return; 423 } 424 } 425 } 426 innerCancel(boolean mayInterruptIfRunning)427 boolean innerCancel(boolean mayInterruptIfRunning) { 428 for (;;) { 429 int s = getState(); 430 if (ranOrCancelled(s)) 431 return false; 432 if (compareAndSetState(s, CANCELLED)) 433 break; 434 } 435 if (mayInterruptIfRunning) { 436 Thread r = runner; 437 if (r != null) 438 r.interrupt(); 439 } 440 releaseShared(0); 441 future.done(); 442 return true; 443 } 444 innerRun()445 void innerRun() { 446 if (!compareAndSetState(0, RUNNING)) 447 return; 448 try { 449 runner = Thread.currentThread(); 450 if (getState() == RUNNING) // recheck after setting thread 451 innerSet(callable.call()); 452 else 453 releaseShared(0); // cancel 454 } catch (Throwable ex) { 455 innerSetException(ex); 456 } 457 } 458 innerRunAndReset()459 boolean innerRunAndReset() { 460 if (!compareAndSetState(0, RUNNING)) 461 return false; 462 try { 463 runner = Thread.currentThread(); 464 if (getState() == RUNNING) 465 callable.call(); // don't set result 466 runner = null; 467 return compareAndSetState(RUNNING, 0); 468 } catch (Throwable ex) { 469 innerSetException(ex); 470 return false; 471 } 472 } 473 } 474 475 // @see org.jgroups.blocks.executor.ExecutorNotification#resultReturned(java.lang.Object) 476 @SuppressWarnings("unchecked") 477 @Override resultReturned(Object obj)478 public void resultReturned(Object obj) { 479 set((V)obj); 480 } 481 482 // @see org.jgroups.blocks.executor.ExecutorNotification#throwableEncountered(java.lang.Throwable) 483 @Override throwableEncountered(Throwable t)484 public void throwableEncountered(Throwable t) { 485 setException(t); 486 } 487 488 @Override interrupted(Runnable runnable)489 public void interrupted(Runnable runnable) { 490 _unfinishedLock.lock(); 491 try { 492 _unfinishedFutures.remove(this); 493 _unfinishedCondition.signalAll(); 494 } 495 finally { 496 _unfinishedLock.unlock(); 497 } 498 499 // We assign the listener to a local variable so we don't have to 500 // worry about it becoming null inside the if 501 FutureListener<V> listener = _listener; 502 // We don't want this to run on server 503 if (listener != null) { 504 listener.futureDone(this); 505 } 506 } 507 } 508 509 // @see java.util.concurrent.ExecutorService#shutdown() 510 @Override shutdown()511 public void shutdown() { 512 _realShutdown(false); 513 } 514 515 @SuppressWarnings("unchecked") _realShutdown(boolean interrupt)516 private List<Runnable> _realShutdown(boolean interrupt) { 517 _shutdown.set(true); 518 _unfinishedLock.lock(); 519 Set<Future<?>> futures; 520 try { 521 futures = new HashSet<Future<?>>(_unfinishedFutures); 522 } 523 finally { 524 _unfinishedLock.unlock(); 525 } 526 return (List<Runnable>)ch.downcall(new ExecutorEvent( 527 ExecutorEvent.ALL_TASK_CANCEL, new Object[]{futures, interrupt})); 528 } 529 530 // @see java.util.concurrent.ExecutorService#shutdownNow() 531 @Override shutdownNow()532 public List<Runnable> shutdownNow() { 533 return _realShutdown(true); 534 } 535 536 // @see java.util.concurrent.ExecutorService#isShutdown() 537 @Override isShutdown()538 public boolean isShutdown() { 539 return _shutdown.get(); 540 } 541 542 // @see java.util.concurrent.ExecutorService#isTerminated() 543 @Override isTerminated()544 public boolean isTerminated() { 545 if (_shutdown.get()) { 546 _unfinishedLock.lock(); 547 try { 548 return _unfinishedFutures.isEmpty(); 549 } 550 finally { 551 _unfinishedLock.unlock(); 552 } 553 } 554 return false; 555 } 556 557 // @see java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit) 558 @Override awaitTermination(long timeout, TimeUnit unit)559 public boolean awaitTermination(long timeout, TimeUnit unit) 560 throws InterruptedException { 561 long nanoTimeWait = unit.toNanos(timeout); 562 _unfinishedLock.lock(); 563 try { 564 while (!_unfinishedFutures.isEmpty()) { 565 if ((nanoTimeWait = _unfinishedCondition.awaitNanos( 566 nanoTimeWait)) <= 0) { 567 return false; 568 } 569 } 570 } 571 finally { 572 _unfinishedLock.unlock(); 573 } 574 575 return true; 576 } 577 578 // @see java.util.concurrent.Executor#execute(java.lang.Runnable) 579 @Override execute(Runnable command)580 public void execute(Runnable command) { 581 if (!_shutdown.get()) { 582 Object serializeCheck; 583 // If it is wrapped by our future, then we have to make sure to 584 // check the actual callable/runnable given to us for serialization 585 if (command instanceof DistributedFuture) { 586 serializeCheck = ((DistributedFuture<?>)command).getCallable(); 587 if (serializeCheck instanceof RunnableAdapter) { 588 serializeCheck = ((RunnableAdapter<?>)serializeCheck).task; 589 } 590 } 591 else { 592 serializeCheck = command; 593 } 594 595 if (serializeCheck instanceof Serializable || 596 serializeCheck instanceof Streamable) { 597 ch.down(new ExecutorEvent(ExecutorEvent.TASK_SUBMIT, command)); 598 } 599 else { 600 throw new IllegalArgumentException( 601 "Command was not Serializable, Externalizable, or Streamable - " 602 + serializeCheck); 603 } 604 } 605 else { 606 throw new RejectedExecutionException(); 607 } 608 } 609 610 /** 611 * This is copied from {@see java.util.concurrent.Executors} class which 612 * contains RunnableAdapter. However that adapter isn't serializable, and 613 * is final and package level so we can' reference. 614 */ 615 protected static final class RunnableAdapter<T> implements Callable<T>, Streamable { 616 protected Runnable task; 617 protected T result; 618 RunnableAdapter()619 protected RunnableAdapter() { 620 621 } RunnableAdapter(Runnable task, T result)622 protected RunnableAdapter(Runnable task, T result) { 623 this.task = task; 624 this.result = result; 625 } call()626 public T call() { 627 task.run(); 628 return result; 629 } 630 @Override writeTo(DataOutputStream out)631 public void writeTo(DataOutputStream out) throws IOException { 632 try { 633 Util.writeObject(task, out); 634 } 635 catch (IOException e) { 636 throw e; 637 } 638 catch (Exception e) { 639 throw new IOException("Exception encountered while writing execution runnable", e); 640 } 641 642 try { 643 Util.writeObject(result, out); 644 } 645 catch (IOException e) { 646 throw e; 647 } 648 catch (Exception e) { 649 throw new IOException("Exception encountered while writing execution result", e); 650 } 651 } 652 @SuppressWarnings("unchecked") 653 @Override readFrom(DataInputStream in)654 public void readFrom(DataInputStream in) throws IOException, 655 IllegalAccessException, InstantiationException { 656 // We can't use Util.readObject since it's size is limited to 2^15-1 657 // The runner could be larger than that possibly 658 try { 659 task = (Runnable)Util.readObject(in); 660 } 661 catch (IOException e) { 662 throw e; 663 } 664 catch (Exception e) { 665 throw new IOException("Exception encountered while reading execution runnable", e); 666 } 667 668 try { 669 result = (T)Util.readObject(in); 670 } 671 catch (IOException e) { 672 throw e; 673 } 674 catch (Exception e) { 675 throw new IOException("Exception encountered while reading execution result", e); 676 } 677 } 678 } 679 680 // @see java.util.concurrent.AbstractExecutorService#newTaskFor(java.lang.Runnable, java.lang.Object) 681 @Override newTaskFor(Runnable runnable, T value)682 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 683 DistributedFuture<T> future = new DistributedFuture<T>(ch, _unfinishedLock, 684 _unfinishedCondition, _unfinishedFutures, runnable, value); 685 _execProt.addExecutorListener(future, future); 686 _unfinishedLock.lock(); 687 try { 688 _unfinishedFutures.add(future); 689 } 690 finally { 691 _unfinishedLock.unlock(); 692 } 693 return future; 694 } 695 696 // @see java.util.concurrent.AbstractExecutorService#newTaskFor(java.util.concurrent.Callable) 697 @Override newTaskFor(Callable<T> callable)698 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 699 DistributedFuture<T> future = new DistributedFuture<T>(ch, _unfinishedLock, 700 _unfinishedCondition, _unfinishedFutures, callable); 701 _execProt.addExecutorListener(future, future); 702 _unfinishedLock.lock(); 703 try { 704 _unfinishedFutures.add(future); 705 } 706 finally { 707 _unfinishedLock.unlock(); 708 } 709 return future; 710 } 711 } 712