1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.function.BiConsumer; 42 import java.util.function.BiFunction; 43 import java.util.function.Consumer; 44 import java.util.function.Function; 45 import java.util.function.Supplier; 46 47 /** 48 * A {@link Future} that may be explicitly completed (setting its 49 * value and status), and may be used as a {@link CompletionStage}, 50 * supporting dependent functions and actions that trigger upon its 51 * completion. 52 * 53 * <p>When two or more threads attempt to 54 * {@link #complete complete}, 55 * {@link #completeExceptionally completeExceptionally}, or 56 * {@link #cancel cancel} 57 * a CompletableFuture, only one of them succeeds. 58 * 59 * <p>In addition to these and related methods for directly 60 * manipulating status and results, CompletableFuture implements 61 * interface {@link CompletionStage} with the following policies: <ul> 62 * 63 * <li>Actions supplied for dependent completions of 64 * <em>non-async</em> methods may be performed by the thread that 65 * completes the current CompletableFuture, or by any other caller of 66 * a completion method. 67 * 68 * <li>All <em>async</em> methods without an explicit Executor 69 * argument are performed using the {@link ForkJoinPool#commonPool()} 70 * (unless it does not support a parallelism level of at least two, in 71 * which case, a new Thread is created to run each task). This may be 72 * overridden for non-static methods in subclasses by defining method 73 * {@link #defaultExecutor()}. To simplify monitoring, debugging, 74 * and tracking, all generated asynchronous tasks are instances of the 75 * marker interface {@link AsynchronousCompletionTask}. Operations 76 * with time-delays can use adapter methods defined in this class, for 77 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 78 * timeUnit))}. To support methods with delays and timeouts, this 79 * class maintains at most one daemon thread for triggering and 80 * cancelling actions, not for running them. 81 * 82 * <li>All CompletionStage methods are implemented independently of 83 * other public methods, so the behavior of one method is not impacted 84 * by overrides of others in subclasses. 85 * 86 * <li>All CompletionStage methods return CompletableFutures. To 87 * restrict usages to only those methods defined in interface 88 * CompletionStage, use method {@link #minimalCompletionStage}. Or to 89 * ensure only that clients do not themselves modify a future, use 90 * method {@link #copy}. 91 * </ul> 92 * 93 * <p>CompletableFuture also implements {@link Future} with the following 94 * policies: <ul> 95 * 96 * <li>Since (unlike {@link FutureTask}) this class has no direct 97 * control over the computation that causes it to be completed, 98 * cancellation is treated as just another form of exceptional 99 * completion. Method {@link #cancel cancel} has the same effect as 100 * {@code completeExceptionally(new CancellationException())}. Method 101 * {@link #isCompletedExceptionally} can be used to determine if a 102 * CompletableFuture completed in any exceptional fashion. 103 * 104 * <li>In case of exceptional completion with a CompletionException, 105 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 106 * {@link ExecutionException} with the same cause as held in the 107 * corresponding CompletionException. To simplify usage in most 108 * contexts, this class also defines methods {@link #join()} and 109 * {@link #getNow} that instead throw the CompletionException directly 110 * in these cases. 111 * </ul> 112 * 113 * <p>Arguments used to pass a completion result (that is, for 114 * parameters of type {@code T}) for methods accepting them may be 115 * null, but passing a null value for any other parameter will result 116 * in a {@link NullPointerException} being thrown. 117 * 118 * <p>Subclasses of this class should normally override the "virtual 119 * constructor" method {@link #newIncompleteFuture}, which establishes 120 * the concrete type returned by CompletionStage methods. For example, 121 * here is a class that substitutes a different default Executor and 122 * disables the {@code obtrude} methods: 123 * 124 * <pre> {@code 125 * class MyCompletableFuture<T> extends CompletableFuture<T> { 126 * static final Executor myExecutor = ...; 127 * public MyCompletableFuture() { } 128 * public <U> CompletableFuture<U> newIncompleteFuture() { 129 * return new MyCompletableFuture<U>(); } 130 * public Executor defaultExecutor() { 131 * return myExecutor; } 132 * public void obtrudeValue(T value) { 133 * throw new UnsupportedOperationException(); } 134 * public void obtrudeException(Throwable ex) { 135 * throw new UnsupportedOperationException(); } 136 * }}</pre> 137 * 138 * @author Doug Lea 139 * @param <T> The result type returned by this future's {@code join} 140 * and {@code get} methods 141 * @since 1.8 142 */ 143 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 144 145 /* 146 * Overview: 147 * 148 * A CompletableFuture may have dependent completion actions, 149 * collected in a linked stack. It atomically completes by CASing 150 * a result field, and then pops off and runs those actions. This 151 * applies across normal vs exceptional outcomes, sync vs async 152 * actions, binary triggers, and various forms of completions. 153 * 154 * Non-nullness of volatile field "result" indicates done. It may 155 * be set directly if known to be thread-confined, else via CAS. 156 * An AltResult is used to box null as a result, as well as to 157 * hold exceptions. Using a single field makes completion simple 158 * to detect and trigger. Result encoding and decoding is 159 * straightforward but tedious and adds to the sprawl of trapping 160 * and associating exceptions with targets. Minor simplifications 161 * rely on (static) NIL (to box null results) being the only 162 * AltResult with a null exception field, so we don't usually need 163 * explicit comparisons. Even though some of the generics casts 164 * are unchecked (see SuppressWarnings annotations), they are 165 * placed to be appropriate even if checked. 166 * 167 * Dependent actions are represented by Completion objects linked 168 * as Treiber stacks headed by field "stack". There are Completion 169 * classes for each kind of action, grouped into: 170 * - single-input (UniCompletion), 171 * - two-input (BiCompletion), 172 * - projected (BiCompletions using exactly one of two inputs), 173 * - shared (CoCompletion, used by the second of two sources), 174 * - zero-input source actions, 175 * - Signallers that unblock waiters. 176 * Class Completion extends ForkJoinTask to enable async execution 177 * (adding no space overhead because we exploit its "tag" methods 178 * to maintain claims). It is also declared as Runnable to allow 179 * usage with arbitrary executors. 180 * 181 * Support for each kind of CompletionStage relies on a separate 182 * class, along with two CompletableFuture methods: 183 * 184 * * A Completion class with name X corresponding to function, 185 * prefaced with "Uni", "Bi", or "Or". Each class contains 186 * fields for source(s), actions, and dependent. They are 187 * boringly similar, differing from others only with respect to 188 * underlying functional forms. We do this so that users don't 189 * encounter layers of adapters in common usages. 190 * 191 * * Boolean CompletableFuture method x(...) (for example 192 * biApply) takes all of the arguments needed to check that an 193 * action is triggerable, and then either runs the action or 194 * arranges its async execution by executing its Completion 195 * argument, if present. The method returns true if known to be 196 * complete. 197 * 198 * * Completion method tryFire(int mode) invokes the associated x 199 * method with its held arguments, and on success cleans up. 200 * The mode argument allows tryFire to be called twice (SYNC, 201 * then ASYNC); the first to screen and trap exceptions while 202 * arranging to execute, and the second when called from a task. 203 * (A few classes are not used async so take slightly different 204 * forms.) The claim() callback suppresses function invocation 205 * if already claimed by another thread. 206 * 207 * * Some classes (for example UniApply) have separate handling 208 * code for when known to be thread-confined ("now" methods) and 209 * for when shared (in tryFire), for efficiency. 210 * 211 * * CompletableFuture method xStage(...) is called from a public 212 * stage method of CompletableFuture f. It screens user 213 * arguments and invokes and/or creates the stage object. If 214 * not async and already triggerable, the action is run 215 * immediately. Otherwise a Completion c is created, and 216 * submitted to the executor if triggerable, or pushed onto f's 217 * stack if not. Completion actions are started via c.tryFire. 218 * We recheck after pushing to a source future's stack to cover 219 * possible races if the source completes while pushing. 220 * Classes with two inputs (for example BiApply) deal with races 221 * across both while pushing actions. The second completion is 222 * a CoCompletion pointing to the first, shared so that at most 223 * one performs the action. The multiple-arity methods allOf 224 * does this pairwise to form trees of completions. Method 225 * anyOf is handled differently from allOf because completion of 226 * any source should trigger a cleanStack of other sources. 227 * Each AnyOf completion can reach others via a shared array. 228 * 229 * Note that the generic type parameters of methods vary according 230 * to whether "this" is a source, dependent, or completion. 231 * 232 * Method postComplete is called upon completion unless the target 233 * is guaranteed not to be observable (i.e., not yet returned or 234 * linked). Multiple threads can call postComplete, which 235 * atomically pops each dependent action, and tries to trigger it 236 * via method tryFire, in NESTED mode. Triggering can propagate 237 * recursively, so NESTED mode returns its completed dependent (if 238 * one exists) for further processing by its caller (see method 239 * postFire). 240 * 241 * Blocking methods get() and join() rely on Signaller Completions 242 * that wake up waiting threads. The mechanics are similar to 243 * Treiber stack wait-nodes used in FutureTask, Phaser, and 244 * SynchronousQueue. See their internal documentation for 245 * algorithmic details. 246 * 247 * Without precautions, CompletableFutures would be prone to 248 * garbage accumulation as chains of Completions build up, each 249 * pointing back to its sources. So we null out fields as soon as 250 * possible. The screening checks needed anyway harmlessly ignore 251 * null arguments that may have been obtained during races with 252 * threads nulling out fields. We also try to unlink non-isLive 253 * (fired or cancelled) Completions from stacks that might 254 * otherwise never be popped: Method cleanStack always unlinks non 255 * isLive completions from the head of stack; others may 256 * occasionally remain if racing with other cancellations or 257 * removals. 258 * 259 * Completion fields need not be declared as final or volatile 260 * because they are only visible to other threads upon safe 261 * publication. 262 */ 263 264 volatile Object result; // Either the result or boxed AltResult 265 volatile Completion stack; // Top of Treiber stack of dependent actions 266 internalComplete(Object r)267 final boolean internalComplete(Object r) { // CAS from null to r 268 return RESULT.compareAndSet(this, null, r); 269 } 270 271 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)272 final boolean tryPushStack(Completion c) { 273 Completion h = stack; 274 NEXT.set(c, h); // CAS piggyback 275 return STACK.compareAndSet(this, h, c); 276 } 277 278 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)279 final void pushStack(Completion c) { 280 do {} while (!tryPushStack(c)); 281 } 282 283 /* ------------- Encoding and decoding outcomes -------------- */ 284 285 static final class AltResult { // See above 286 final Throwable ex; // null only for NIL AltResult(Throwable x)287 AltResult(Throwable x) { this.ex = x; } 288 } 289 290 /** The encoding of the null value. */ 291 static final AltResult NIL = new AltResult(null); 292 293 /** Completes with the null value, unless already completed. */ completeNull()294 final boolean completeNull() { 295 return RESULT.compareAndSet(this, null, NIL); 296 } 297 298 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)299 final Object encodeValue(T t) { 300 return (t == null) ? NIL : t; 301 } 302 303 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)304 final boolean completeValue(T t) { 305 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); 306 } 307 308 /** 309 * Returns the encoding of the given (non-null) exception as a 310 * wrapped CompletionException unless it is one already. 311 */ encodeThrowable(Throwable x)312 static AltResult encodeThrowable(Throwable x) { 313 return new AltResult((x instanceof CompletionException) ? x : 314 new CompletionException(x)); 315 } 316 317 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)318 final boolean completeThrowable(Throwable x) { 319 return RESULT.compareAndSet(this, null, encodeThrowable(x)); 320 } 321 322 /** 323 * Returns the encoding of the given (non-null) exception as a 324 * wrapped CompletionException unless it is one already. May 325 * return the given Object r (which must have been the result of a 326 * source future) if it is equivalent, i.e. if this is a simple 327 * relay of an existing CompletionException. 328 */ encodeThrowable(Throwable x, Object r)329 static Object encodeThrowable(Throwable x, Object r) { 330 if (!(x instanceof CompletionException)) 331 x = new CompletionException(x); 332 else if (r instanceof AltResult && x == ((AltResult)r).ex) 333 return r; 334 return new AltResult(x); 335 } 336 337 /** 338 * Completes with the given (non-null) exceptional result as a 339 * wrapped CompletionException unless it is one already, unless 340 * already completed. May complete with the given Object r 341 * (which must have been the result of a source future) if it is 342 * equivalent, i.e. if this is a simple propagation of an 343 * existing CompletionException. 344 */ completeThrowable(Throwable x, Object r)345 final boolean completeThrowable(Throwable x, Object r) { 346 return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); 347 } 348 349 /** 350 * Returns the encoding of the given arguments: if the exception 351 * is non-null, encodes as AltResult. Otherwise uses the given 352 * value, boxed as NIL if null. 353 */ encodeOutcome(T t, Throwable x)354 Object encodeOutcome(T t, Throwable x) { 355 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 356 } 357 358 /** 359 * Returns the encoding of a copied outcome; if exceptional, 360 * rewraps as a CompletionException, else returns argument. 361 */ encodeRelay(Object r)362 static Object encodeRelay(Object r) { 363 Throwable x; 364 if (r instanceof AltResult 365 && (x = ((AltResult)r).ex) != null 366 && !(x instanceof CompletionException)) 367 r = new AltResult(new CompletionException(x)); 368 return r; 369 } 370 371 /** 372 * Completes with r or a copy of r, unless already completed. 373 * If exceptional, r is first coerced to a CompletionException. 374 */ completeRelay(Object r)375 final boolean completeRelay(Object r) { 376 return RESULT.compareAndSet(this, null, encodeRelay(r)); 377 } 378 379 /** 380 * Reports result using Future.get conventions. 381 */ reportGet(Object r)382 private static Object reportGet(Object r) 383 throws InterruptedException, ExecutionException { 384 if (r == null) // by convention below, null means interrupted 385 throw new InterruptedException(); 386 if (r instanceof AltResult) { 387 Throwable x, cause; 388 if ((x = ((AltResult)r).ex) == null) 389 return null; 390 if (x instanceof CancellationException) 391 throw (CancellationException)x; 392 if ((x instanceof CompletionException) && 393 (cause = x.getCause()) != null) 394 x = cause; 395 throw new ExecutionException(x); 396 } 397 return r; 398 } 399 400 /** 401 * Decodes outcome to return result or throw unchecked exception. 402 */ reportJoin(Object r)403 private static Object reportJoin(Object r) { 404 if (r instanceof AltResult) { 405 Throwable x; 406 if ((x = ((AltResult)r).ex) == null) 407 return null; 408 if (x instanceof CancellationException) 409 throw (CancellationException)x; 410 if (x instanceof CompletionException) 411 throw (CompletionException)x; 412 throw new CompletionException(x); 413 } 414 return r; 415 } 416 417 /* ------------- Async task preliminaries -------------- */ 418 419 /** 420 * A marker interface identifying asynchronous tasks produced by 421 * {@code async} methods. This may be useful for monitoring, 422 * debugging, and tracking asynchronous activities. 423 * 424 * @since 1.8 425 */ 426 public static interface AsynchronousCompletionTask { 427 } 428 429 private static final boolean USE_COMMON_POOL = 430 (ForkJoinPool.getCommonPoolParallelism() > 1); 431 432 /** 433 * Default executor -- ForkJoinPool.commonPool() unless it cannot 434 * support parallelism. 435 */ 436 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 437 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 438 439 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 440 static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)441 public void execute(Runnable r) { new Thread(r).start(); } 442 } 443 444 /** 445 * Null-checks user executor argument, and translates uses of 446 * commonPool to ASYNC_POOL in case parallelism disabled. 447 */ screenExecutor(Executor e)448 static Executor screenExecutor(Executor e) { 449 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 450 return ASYNC_POOL; 451 if (e == null) throw new NullPointerException(); 452 return e; 453 } 454 455 // Modes for Completion.tryFire. Signedness matters. 456 static final int SYNC = 0; 457 static final int ASYNC = 1; 458 static final int NESTED = -1; 459 460 /* ------------- Base Completion classes and operations -------------- */ 461 462 @SuppressWarnings("serial") 463 abstract static class Completion extends ForkJoinTask<Void> 464 implements Runnable, AsynchronousCompletionTask { 465 volatile Completion next; // Treiber stack link 466 467 /** 468 * Performs completion action if triggered, returning a 469 * dependent that may need propagation, if one exists. 470 * 471 * @param mode SYNC, ASYNC, or NESTED 472 */ tryFire(int mode)473 abstract CompletableFuture<?> tryFire(int mode); 474 475 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()476 abstract boolean isLive(); 477 run()478 public final void run() { tryFire(ASYNC); } exec()479 public final boolean exec() { tryFire(ASYNC); return false; } getRawResult()480 public final Void getRawResult() { return null; } setRawResult(Void v)481 public final void setRawResult(Void v) {} 482 } 483 484 /** 485 * Pops and tries to trigger all reachable dependents. Call only 486 * when known to be done. 487 */ postComplete()488 final void postComplete() { 489 /* 490 * On each step, variable f holds current dependents to pop 491 * and run. It is extended along only one path at a time, 492 * pushing others to avoid unbounded recursion. 493 */ 494 CompletableFuture<?> f = this; Completion h; 495 while ((h = f.stack) != null || 496 (f != this && (h = (f = this).stack) != null)) { 497 CompletableFuture<?> d; Completion t; 498 if (STACK.compareAndSet(f, h, t = h.next)) { 499 if (t != null) { 500 if (f != this) { 501 pushStack(h); 502 continue; 503 } 504 NEXT.compareAndSet(h, t, null); // try to detach 505 } 506 f = (d = h.tryFire(NESTED)) == null ? this : d; 507 } 508 } 509 } 510 511 /** Traverses stack and unlinks one or more dead Completions, if found. */ cleanStack()512 final void cleanStack() { 513 Completion p = stack; 514 // ensure head of stack live 515 for (boolean unlinked = false;;) { 516 if (p == null) 517 return; 518 else if (p.isLive()) { 519 if (unlinked) 520 return; 521 else 522 break; 523 } 524 else if (STACK.weakCompareAndSet(this, p, (p = p.next))) 525 unlinked = true; 526 else 527 p = stack; 528 } 529 // try to unlink first non-live 530 for (Completion q = p.next; q != null;) { 531 Completion s = q.next; 532 if (q.isLive()) { 533 p = q; 534 q = s; 535 } else if (NEXT.weakCompareAndSet(p, q, s)) 536 break; 537 else 538 q = p.next; 539 } 540 } 541 542 /* ------------- One-input Completions -------------- */ 543 544 /** A Completion with a source, dependent, and executor. */ 545 @SuppressWarnings("serial") 546 abstract static class UniCompletion<T,V> extends Completion { 547 Executor executor; // executor to use (null if none) 548 CompletableFuture<V> dep; // the dependent to complete 549 CompletableFuture<T> src; // source for action 550 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)551 UniCompletion(Executor executor, CompletableFuture<V> dep, 552 CompletableFuture<T> src) { 553 this.executor = executor; this.dep = dep; this.src = src; 554 } 555 556 /** 557 * Returns true if action can be run. Call only when known to 558 * be triggerable. Uses FJ tag bit to ensure that only one 559 * thread claims ownership. If async, starts as task -- a 560 * later call to tryFire will run action. 561 */ claim()562 final boolean claim() { 563 Executor e = executor; 564 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 565 if (e == null) 566 return true; 567 executor = null; // disable 568 e.execute(this); 569 } 570 return false; 571 } 572 isLive()573 final boolean isLive() { return dep != null; } 574 } 575 576 /** 577 * Pushes the given completion unless it completes while trying. 578 * Caller should first check that result is null. 579 */ unipush(Completion c)580 final void unipush(Completion c) { 581 if (c != null) { 582 while (!tryPushStack(c)) { 583 if (result != null) { 584 NEXT.set(c, null); 585 break; 586 } 587 } 588 if (result != null) 589 c.tryFire(SYNC); 590 } 591 } 592 593 /** 594 * Post-processing by dependent after successful UniCompletion tryFire. 595 * Tries to clean stack of source a, and then either runs postComplete 596 * or returns this to caller, depending on mode. 597 */ postFire(CompletableFuture<?> a, int mode)598 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 599 if (a != null && a.stack != null) { 600 Object r; 601 if ((r = a.result) == null) 602 a.cleanStack(); 603 if (mode >= 0 && (r != null || a.result != null)) 604 a.postComplete(); 605 } 606 if (result != null && stack != null) { 607 if (mode < 0) 608 return this; 609 else 610 postComplete(); 611 } 612 return null; 613 } 614 615 @SuppressWarnings("serial") 616 static final class UniApply<T,V> extends UniCompletion<T,V> { 617 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)618 UniApply(Executor executor, CompletableFuture<V> dep, 619 CompletableFuture<T> src, 620 Function<? super T,? extends V> fn) { 621 super(executor, dep, src); this.fn = fn; 622 } tryFire(int mode)623 final CompletableFuture<V> tryFire(int mode) { 624 CompletableFuture<V> d; CompletableFuture<T> a; 625 Object r; Throwable x; Function<? super T,? extends V> f; 626 if ((d = dep) == null || (f = fn) == null 627 || (a = src) == null || (r = a.result) == null) 628 return null; 629 tryComplete: if (d.result == null) { 630 if (r instanceof AltResult) { 631 if ((x = ((AltResult)r).ex) != null) { 632 d.completeThrowable(x, r); 633 break tryComplete; 634 } 635 r = null; 636 } 637 try { 638 if (mode <= 0 && !claim()) 639 return null; 640 else { 641 @SuppressWarnings("unchecked") T t = (T) r; 642 d.completeValue(f.apply(t)); 643 } 644 } catch (Throwable ex) { 645 d.completeThrowable(ex); 646 } 647 } 648 dep = null; src = null; fn = null; 649 return d.postFire(a, mode); 650 } 651 } 652 uniApplyStage( Executor e, Function<? super T,? extends V> f)653 private <V> CompletableFuture<V> uniApplyStage( 654 Executor e, Function<? super T,? extends V> f) { 655 if (f == null) throw new NullPointerException(); 656 Object r; 657 if ((r = result) != null) 658 return uniApplyNow(r, e, f); 659 CompletableFuture<V> d = newIncompleteFuture(); 660 unipush(new UniApply<T,V>(e, d, this, f)); 661 return d; 662 } 663 uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)664 private <V> CompletableFuture<V> uniApplyNow( 665 Object r, Executor e, Function<? super T,? extends V> f) { 666 Throwable x; 667 CompletableFuture<V> d = newIncompleteFuture(); 668 if (r instanceof AltResult) { 669 if ((x = ((AltResult)r).ex) != null) { 670 d.result = encodeThrowable(x, r); 671 return d; 672 } 673 r = null; 674 } 675 try { 676 if (e != null) { 677 e.execute(new UniApply<T,V>(null, d, this, f)); 678 } else { 679 @SuppressWarnings("unchecked") T t = (T) r; 680 d.result = d.encodeValue(f.apply(t)); 681 } 682 } catch (Throwable ex) { 683 d.result = encodeThrowable(ex); 684 } 685 return d; 686 } 687 688 @SuppressWarnings("serial") 689 static final class UniAccept<T> extends UniCompletion<T,Void> { 690 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)691 UniAccept(Executor executor, CompletableFuture<Void> dep, 692 CompletableFuture<T> src, Consumer<? super T> fn) { 693 super(executor, dep, src); this.fn = fn; 694 } tryFire(int mode)695 final CompletableFuture<Void> tryFire(int mode) { 696 CompletableFuture<Void> d; CompletableFuture<T> a; 697 Object r; Throwable x; Consumer<? super T> f; 698 if ((d = dep) == null || (f = fn) == null 699 || (a = src) == null || (r = a.result) == null) 700 return null; 701 tryComplete: if (d.result == null) { 702 if (r instanceof AltResult) { 703 if ((x = ((AltResult)r).ex) != null) { 704 d.completeThrowable(x, r); 705 break tryComplete; 706 } 707 r = null; 708 } 709 try { 710 if (mode <= 0 && !claim()) 711 return null; 712 else { 713 @SuppressWarnings("unchecked") T t = (T) r; 714 f.accept(t); 715 d.completeNull(); 716 } 717 } catch (Throwable ex) { 718 d.completeThrowable(ex); 719 } 720 } 721 dep = null; src = null; fn = null; 722 return d.postFire(a, mode); 723 } 724 } 725 uniAcceptStage(Executor e, Consumer<? super T> f)726 private CompletableFuture<Void> uniAcceptStage(Executor e, 727 Consumer<? super T> f) { 728 if (f == null) throw new NullPointerException(); 729 Object r; 730 if ((r = result) != null) 731 return uniAcceptNow(r, e, f); 732 CompletableFuture<Void> d = newIncompleteFuture(); 733 unipush(new UniAccept<T>(e, d, this, f)); 734 return d; 735 } 736 uniAcceptNow( Object r, Executor e, Consumer<? super T> f)737 private CompletableFuture<Void> uniAcceptNow( 738 Object r, Executor e, Consumer<? super T> f) { 739 Throwable x; 740 CompletableFuture<Void> d = newIncompleteFuture(); 741 if (r instanceof AltResult) { 742 if ((x = ((AltResult)r).ex) != null) { 743 d.result = encodeThrowable(x, r); 744 return d; 745 } 746 r = null; 747 } 748 try { 749 if (e != null) { 750 e.execute(new UniAccept<T>(null, d, this, f)); 751 } else { 752 @SuppressWarnings("unchecked") T t = (T) r; 753 f.accept(t); 754 d.result = NIL; 755 } 756 } catch (Throwable ex) { 757 d.result = encodeThrowable(ex); 758 } 759 return d; 760 } 761 762 @SuppressWarnings("serial") 763 static final class UniRun<T> extends UniCompletion<T,Void> { 764 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)765 UniRun(Executor executor, CompletableFuture<Void> dep, 766 CompletableFuture<T> src, Runnable fn) { 767 super(executor, dep, src); this.fn = fn; 768 } tryFire(int mode)769 final CompletableFuture<Void> tryFire(int mode) { 770 CompletableFuture<Void> d; CompletableFuture<T> a; 771 Object r; Throwable x; Runnable f; 772 if ((d = dep) == null || (f = fn) == null 773 || (a = src) == null || (r = a.result) == null) 774 return null; 775 if (d.result == null) { 776 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 777 d.completeThrowable(x, r); 778 else 779 try { 780 if (mode <= 0 && !claim()) 781 return null; 782 else { 783 f.run(); 784 d.completeNull(); 785 } 786 } catch (Throwable ex) { 787 d.completeThrowable(ex); 788 } 789 } 790 dep = null; src = null; fn = null; 791 return d.postFire(a, mode); 792 } 793 } 794 uniRunStage(Executor e, Runnable f)795 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 796 if (f == null) throw new NullPointerException(); 797 Object r; 798 if ((r = result) != null) 799 return uniRunNow(r, e, f); 800 CompletableFuture<Void> d = newIncompleteFuture(); 801 unipush(new UniRun<T>(e, d, this, f)); 802 return d; 803 } 804 uniRunNow(Object r, Executor e, Runnable f)805 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { 806 Throwable x; 807 CompletableFuture<Void> d = newIncompleteFuture(); 808 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 809 d.result = encodeThrowable(x, r); 810 else 811 try { 812 if (e != null) { 813 e.execute(new UniRun<T>(null, d, this, f)); 814 } else { 815 f.run(); 816 d.result = NIL; 817 } 818 } catch (Throwable ex) { 819 d.result = encodeThrowable(ex); 820 } 821 return d; 822 } 823 824 @SuppressWarnings("serial") 825 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 826 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)827 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 828 CompletableFuture<T> src, 829 BiConsumer<? super T, ? super Throwable> fn) { 830 super(executor, dep, src); this.fn = fn; 831 } tryFire(int mode)832 final CompletableFuture<T> tryFire(int mode) { 833 CompletableFuture<T> d; CompletableFuture<T> a; 834 Object r; BiConsumer<? super T, ? super Throwable> f; 835 if ((d = dep) == null || (f = fn) == null 836 || (a = src) == null || (r = a.result) == null 837 || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) 838 return null; 839 dep = null; src = null; fn = null; 840 return d.postFire(a, mode); 841 } 842 } 843 uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)844 final boolean uniWhenComplete(Object r, 845 BiConsumer<? super T,? super Throwable> f, 846 UniWhenComplete<T> c) { 847 T t; Throwable x = null; 848 if (result == null) { 849 try { 850 if (c != null && !c.claim()) 851 return false; 852 if (r instanceof AltResult) { 853 x = ((AltResult)r).ex; 854 t = null; 855 } else { 856 @SuppressWarnings("unchecked") T tr = (T) r; 857 t = tr; 858 } 859 f.accept(t, x); 860 if (x == null) { 861 internalComplete(r); 862 return true; 863 } 864 } catch (Throwable ex) { 865 if (x == null) 866 x = ex; 867 else if (x != ex) 868 x.addSuppressed(ex); 869 } 870 completeThrowable(x, r); 871 } 872 return true; 873 } 874 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)875 private CompletableFuture<T> uniWhenCompleteStage( 876 Executor e, BiConsumer<? super T, ? super Throwable> f) { 877 if (f == null) throw new NullPointerException(); 878 CompletableFuture<T> d = newIncompleteFuture(); 879 Object r; 880 if ((r = result) == null) 881 unipush(new UniWhenComplete<T>(e, d, this, f)); 882 else if (e == null) 883 d.uniWhenComplete(r, f, null); 884 else { 885 try { 886 e.execute(new UniWhenComplete<T>(null, d, this, f)); 887 } catch (Throwable ex) { 888 d.result = encodeThrowable(ex); 889 } 890 } 891 return d; 892 } 893 894 @SuppressWarnings("serial") 895 static final class UniHandle<T,V> extends UniCompletion<T,V> { 896 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)897 UniHandle(Executor executor, CompletableFuture<V> dep, 898 CompletableFuture<T> src, 899 BiFunction<? super T, Throwable, ? extends V> fn) { 900 super(executor, dep, src); this.fn = fn; 901 } tryFire(int mode)902 final CompletableFuture<V> tryFire(int mode) { 903 CompletableFuture<V> d; CompletableFuture<T> a; 904 Object r; BiFunction<? super T, Throwable, ? extends V> f; 905 if ((d = dep) == null || (f = fn) == null 906 || (a = src) == null || (r = a.result) == null 907 || !d.uniHandle(r, f, mode > 0 ? null : this)) 908 return null; 909 dep = null; src = null; fn = null; 910 return d.postFire(a, mode); 911 } 912 } 913 uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)914 final <S> boolean uniHandle(Object r, 915 BiFunction<? super S, Throwable, ? extends T> f, 916 UniHandle<S,T> c) { 917 S s; Throwable x; 918 if (result == null) { 919 try { 920 if (c != null && !c.claim()) 921 return false; 922 if (r instanceof AltResult) { 923 x = ((AltResult)r).ex; 924 s = null; 925 } else { 926 x = null; 927 @SuppressWarnings("unchecked") S ss = (S) r; 928 s = ss; 929 } 930 completeValue(f.apply(s, x)); 931 } catch (Throwable ex) { 932 completeThrowable(ex); 933 } 934 } 935 return true; 936 } 937 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)938 private <V> CompletableFuture<V> uniHandleStage( 939 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 940 if (f == null) throw new NullPointerException(); 941 CompletableFuture<V> d = newIncompleteFuture(); 942 Object r; 943 if ((r = result) == null) 944 unipush(new UniHandle<T,V>(e, d, this, f)); 945 else if (e == null) 946 d.uniHandle(r, f, null); 947 else { 948 try { 949 e.execute(new UniHandle<T,V>(null, d, this, f)); 950 } catch (Throwable ex) { 951 d.result = encodeThrowable(ex); 952 } 953 } 954 return d; 955 } 956 957 @SuppressWarnings("serial") 958 static final class UniExceptionally<T> extends UniCompletion<T,T> { 959 Function<? super Throwable, ? extends T> fn; UniExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)960 UniExceptionally(Executor executor, 961 CompletableFuture<T> dep, CompletableFuture<T> src, 962 Function<? super Throwable, ? extends T> fn) { 963 super(executor, dep, src); this.fn = fn; 964 } tryFire(int mode)965 final CompletableFuture<T> tryFire(int mode) { 966 CompletableFuture<T> d; CompletableFuture<T> a; 967 Object r; Function<? super Throwable, ? extends T> f; 968 if ((d = dep) == null || (f = fn) == null 969 || (a = src) == null || (r = a.result) == null 970 || !d.uniExceptionally(r, f, mode > 0 ? null : this)) 971 return null; 972 dep = null; src = null; fn = null; 973 return d.postFire(a, mode); 974 } 975 } 976 uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)977 final boolean uniExceptionally(Object r, 978 Function<? super Throwable, ? extends T> f, 979 UniExceptionally<T> c) { 980 Throwable x; 981 if (result == null) { 982 try { 983 if (c != null && !c.claim()) 984 return false; 985 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 986 completeValue(f.apply(x)); 987 else 988 internalComplete(r); 989 } catch (Throwable ex) { 990 completeThrowable(ex); 991 } 992 } 993 return true; 994 } 995 uniExceptionallyStage( Executor e, Function<Throwable, ? extends T> f)996 private CompletableFuture<T> uniExceptionallyStage( 997 Executor e, Function<Throwable, ? extends T> f) { 998 if (f == null) throw new NullPointerException(); 999 CompletableFuture<T> d = newIncompleteFuture(); 1000 Object r; 1001 if ((r = result) == null) 1002 unipush(new UniExceptionally<T>(e, d, this, f)); 1003 else if (e == null) 1004 d.uniExceptionally(r, f, null); 1005 else { 1006 try { 1007 e.execute(new UniExceptionally<T>(null, d, this, f)); 1008 } catch (Throwable ex) { 1009 d.result = encodeThrowable(ex); 1010 } 1011 } 1012 return d; 1013 } 1014 1015 @SuppressWarnings("serial") 1016 static final class UniComposeExceptionally<T> extends UniCompletion<T,T> { 1017 Function<Throwable, ? extends CompletionStage<T>> fn; UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<Throwable, ? extends CompletionStage<T>> fn)1018 UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, 1019 CompletableFuture<T> src, 1020 Function<Throwable, ? extends CompletionStage<T>> fn) { 1021 super(executor, dep, src); this.fn = fn; 1022 } tryFire(int mode)1023 final CompletableFuture<T> tryFire(int mode) { 1024 CompletableFuture<T> d; CompletableFuture<T> a; 1025 Function<Throwable, ? extends CompletionStage<T>> f; 1026 Object r; Throwable x; 1027 if ((d = dep) == null || (f = fn) == null 1028 || (a = src) == null || (r = a.result) == null) 1029 return null; 1030 if (d.result == null) { 1031 if ((r instanceof AltResult) && 1032 (x = ((AltResult)r).ex) != null) { 1033 try { 1034 if (mode <= 0 && !claim()) 1035 return null; 1036 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1037 if ((r = g.result) != null) 1038 d.completeRelay(r); 1039 else { 1040 g.unipush(new UniRelay<T,T>(d, g)); 1041 if (d.result == null) 1042 return null; 1043 } 1044 } catch (Throwable ex) { 1045 d.completeThrowable(ex); 1046 } 1047 } 1048 else 1049 d.internalComplete(r); 1050 } 1051 dep = null; src = null; fn = null; 1052 return d.postFire(a, mode); 1053 } 1054 } 1055 uniComposeExceptionallyStage( Executor e, Function<Throwable, ? extends CompletionStage<T>> f)1056 private CompletableFuture<T> uniComposeExceptionallyStage( 1057 Executor e, Function<Throwable, ? extends CompletionStage<T>> f) { 1058 if (f == null) throw new NullPointerException(); 1059 CompletableFuture<T> d = newIncompleteFuture(); 1060 Object r, s; Throwable x; 1061 if ((r = result) == null) 1062 unipush(new UniComposeExceptionally<T>(e, d, this, f)); 1063 else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null) 1064 d.internalComplete(r); 1065 else 1066 try { 1067 if (e != null) 1068 e.execute(new UniComposeExceptionally<T>(null, d, this, f)); 1069 else { 1070 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1071 if ((s = g.result) != null) 1072 d.result = encodeRelay(s); 1073 else 1074 g.unipush(new UniRelay<T,T>(d, g)); 1075 } 1076 } catch (Throwable ex) { 1077 d.result = encodeThrowable(ex); 1078 } 1079 return d; 1080 } 1081 1082 @SuppressWarnings("serial") 1083 static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)1084 UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) { 1085 super(null, dep, src); 1086 } tryFire(int mode)1087 final CompletableFuture<U> tryFire(int mode) { 1088 CompletableFuture<U> d; CompletableFuture<T> a; Object r; 1089 if ((d = dep) == null 1090 || (a = src) == null || (r = a.result) == null) 1091 return null; 1092 if (d.result == null) 1093 d.completeRelay(r); 1094 src = null; dep = null; 1095 return d.postFire(a, mode); 1096 } 1097 } 1098 uniCopyStage( CompletableFuture<T> src)1099 private static <U, T extends U> CompletableFuture<U> uniCopyStage( 1100 CompletableFuture<T> src) { 1101 Object r; 1102 CompletableFuture<U> d = src.newIncompleteFuture(); 1103 if ((r = src.result) != null) 1104 d.result = encodeRelay(r); 1105 else 1106 src.unipush(new UniRelay<U,T>(d, src)); 1107 return d; 1108 } 1109 uniAsMinimalStage()1110 private MinimalStage<T> uniAsMinimalStage() { 1111 Object r; 1112 if ((r = result) != null) 1113 return new MinimalStage<T>(encodeRelay(r)); 1114 MinimalStage<T> d = new MinimalStage<T>(); 1115 unipush(new UniRelay<T,T>(d, this)); 1116 return d; 1117 } 1118 1119 @SuppressWarnings("serial") 1120 static final class UniCompose<T,V> extends UniCompletion<T,V> { 1121 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1122 UniCompose(Executor executor, CompletableFuture<V> dep, 1123 CompletableFuture<T> src, 1124 Function<? super T, ? extends CompletionStage<V>> fn) { 1125 super(executor, dep, src); this.fn = fn; 1126 } tryFire(int mode)1127 final CompletableFuture<V> tryFire(int mode) { 1128 CompletableFuture<V> d; CompletableFuture<T> a; 1129 Function<? super T, ? extends CompletionStage<V>> f; 1130 Object r; Throwable x; 1131 if ((d = dep) == null || (f = fn) == null 1132 || (a = src) == null || (r = a.result) == null) 1133 return null; 1134 tryComplete: if (d.result == null) { 1135 if (r instanceof AltResult) { 1136 if ((x = ((AltResult)r).ex) != null) { 1137 d.completeThrowable(x, r); 1138 break tryComplete; 1139 } 1140 r = null; 1141 } 1142 try { 1143 if (mode <= 0 && !claim()) 1144 return null; 1145 @SuppressWarnings("unchecked") T t = (T) r; 1146 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1147 if ((r = g.result) != null) 1148 d.completeRelay(r); 1149 else { 1150 g.unipush(new UniRelay<V,V>(d, g)); 1151 if (d.result == null) 1152 return null; 1153 } 1154 } catch (Throwable ex) { 1155 d.completeThrowable(ex); 1156 } 1157 } 1158 dep = null; src = null; fn = null; 1159 return d.postFire(a, mode); 1160 } 1161 } 1162 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1163 private <V> CompletableFuture<V> uniComposeStage( 1164 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 1165 if (f == null) throw new NullPointerException(); 1166 CompletableFuture<V> d = newIncompleteFuture(); 1167 Object r, s; Throwable x; 1168 if ((r = result) == null) 1169 unipush(new UniCompose<T,V>(e, d, this, f)); 1170 else { 1171 if (r instanceof AltResult) { 1172 if ((x = ((AltResult)r).ex) != null) { 1173 d.result = encodeThrowable(x, r); 1174 return d; 1175 } 1176 r = null; 1177 } 1178 try { 1179 if (e != null) 1180 e.execute(new UniCompose<T,V>(null, d, this, f)); 1181 else { 1182 @SuppressWarnings("unchecked") T t = (T) r; 1183 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1184 if ((s = g.result) != null) 1185 d.result = encodeRelay(s); 1186 else 1187 g.unipush(new UniRelay<V,V>(d, g)); 1188 } 1189 } catch (Throwable ex) { 1190 d.result = encodeThrowable(ex); 1191 } 1192 } 1193 return d; 1194 } 1195 1196 /* ------------- Two-input Completions -------------- */ 1197 1198 /** A Completion for an action with two sources */ 1199 @SuppressWarnings("serial") 1200 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1201 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1202 BiCompletion(Executor executor, CompletableFuture<V> dep, 1203 CompletableFuture<T> src, CompletableFuture<U> snd) { 1204 super(executor, dep, src); this.snd = snd; 1205 } 1206 } 1207 1208 /** A Completion delegating to a BiCompletion */ 1209 @SuppressWarnings("serial") 1210 static final class CoCompletion extends Completion { 1211 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1212 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1213 final CompletableFuture<?> tryFire(int mode) { 1214 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1215 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1216 return null; 1217 base = null; // detach 1218 return d; 1219 } isLive()1220 final boolean isLive() { 1221 BiCompletion<?,?,?> c; 1222 return (c = base) != null 1223 // && c.isLive() 1224 && c.dep != null; 1225 } 1226 } 1227 1228 /** 1229 * Pushes completion to this and b unless both done. 1230 * Caller should first check that either result or b.result is null. 1231 */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1232 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1233 if (c != null) { 1234 while (result == null) { 1235 if (tryPushStack(c)) { 1236 if (b.result == null) 1237 b.unipush(new CoCompletion(c)); 1238 else if (result != null) 1239 c.tryFire(SYNC); 1240 return; 1241 } 1242 } 1243 b.unipush(c); 1244 } 1245 } 1246 1247 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1248 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1249 CompletableFuture<?> b, int mode) { 1250 if (b != null && b.stack != null) { // clean second source 1251 Object r; 1252 if ((r = b.result) == null) 1253 b.cleanStack(); 1254 if (mode >= 0 && (r != null || b.result != null)) 1255 b.postComplete(); 1256 } 1257 return postFire(a, mode); 1258 } 1259 1260 @SuppressWarnings("serial") 1261 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1262 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1263 BiApply(Executor executor, CompletableFuture<V> dep, 1264 CompletableFuture<T> src, CompletableFuture<U> snd, 1265 BiFunction<? super T,? super U,? extends V> fn) { 1266 super(executor, dep, src, snd); this.fn = fn; 1267 } tryFire(int mode)1268 final CompletableFuture<V> tryFire(int mode) { 1269 CompletableFuture<V> d; 1270 CompletableFuture<T> a; 1271 CompletableFuture<U> b; 1272 Object r, s; BiFunction<? super T,? super U,? extends V> f; 1273 if ((d = dep) == null || (f = fn) == null 1274 || (a = src) == null || (r = a.result) == null 1275 || (b = snd) == null || (s = b.result) == null 1276 || !d.biApply(r, s, f, mode > 0 ? null : this)) 1277 return null; 1278 dep = null; src = null; snd = null; fn = null; 1279 return d.postFire(a, b, mode); 1280 } 1281 } 1282 biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1283 final <R,S> boolean biApply(Object r, Object s, 1284 BiFunction<? super R,? super S,? extends T> f, 1285 BiApply<R,S,T> c) { 1286 Throwable x; 1287 tryComplete: if (result == null) { 1288 if (r instanceof AltResult) { 1289 if ((x = ((AltResult)r).ex) != null) { 1290 completeThrowable(x, r); 1291 break tryComplete; 1292 } 1293 r = null; 1294 } 1295 if (s instanceof AltResult) { 1296 if ((x = ((AltResult)s).ex) != null) { 1297 completeThrowable(x, s); 1298 break tryComplete; 1299 } 1300 s = null; 1301 } 1302 try { 1303 if (c != null && !c.claim()) 1304 return false; 1305 @SuppressWarnings("unchecked") R rr = (R) r; 1306 @SuppressWarnings("unchecked") S ss = (S) s; 1307 completeValue(f.apply(rr, ss)); 1308 } catch (Throwable ex) { 1309 completeThrowable(ex); 1310 } 1311 } 1312 return true; 1313 } 1314 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1315 private <U,V> CompletableFuture<V> biApplyStage( 1316 Executor e, CompletionStage<U> o, 1317 BiFunction<? super T,? super U,? extends V> f) { 1318 CompletableFuture<U> b; Object r, s; 1319 if (f == null || (b = o.toCompletableFuture()) == null) 1320 throw new NullPointerException(); 1321 CompletableFuture<V> d = newIncompleteFuture(); 1322 if ((r = result) == null || (s = b.result) == null) 1323 bipush(b, new BiApply<T,U,V>(e, d, this, b, f)); 1324 else if (e == null) 1325 d.biApply(r, s, f, null); 1326 else 1327 try { 1328 e.execute(new BiApply<T,U,V>(null, d, this, b, f)); 1329 } catch (Throwable ex) { 1330 d.result = encodeThrowable(ex); 1331 } 1332 return d; 1333 } 1334 1335 @SuppressWarnings("serial") 1336 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1337 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1338 BiAccept(Executor executor, CompletableFuture<Void> dep, 1339 CompletableFuture<T> src, CompletableFuture<U> snd, 1340 BiConsumer<? super T,? super U> fn) { 1341 super(executor, dep, src, snd); this.fn = fn; 1342 } tryFire(int mode)1343 final CompletableFuture<Void> tryFire(int mode) { 1344 CompletableFuture<Void> d; 1345 CompletableFuture<T> a; 1346 CompletableFuture<U> b; 1347 Object r, s; BiConsumer<? super T,? super U> f; 1348 if ((d = dep) == null || (f = fn) == null 1349 || (a = src) == null || (r = a.result) == null 1350 || (b = snd) == null || (s = b.result) == null 1351 || !d.biAccept(r, s, f, mode > 0 ? null : this)) 1352 return null; 1353 dep = null; src = null; snd = null; fn = null; 1354 return d.postFire(a, b, mode); 1355 } 1356 } 1357 biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1358 final <R,S> boolean biAccept(Object r, Object s, 1359 BiConsumer<? super R,? super S> f, 1360 BiAccept<R,S> c) { 1361 Throwable x; 1362 tryComplete: if (result == null) { 1363 if (r instanceof AltResult) { 1364 if ((x = ((AltResult)r).ex) != null) { 1365 completeThrowable(x, r); 1366 break tryComplete; 1367 } 1368 r = null; 1369 } 1370 if (s instanceof AltResult) { 1371 if ((x = ((AltResult)s).ex) != null) { 1372 completeThrowable(x, s); 1373 break tryComplete; 1374 } 1375 s = null; 1376 } 1377 try { 1378 if (c != null && !c.claim()) 1379 return false; 1380 @SuppressWarnings("unchecked") R rr = (R) r; 1381 @SuppressWarnings("unchecked") S ss = (S) s; 1382 f.accept(rr, ss); 1383 completeNull(); 1384 } catch (Throwable ex) { 1385 completeThrowable(ex); 1386 } 1387 } 1388 return true; 1389 } 1390 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1391 private <U> CompletableFuture<Void> biAcceptStage( 1392 Executor e, CompletionStage<U> o, 1393 BiConsumer<? super T,? super U> f) { 1394 CompletableFuture<U> b; Object r, s; 1395 if (f == null || (b = o.toCompletableFuture()) == null) 1396 throw new NullPointerException(); 1397 CompletableFuture<Void> d = newIncompleteFuture(); 1398 if ((r = result) == null || (s = b.result) == null) 1399 bipush(b, new BiAccept<T,U>(e, d, this, b, f)); 1400 else if (e == null) 1401 d.biAccept(r, s, f, null); 1402 else 1403 try { 1404 e.execute(new BiAccept<T,U>(null, d, this, b, f)); 1405 } catch (Throwable ex) { 1406 d.result = encodeThrowable(ex); 1407 } 1408 return d; 1409 } 1410 1411 @SuppressWarnings("serial") 1412 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1413 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1414 BiRun(Executor executor, CompletableFuture<Void> dep, 1415 CompletableFuture<T> src, CompletableFuture<U> snd, 1416 Runnable fn) { 1417 super(executor, dep, src, snd); this.fn = fn; 1418 } tryFire(int mode)1419 final CompletableFuture<Void> tryFire(int mode) { 1420 CompletableFuture<Void> d; 1421 CompletableFuture<T> a; 1422 CompletableFuture<U> b; 1423 Object r, s; Runnable f; 1424 if ((d = dep) == null || (f = fn) == null 1425 || (a = src) == null || (r = a.result) == null 1426 || (b = snd) == null || (s = b.result) == null 1427 || !d.biRun(r, s, f, mode > 0 ? null : this)) 1428 return null; 1429 dep = null; src = null; snd = null; fn = null; 1430 return d.postFire(a, b, mode); 1431 } 1432 } 1433 biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1434 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) { 1435 Throwable x; Object z; 1436 if (result == null) { 1437 if ((r instanceof AltResult 1438 && (x = ((AltResult)(z = r)).ex) != null) || 1439 (s instanceof AltResult 1440 && (x = ((AltResult)(z = s)).ex) != null)) 1441 completeThrowable(x, z); 1442 else 1443 try { 1444 if (c != null && !c.claim()) 1445 return false; 1446 f.run(); 1447 completeNull(); 1448 } catch (Throwable ex) { 1449 completeThrowable(ex); 1450 } 1451 } 1452 return true; 1453 } 1454 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1455 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1456 Runnable f) { 1457 CompletableFuture<?> b; Object r, s; 1458 if (f == null || (b = o.toCompletableFuture()) == null) 1459 throw new NullPointerException(); 1460 CompletableFuture<Void> d = newIncompleteFuture(); 1461 if ((r = result) == null || (s = b.result) == null) 1462 bipush(b, new BiRun<>(e, d, this, b, f)); 1463 else if (e == null) 1464 d.biRun(r, s, f, null); 1465 else 1466 try { 1467 e.execute(new BiRun<>(null, d, this, b, f)); 1468 } catch (Throwable ex) { 1469 d.result = encodeThrowable(ex); 1470 } 1471 return d; 1472 } 1473 1474 @SuppressWarnings("serial") 1475 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1476 BiRelay(CompletableFuture<Void> dep, 1477 CompletableFuture<T> src, CompletableFuture<U> snd) { 1478 super(null, dep, src, snd); 1479 } tryFire(int mode)1480 final CompletableFuture<Void> tryFire(int mode) { 1481 CompletableFuture<Void> d; 1482 CompletableFuture<T> a; 1483 CompletableFuture<U> b; 1484 Object r, s, z; Throwable x; 1485 if ((d = dep) == null 1486 || (a = src) == null || (r = a.result) == null 1487 || (b = snd) == null || (s = b.result) == null) 1488 return null; 1489 if (d.result == null) { 1490 if ((r instanceof AltResult 1491 && (x = ((AltResult)(z = r)).ex) != null) || 1492 (s instanceof AltResult 1493 && (x = ((AltResult)(z = s)).ex) != null)) 1494 d.completeThrowable(x, z); 1495 else 1496 d.completeNull(); 1497 } 1498 src = null; snd = null; dep = null; 1499 return d.postFire(a, b, mode); 1500 } 1501 } 1502 1503 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1504 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1505 int lo, int hi) { 1506 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1507 if (lo > hi) // empty 1508 d.result = NIL; 1509 else { 1510 CompletableFuture<?> a, b; Object r, s, z; Throwable x; 1511 int mid = (lo + hi) >>> 1; 1512 if ((a = (lo == mid ? cfs[lo] : 1513 andTree(cfs, lo, mid))) == null || 1514 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1515 andTree(cfs, mid+1, hi))) == null) 1516 throw new NullPointerException(); 1517 if ((r = a.result) == null || (s = b.result) == null) 1518 a.bipush(b, new BiRelay<>(d, a, b)); 1519 else if ((r instanceof AltResult 1520 && (x = ((AltResult)(z = r)).ex) != null) || 1521 (s instanceof AltResult 1522 && (x = ((AltResult)(z = s)).ex) != null)) 1523 d.result = encodeThrowable(x, z); 1524 else 1525 d.result = NIL; 1526 } 1527 return d; 1528 } 1529 1530 /* ------------- Projected (Ored) BiCompletions -------------- */ 1531 1532 /** 1533 * Pushes completion to this and b unless either done. 1534 * Caller should first check that result and b.result are both null. 1535 */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1536 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1537 if (c != null) { 1538 while (!tryPushStack(c)) { 1539 if (result != null) { 1540 NEXT.set(c, null); 1541 break; 1542 } 1543 } 1544 if (result != null) 1545 c.tryFire(SYNC); 1546 else 1547 b.unipush(new CoCompletion(c)); 1548 } 1549 } 1550 1551 @SuppressWarnings("serial") 1552 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1553 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1554 OrApply(Executor executor, CompletableFuture<V> dep, 1555 CompletableFuture<T> src, CompletableFuture<U> snd, 1556 Function<? super T,? extends V> fn) { 1557 super(executor, dep, src, snd); this.fn = fn; 1558 } tryFire(int mode)1559 final CompletableFuture<V> tryFire(int mode) { 1560 CompletableFuture<V> d; 1561 CompletableFuture<T> a; 1562 CompletableFuture<U> b; 1563 Object r; Throwable x; Function<? super T,? extends V> f; 1564 if ((d = dep) == null || (f = fn) == null 1565 || (a = src) == null || (b = snd) == null 1566 || ((r = a.result) == null && (r = b.result) == null)) 1567 return null; 1568 tryComplete: if (d.result == null) { 1569 try { 1570 if (mode <= 0 && !claim()) 1571 return null; 1572 if (r instanceof AltResult) { 1573 if ((x = ((AltResult)r).ex) != null) { 1574 d.completeThrowable(x, r); 1575 break tryComplete; 1576 } 1577 r = null; 1578 } 1579 @SuppressWarnings("unchecked") T t = (T) r; 1580 d.completeValue(f.apply(t)); 1581 } catch (Throwable ex) { 1582 d.completeThrowable(ex); 1583 } 1584 } 1585 dep = null; src = null; snd = null; fn = null; 1586 return d.postFire(a, b, mode); 1587 } 1588 } 1589 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1590 private <U extends T,V> CompletableFuture<V> orApplyStage( 1591 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) { 1592 CompletableFuture<U> b; 1593 if (f == null || (b = o.toCompletableFuture()) == null) 1594 throw new NullPointerException(); 1595 1596 Object r; CompletableFuture<? extends T> z; 1597 if ((r = (z = this).result) != null || 1598 (r = (z = b).result) != null) 1599 return z.uniApplyNow(r, e, f); 1600 1601 CompletableFuture<V> d = newIncompleteFuture(); 1602 orpush(b, new OrApply<T,U,V>(e, d, this, b, f)); 1603 return d; 1604 } 1605 1606 @SuppressWarnings("serial") 1607 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1608 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1609 OrAccept(Executor executor, CompletableFuture<Void> dep, 1610 CompletableFuture<T> src, CompletableFuture<U> snd, 1611 Consumer<? super T> fn) { 1612 super(executor, dep, src, snd); this.fn = fn; 1613 } tryFire(int mode)1614 final CompletableFuture<Void> tryFire(int mode) { 1615 CompletableFuture<Void> d; 1616 CompletableFuture<T> a; 1617 CompletableFuture<U> b; 1618 Object r; Throwable x; Consumer<? super T> f; 1619 if ((d = dep) == null || (f = fn) == null 1620 || (a = src) == null || (b = snd) == null 1621 || ((r = a.result) == null && (r = b.result) == null)) 1622 return null; 1623 tryComplete: if (d.result == null) { 1624 try { 1625 if (mode <= 0 && !claim()) 1626 return null; 1627 if (r instanceof AltResult) { 1628 if ((x = ((AltResult)r).ex) != null) { 1629 d.completeThrowable(x, r); 1630 break tryComplete; 1631 } 1632 r = null; 1633 } 1634 @SuppressWarnings("unchecked") T t = (T) r; 1635 f.accept(t); 1636 d.completeNull(); 1637 } catch (Throwable ex) { 1638 d.completeThrowable(ex); 1639 } 1640 } 1641 dep = null; src = null; snd = null; fn = null; 1642 return d.postFire(a, b, mode); 1643 } 1644 } 1645 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1646 private <U extends T> CompletableFuture<Void> orAcceptStage( 1647 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1648 CompletableFuture<U> b; 1649 if (f == null || (b = o.toCompletableFuture()) == null) 1650 throw new NullPointerException(); 1651 1652 Object r; CompletableFuture<? extends T> z; 1653 if ((r = (z = this).result) != null || 1654 (r = (z = b).result) != null) 1655 return z.uniAcceptNow(r, e, f); 1656 1657 CompletableFuture<Void> d = newIncompleteFuture(); 1658 orpush(b, new OrAccept<T,U>(e, d, this, b, f)); 1659 return d; 1660 } 1661 1662 @SuppressWarnings("serial") 1663 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1664 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1665 OrRun(Executor executor, CompletableFuture<Void> dep, 1666 CompletableFuture<T> src, CompletableFuture<U> snd, 1667 Runnable fn) { 1668 super(executor, dep, src, snd); this.fn = fn; 1669 } tryFire(int mode)1670 final CompletableFuture<Void> tryFire(int mode) { 1671 CompletableFuture<Void> d; 1672 CompletableFuture<T> a; 1673 CompletableFuture<U> b; 1674 Object r; Throwable x; Runnable f; 1675 if ((d = dep) == null || (f = fn) == null 1676 || (a = src) == null || (b = snd) == null 1677 || ((r = a.result) == null && (r = b.result) == null)) 1678 return null; 1679 if (d.result == null) { 1680 try { 1681 if (mode <= 0 && !claim()) 1682 return null; 1683 else if (r instanceof AltResult 1684 && (x = ((AltResult)r).ex) != null) 1685 d.completeThrowable(x, r); 1686 else { 1687 f.run(); 1688 d.completeNull(); 1689 } 1690 } catch (Throwable ex) { 1691 d.completeThrowable(ex); 1692 } 1693 } 1694 dep = null; src = null; snd = null; fn = null; 1695 return d.postFire(a, b, mode); 1696 } 1697 } 1698 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1699 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1700 Runnable f) { 1701 CompletableFuture<?> b; 1702 if (f == null || (b = o.toCompletableFuture()) == null) 1703 throw new NullPointerException(); 1704 1705 Object r; CompletableFuture<?> z; 1706 if ((r = (z = this).result) != null || 1707 (r = (z = b).result) != null) 1708 return z.uniRunNow(r, e, f); 1709 1710 CompletableFuture<Void> d = newIncompleteFuture(); 1711 orpush(b, new OrRun<>(e, d, this, b, f)); 1712 return d; 1713 } 1714 1715 /** Completion for an anyOf input future. */ 1716 @SuppressWarnings("serial") 1717 static class AnyOf extends Completion { 1718 CompletableFuture<Object> dep; CompletableFuture<?> src; 1719 CompletableFuture<?>[] srcs; AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1720 AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, 1721 CompletableFuture<?>[] srcs) { 1722 this.dep = dep; this.src = src; this.srcs = srcs; 1723 } tryFire(int mode)1724 final CompletableFuture<Object> tryFire(int mode) { 1725 // assert mode != ASYNC; 1726 CompletableFuture<Object> d; CompletableFuture<?> a; 1727 CompletableFuture<?>[] as; 1728 Object r; 1729 if ((d = dep) == null 1730 || (a = src) == null || (r = a.result) == null 1731 || (as = srcs) == null) 1732 return null; 1733 dep = null; src = null; srcs = null; 1734 if (d.completeRelay(r)) { 1735 for (CompletableFuture<?> b : as) 1736 if (b != a) 1737 b.cleanStack(); 1738 if (mode < 0) 1739 return d; 1740 else 1741 d.postComplete(); 1742 } 1743 return null; 1744 } isLive()1745 final boolean isLive() { 1746 CompletableFuture<Object> d; 1747 return (d = dep) != null && d.result == null; 1748 } 1749 } 1750 1751 /* ------------- Zero-input Async forms -------------- */ 1752 1753 @SuppressWarnings("serial") 1754 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1755 implements Runnable, AsynchronousCompletionTask { 1756 CompletableFuture<T> dep; Supplier<? extends T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1757 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1758 this.dep = dep; this.fn = fn; 1759 } 1760 getRawResult()1761 public final Void getRawResult() { return null; } setRawResult(Void v)1762 public final void setRawResult(Void v) {} exec()1763 public final boolean exec() { run(); return false; } 1764 run()1765 public void run() { 1766 CompletableFuture<T> d; Supplier<? extends T> f; 1767 if ((d = dep) != null && (f = fn) != null) { 1768 dep = null; fn = null; 1769 if (d.result == null) { 1770 try { 1771 d.completeValue(f.get()); 1772 } catch (Throwable ex) { 1773 d.completeThrowable(ex); 1774 } 1775 } 1776 d.postComplete(); 1777 } 1778 } 1779 } 1780 asyncSupplyStage(Executor e, Supplier<U> f)1781 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1782 Supplier<U> f) { 1783 if (f == null) throw new NullPointerException(); 1784 CompletableFuture<U> d = new CompletableFuture<U>(); 1785 e.execute(new AsyncSupply<U>(d, f)); 1786 return d; 1787 } 1788 1789 @SuppressWarnings("serial") 1790 static final class AsyncRun extends ForkJoinTask<Void> 1791 implements Runnable, AsynchronousCompletionTask { 1792 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1793 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1794 this.dep = dep; this.fn = fn; 1795 } 1796 getRawResult()1797 public final Void getRawResult() { return null; } setRawResult(Void v)1798 public final void setRawResult(Void v) {} exec()1799 public final boolean exec() { run(); return false; } 1800 run()1801 public void run() { 1802 CompletableFuture<Void> d; Runnable f; 1803 if ((d = dep) != null && (f = fn) != null) { 1804 dep = null; fn = null; 1805 if (d.result == null) { 1806 try { 1807 f.run(); 1808 d.completeNull(); 1809 } catch (Throwable ex) { 1810 d.completeThrowable(ex); 1811 } 1812 } 1813 d.postComplete(); 1814 } 1815 } 1816 } 1817 asyncRunStage(Executor e, Runnable f)1818 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1819 if (f == null) throw new NullPointerException(); 1820 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1821 e.execute(new AsyncRun(d, f)); 1822 return d; 1823 } 1824 1825 /* ------------- Signallers -------------- */ 1826 1827 /** 1828 * Completion for recording and releasing a waiting thread. This 1829 * class implements ManagedBlocker to avoid starvation when 1830 * blocking actions pile up in ForkJoinPools. 1831 */ 1832 @SuppressWarnings("serial") 1833 static final class Signaller extends Completion 1834 implements ForkJoinPool.ManagedBlocker { 1835 long nanos; // remaining wait time if timed 1836 final long deadline; // non-zero if timed 1837 final boolean interruptible; 1838 boolean interrupted; 1839 volatile Thread thread; 1840 Signaller(boolean interruptible, long nanos, long deadline)1841 Signaller(boolean interruptible, long nanos, long deadline) { 1842 this.thread = Thread.currentThread(); 1843 this.interruptible = interruptible; 1844 this.nanos = nanos; 1845 this.deadline = deadline; 1846 } tryFire(int ignore)1847 final CompletableFuture<?> tryFire(int ignore) { 1848 Thread w; // no need to atomically claim 1849 if ((w = thread) != null) { 1850 thread = null; 1851 LockSupport.unpark(w); 1852 } 1853 return null; 1854 } isReleasable()1855 public boolean isReleasable() { 1856 if (Thread.interrupted()) 1857 interrupted = true; 1858 return ((interrupted && interruptible) || 1859 (deadline != 0L && 1860 (nanos <= 0L || 1861 (nanos = deadline - System.nanoTime()) <= 0L)) || 1862 thread == null); 1863 } block()1864 public boolean block() { 1865 while (!isReleasable()) { 1866 if (deadline == 0L) 1867 LockSupport.park(this); 1868 else 1869 LockSupport.parkNanos(this, nanos); 1870 } 1871 return true; 1872 } isLive()1873 final boolean isLive() { return thread != null; } 1874 } 1875 1876 /** 1877 * Returns raw result after waiting, or null if interruptible and 1878 * interrupted. 1879 */ waitingGet(boolean interruptible)1880 private Object waitingGet(boolean interruptible) { 1881 Signaller q = null; 1882 boolean queued = false; 1883 Object r; 1884 while ((r = result) == null) { 1885 if (q == null) { 1886 q = new Signaller(interruptible, 0L, 0L); 1887 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1888 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1889 } 1890 else if (!queued) 1891 queued = tryPushStack(q); 1892 else { 1893 try { 1894 ForkJoinPool.managedBlock(q); 1895 } catch (InterruptedException ie) { // currently cannot happen 1896 q.interrupted = true; 1897 } 1898 if (q.interrupted && interruptible) 1899 break; 1900 } 1901 } 1902 if (q != null && queued) { 1903 q.thread = null; 1904 if (!interruptible && q.interrupted) 1905 Thread.currentThread().interrupt(); 1906 if (r == null) 1907 cleanStack(); 1908 } 1909 if (r != null || (r = result) != null) 1910 postComplete(); 1911 return r; 1912 } 1913 1914 /** 1915 * Returns raw result after waiting, or null if interrupted, or 1916 * throws TimeoutException on timeout. 1917 */ timedGet(long nanos)1918 private Object timedGet(long nanos) throws TimeoutException { 1919 if (Thread.interrupted()) 1920 return null; 1921 if (nanos > 0L) { 1922 long d = System.nanoTime() + nanos; 1923 long deadline = (d == 0L) ? 1L : d; // avoid 0 1924 Signaller q = null; 1925 boolean queued = false; 1926 Object r; 1927 while ((r = result) == null) { // similar to untimed 1928 if (q == null) { 1929 q = new Signaller(true, nanos, deadline); 1930 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1931 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1932 } 1933 else if (!queued) 1934 queued = tryPushStack(q); 1935 else if (q.nanos <= 0L) 1936 break; 1937 else { 1938 try { 1939 ForkJoinPool.managedBlock(q); 1940 } catch (InterruptedException ie) { 1941 q.interrupted = true; 1942 } 1943 if (q.interrupted) 1944 break; 1945 } 1946 } 1947 if (q != null && queued) { 1948 q.thread = null; 1949 if (r == null) 1950 cleanStack(); 1951 } 1952 if (r != null || (r = result) != null) 1953 postComplete(); 1954 if (r != null || (q != null && q.interrupted)) 1955 return r; 1956 } 1957 throw new TimeoutException(); 1958 } 1959 1960 /* ------------- public methods -------------- */ 1961 1962 /** 1963 * Creates a new incomplete CompletableFuture. 1964 */ CompletableFuture()1965 public CompletableFuture() { 1966 } 1967 1968 /** 1969 * Creates a new complete CompletableFuture with given encoded result. 1970 */ CompletableFuture(Object r)1971 CompletableFuture(Object r) { 1972 RESULT.setRelease(this, r); 1973 } 1974 1975 /** 1976 * Returns a new CompletableFuture that is asynchronously completed 1977 * by a task running in the {@link ForkJoinPool#commonPool()} with 1978 * the value obtained by calling the given Supplier. 1979 * 1980 * @param supplier a function returning the value to be used 1981 * to complete the returned CompletableFuture 1982 * @param <U> the function's return type 1983 * @return the new CompletableFuture 1984 */ supplyAsync(Supplier<U> supplier)1985 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1986 return asyncSupplyStage(ASYNC_POOL, supplier); 1987 } 1988 1989 /** 1990 * Returns a new CompletableFuture that is asynchronously completed 1991 * by a task running in the given executor with the value obtained 1992 * by calling the given Supplier. 1993 * 1994 * @param supplier a function returning the value to be used 1995 * to complete the returned CompletableFuture 1996 * @param executor the executor to use for asynchronous execution 1997 * @param <U> the function's return type 1998 * @return the new CompletableFuture 1999 */ supplyAsync(Supplier<U> supplier, Executor executor)2000 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 2001 Executor executor) { 2002 return asyncSupplyStage(screenExecutor(executor), supplier); 2003 } 2004 2005 /** 2006 * Returns a new CompletableFuture that is asynchronously completed 2007 * by a task running in the {@link ForkJoinPool#commonPool()} after 2008 * it runs the given action. 2009 * 2010 * @param runnable the action to run before completing the 2011 * returned CompletableFuture 2012 * @return the new CompletableFuture 2013 */ runAsync(Runnable runnable)2014 public static CompletableFuture<Void> runAsync(Runnable runnable) { 2015 return asyncRunStage(ASYNC_POOL, runnable); 2016 } 2017 2018 /** 2019 * Returns a new CompletableFuture that is asynchronously completed 2020 * by a task running in the given executor after it runs the given 2021 * action. 2022 * 2023 * @param runnable the action to run before completing the 2024 * returned CompletableFuture 2025 * @param executor the executor to use for asynchronous execution 2026 * @return the new CompletableFuture 2027 */ runAsync(Runnable runnable, Executor executor)2028 public static CompletableFuture<Void> runAsync(Runnable runnable, 2029 Executor executor) { 2030 return asyncRunStage(screenExecutor(executor), runnable); 2031 } 2032 2033 /** 2034 * Returns a new CompletableFuture that is already completed with 2035 * the given value. 2036 * 2037 * @param value the value 2038 * @param <U> the type of the value 2039 * @return the completed CompletableFuture 2040 */ completedFuture(U value)2041 public static <U> CompletableFuture<U> completedFuture(U value) { 2042 return new CompletableFuture<U>((value == null) ? NIL : value); 2043 } 2044 2045 /** 2046 * Returns {@code true} if completed in any fashion: normally, 2047 * exceptionally, or via cancellation. 2048 * 2049 * @return {@code true} if completed 2050 */ isDone()2051 public boolean isDone() { 2052 return result != null; 2053 } 2054 2055 /** 2056 * Waits if necessary for this future to complete, and then 2057 * returns its result. 2058 * 2059 * @return the result value 2060 * @throws CancellationException if this future was cancelled 2061 * @throws ExecutionException if this future completed exceptionally 2062 * @throws InterruptedException if the current thread was interrupted 2063 * while waiting 2064 */ 2065 @SuppressWarnings("unchecked") get()2066 public T get() throws InterruptedException, ExecutionException { 2067 Object r; 2068 if ((r = result) == null) 2069 r = waitingGet(true); 2070 return (T) reportGet(r); 2071 } 2072 2073 /** 2074 * Waits if necessary for at most the given time for this future 2075 * to complete, and then returns its result, if available. 2076 * 2077 * @param timeout the maximum time to wait 2078 * @param unit the time unit of the timeout argument 2079 * @return the result value 2080 * @throws CancellationException if this future was cancelled 2081 * @throws ExecutionException if this future completed exceptionally 2082 * @throws InterruptedException if the current thread was interrupted 2083 * while waiting 2084 * @throws TimeoutException if the wait timed out 2085 */ 2086 @SuppressWarnings("unchecked") get(long timeout, TimeUnit unit)2087 public T get(long timeout, TimeUnit unit) 2088 throws InterruptedException, ExecutionException, TimeoutException { 2089 long nanos = unit.toNanos(timeout); 2090 Object r; 2091 if ((r = result) == null) 2092 r = timedGet(nanos); 2093 return (T) reportGet(r); 2094 } 2095 2096 /** 2097 * Returns the result value when complete, or throws an 2098 * (unchecked) exception if completed exceptionally. To better 2099 * conform with the use of common functional forms, if a 2100 * computation involved in the completion of this 2101 * CompletableFuture threw an exception, this method throws an 2102 * (unchecked) {@link CompletionException} with the underlying 2103 * exception as its cause. 2104 * 2105 * @return the result value 2106 * @throws CancellationException if the computation was cancelled 2107 * @throws CompletionException if this future completed 2108 * exceptionally or a completion computation threw an exception 2109 */ 2110 @SuppressWarnings("unchecked") join()2111 public T join() { 2112 Object r; 2113 if ((r = result) == null) 2114 r = waitingGet(false); 2115 return (T) reportJoin(r); 2116 } 2117 2118 /** 2119 * Returns the result value (or throws any encountered exception) 2120 * if completed, else returns the given valueIfAbsent. 2121 * 2122 * @param valueIfAbsent the value to return if not completed 2123 * @return the result value, if completed, else the given valueIfAbsent 2124 * @throws CancellationException if the computation was cancelled 2125 * @throws CompletionException if this future completed 2126 * exceptionally or a completion computation threw an exception 2127 */ 2128 @SuppressWarnings("unchecked") getNow(T valueIfAbsent)2129 public T getNow(T valueIfAbsent) { 2130 Object r; 2131 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r); 2132 } 2133 2134 /** 2135 * If not already completed, sets the value returned by {@link 2136 * #get()} and related methods to the given value. 2137 * 2138 * @param value the result value 2139 * @return {@code true} if this invocation caused this CompletableFuture 2140 * to transition to a completed state, else {@code false} 2141 */ complete(T value)2142 public boolean complete(T value) { 2143 boolean triggered = completeValue(value); 2144 postComplete(); 2145 return triggered; 2146 } 2147 2148 /** 2149 * If not already completed, causes invocations of {@link #get()} 2150 * and related methods to throw the given exception. 2151 * 2152 * @param ex the exception 2153 * @return {@code true} if this invocation caused this CompletableFuture 2154 * to transition to a completed state, else {@code false} 2155 */ completeExceptionally(Throwable ex)2156 public boolean completeExceptionally(Throwable ex) { 2157 if (ex == null) throw new NullPointerException(); 2158 boolean triggered = internalComplete(new AltResult(ex)); 2159 postComplete(); 2160 return triggered; 2161 } 2162 thenApply( Function<? super T,? extends U> fn)2163 public <U> CompletableFuture<U> thenApply( 2164 Function<? super T,? extends U> fn) { 2165 return uniApplyStage(null, fn); 2166 } 2167 thenApplyAsync( Function<? super T,? extends U> fn)2168 public <U> CompletableFuture<U> thenApplyAsync( 2169 Function<? super T,? extends U> fn) { 2170 return uniApplyStage(defaultExecutor(), fn); 2171 } 2172 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2173 public <U> CompletableFuture<U> thenApplyAsync( 2174 Function<? super T,? extends U> fn, Executor executor) { 2175 return uniApplyStage(screenExecutor(executor), fn); 2176 } 2177 thenAccept(Consumer<? super T> action)2178 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2179 return uniAcceptStage(null, action); 2180 } 2181 thenAcceptAsync(Consumer<? super T> action)2182 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2183 return uniAcceptStage(defaultExecutor(), action); 2184 } 2185 thenAcceptAsync(Consumer<? super T> action, Executor executor)2186 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2187 Executor executor) { 2188 return uniAcceptStage(screenExecutor(executor), action); 2189 } 2190 thenRun(Runnable action)2191 public CompletableFuture<Void> thenRun(Runnable action) { 2192 return uniRunStage(null, action); 2193 } 2194 thenRunAsync(Runnable action)2195 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2196 return uniRunStage(defaultExecutor(), action); 2197 } 2198 thenRunAsync(Runnable action, Executor executor)2199 public CompletableFuture<Void> thenRunAsync(Runnable action, 2200 Executor executor) { 2201 return uniRunStage(screenExecutor(executor), action); 2202 } 2203 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2204 public <U,V> CompletableFuture<V> thenCombine( 2205 CompletionStage<? extends U> other, 2206 BiFunction<? super T,? super U,? extends V> fn) { 2207 return biApplyStage(null, other, fn); 2208 } 2209 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2210 public <U,V> CompletableFuture<V> thenCombineAsync( 2211 CompletionStage<? extends U> other, 2212 BiFunction<? super T,? super U,? extends V> fn) { 2213 return biApplyStage(defaultExecutor(), other, fn); 2214 } 2215 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2216 public <U,V> CompletableFuture<V> thenCombineAsync( 2217 CompletionStage<? extends U> other, 2218 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2219 return biApplyStage(screenExecutor(executor), other, fn); 2220 } 2221 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2222 public <U> CompletableFuture<Void> thenAcceptBoth( 2223 CompletionStage<? extends U> other, 2224 BiConsumer<? super T, ? super U> action) { 2225 return biAcceptStage(null, other, action); 2226 } 2227 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2228 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2229 CompletionStage<? extends U> other, 2230 BiConsumer<? super T, ? super U> action) { 2231 return biAcceptStage(defaultExecutor(), other, action); 2232 } 2233 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2234 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2235 CompletionStage<? extends U> other, 2236 BiConsumer<? super T, ? super U> action, Executor executor) { 2237 return biAcceptStage(screenExecutor(executor), other, action); 2238 } 2239 runAfterBoth(CompletionStage<?> other, Runnable action)2240 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2241 Runnable action) { 2242 return biRunStage(null, other, action); 2243 } 2244 runAfterBothAsync(CompletionStage<?> other, Runnable action)2245 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2246 Runnable action) { 2247 return biRunStage(defaultExecutor(), other, action); 2248 } 2249 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2250 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2251 Runnable action, 2252 Executor executor) { 2253 return biRunStage(screenExecutor(executor), other, action); 2254 } 2255 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2256 public <U> CompletableFuture<U> applyToEither( 2257 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2258 return orApplyStage(null, other, fn); 2259 } 2260 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2261 public <U> CompletableFuture<U> applyToEitherAsync( 2262 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2263 return orApplyStage(defaultExecutor(), other, fn); 2264 } 2265 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2266 public <U> CompletableFuture<U> applyToEitherAsync( 2267 CompletionStage<? extends T> other, Function<? super T, U> fn, 2268 Executor executor) { 2269 return orApplyStage(screenExecutor(executor), other, fn); 2270 } 2271 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2272 public CompletableFuture<Void> acceptEither( 2273 CompletionStage<? extends T> other, Consumer<? super T> action) { 2274 return orAcceptStage(null, other, action); 2275 } 2276 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2277 public CompletableFuture<Void> acceptEitherAsync( 2278 CompletionStage<? extends T> other, Consumer<? super T> action) { 2279 return orAcceptStage(defaultExecutor(), other, action); 2280 } 2281 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2282 public CompletableFuture<Void> acceptEitherAsync( 2283 CompletionStage<? extends T> other, Consumer<? super T> action, 2284 Executor executor) { 2285 return orAcceptStage(screenExecutor(executor), other, action); 2286 } 2287 runAfterEither(CompletionStage<?> other, Runnable action)2288 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2289 Runnable action) { 2290 return orRunStage(null, other, action); 2291 } 2292 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2293 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2294 Runnable action) { 2295 return orRunStage(defaultExecutor(), other, action); 2296 } 2297 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2298 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2299 Runnable action, 2300 Executor executor) { 2301 return orRunStage(screenExecutor(executor), other, action); 2302 } 2303 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2304 public <U> CompletableFuture<U> thenCompose( 2305 Function<? super T, ? extends CompletionStage<U>> fn) { 2306 return uniComposeStage(null, fn); 2307 } 2308 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2309 public <U> CompletableFuture<U> thenComposeAsync( 2310 Function<? super T, ? extends CompletionStage<U>> fn) { 2311 return uniComposeStage(defaultExecutor(), fn); 2312 } 2313 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2314 public <U> CompletableFuture<U> thenComposeAsync( 2315 Function<? super T, ? extends CompletionStage<U>> fn, 2316 Executor executor) { 2317 return uniComposeStage(screenExecutor(executor), fn); 2318 } 2319 whenComplete( BiConsumer<? super T, ? super Throwable> action)2320 public CompletableFuture<T> whenComplete( 2321 BiConsumer<? super T, ? super Throwable> action) { 2322 return uniWhenCompleteStage(null, action); 2323 } 2324 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2325 public CompletableFuture<T> whenCompleteAsync( 2326 BiConsumer<? super T, ? super Throwable> action) { 2327 return uniWhenCompleteStage(defaultExecutor(), action); 2328 } 2329 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2330 public CompletableFuture<T> whenCompleteAsync( 2331 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2332 return uniWhenCompleteStage(screenExecutor(executor), action); 2333 } 2334 handle( BiFunction<? super T, Throwable, ? extends U> fn)2335 public <U> CompletableFuture<U> handle( 2336 BiFunction<? super T, Throwable, ? extends U> fn) { 2337 return uniHandleStage(null, fn); 2338 } 2339 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2340 public <U> CompletableFuture<U> handleAsync( 2341 BiFunction<? super T, Throwable, ? extends U> fn) { 2342 return uniHandleStage(defaultExecutor(), fn); 2343 } 2344 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2345 public <U> CompletableFuture<U> handleAsync( 2346 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2347 return uniHandleStage(screenExecutor(executor), fn); 2348 } 2349 2350 /** 2351 * Returns this CompletableFuture. 2352 * 2353 * @return this CompletableFuture 2354 */ toCompletableFuture()2355 public CompletableFuture<T> toCompletableFuture() { 2356 return this; 2357 } 2358 exceptionally( Function<Throwable, ? extends T> fn)2359 public CompletableFuture<T> exceptionally( 2360 Function<Throwable, ? extends T> fn) { 2361 return uniExceptionallyStage(null, fn); 2362 } 2363 exceptionallyAsync( Function<Throwable, ? extends T> fn)2364 public CompletableFuture<T> exceptionallyAsync( 2365 Function<Throwable, ? extends T> fn) { 2366 return uniExceptionallyStage(defaultExecutor(), fn); 2367 } 2368 exceptionallyAsync( Function<Throwable, ? extends T> fn, Executor executor)2369 public CompletableFuture<T> exceptionallyAsync( 2370 Function<Throwable, ? extends T> fn, Executor executor) { 2371 return uniExceptionallyStage(screenExecutor(executor), fn); 2372 } 2373 exceptionallyCompose( Function<Throwable, ? extends CompletionStage<T>> fn)2374 public CompletableFuture<T> exceptionallyCompose( 2375 Function<Throwable, ? extends CompletionStage<T>> fn) { 2376 return uniComposeExceptionallyStage(null, fn); 2377 } 2378 exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn)2379 public CompletableFuture<T> exceptionallyComposeAsync( 2380 Function<Throwable, ? extends CompletionStage<T>> fn) { 2381 return uniComposeExceptionallyStage(defaultExecutor(), fn); 2382 } 2383 exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)2384 public CompletableFuture<T> exceptionallyComposeAsync( 2385 Function<Throwable, ? extends CompletionStage<T>> fn, 2386 Executor executor) { 2387 return uniComposeExceptionallyStage(screenExecutor(executor), fn); 2388 } 2389 2390 /* ------------- Arbitrary-arity constructions -------------- */ 2391 2392 /** 2393 * Returns a new CompletableFuture that is completed when all of 2394 * the given CompletableFutures complete. If any of the given 2395 * CompletableFutures complete exceptionally, then the returned 2396 * CompletableFuture also does so, with a CompletionException 2397 * holding this exception as its cause. Otherwise, the results, 2398 * if any, of the given CompletableFutures are not reflected in 2399 * the returned CompletableFuture, but may be obtained by 2400 * inspecting them individually. If no CompletableFutures are 2401 * provided, returns a CompletableFuture completed with the value 2402 * {@code null}. 2403 * 2404 * <p>Among the applications of this method is to await completion 2405 * of a set of independent CompletableFutures before continuing a 2406 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2407 * c3).join();}. 2408 * 2409 * @param cfs the CompletableFutures 2410 * @return a new CompletableFuture that is completed when all of the 2411 * given CompletableFutures complete 2412 * @throws NullPointerException if the array or any of its elements are 2413 * {@code null} 2414 */ allOf(CompletableFuture<?>.... cfs)2415 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2416 return andTree(cfs, 0, cfs.length - 1); 2417 } 2418 2419 /** 2420 * Returns a new CompletableFuture that is completed when any of 2421 * the given CompletableFutures complete, with the same result. 2422 * Otherwise, if it completed exceptionally, the returned 2423 * CompletableFuture also does so, with a CompletionException 2424 * holding this exception as its cause. If no CompletableFutures 2425 * are provided, returns an incomplete CompletableFuture. 2426 * 2427 * @param cfs the CompletableFutures 2428 * @return a new CompletableFuture that is completed with the 2429 * result or exception of any of the given CompletableFutures when 2430 * one completes 2431 * @throws NullPointerException if the array or any of its elements are 2432 * {@code null} 2433 */ anyOf(CompletableFuture<?>.... cfs)2434 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2435 int n; Object r; 2436 if ((n = cfs.length) <= 1) 2437 return (n == 0) 2438 ? new CompletableFuture<Object>() 2439 : uniCopyStage(cfs[0]); 2440 for (CompletableFuture<?> cf : cfs) 2441 if ((r = cf.result) != null) 2442 return new CompletableFuture<Object>(encodeRelay(r)); 2443 cfs = cfs.clone(); 2444 CompletableFuture<Object> d = new CompletableFuture<>(); 2445 for (CompletableFuture<?> cf : cfs) 2446 cf.unipush(new AnyOf(d, cf, cfs)); 2447 // If d was completed while we were adding completions, we should 2448 // clean the stack of any sources that may have had completions 2449 // pushed on their stack after d was completed. 2450 if (d.result != null) 2451 for (int i = 0, len = cfs.length; i < len; i++) 2452 if (cfs[i].result != null) 2453 for (i++; i < len; i++) 2454 if (cfs[i].result == null) 2455 cfs[i].cleanStack(); 2456 return d; 2457 } 2458 2459 /* ------------- Control and status methods -------------- */ 2460 2461 /** 2462 * If not already completed, completes this CompletableFuture with 2463 * a {@link CancellationException}. Dependent CompletableFutures 2464 * that have not already completed will also complete 2465 * exceptionally, with a {@link CompletionException} caused by 2466 * this {@code CancellationException}. 2467 * 2468 * @param mayInterruptIfRunning this value has no effect in this 2469 * implementation because interrupts are not used to control 2470 * processing. 2471 * 2472 * @return {@code true} if this task is now cancelled 2473 */ cancel(boolean mayInterruptIfRunning)2474 public boolean cancel(boolean mayInterruptIfRunning) { 2475 boolean cancelled = (result == null) && 2476 internalComplete(new AltResult(new CancellationException())); 2477 postComplete(); 2478 return cancelled || isCancelled(); 2479 } 2480 2481 /** 2482 * Returns {@code true} if this CompletableFuture was cancelled 2483 * before it completed normally. 2484 * 2485 * @return {@code true} if this CompletableFuture was cancelled 2486 * before it completed normally 2487 */ isCancelled()2488 public boolean isCancelled() { 2489 Object r; 2490 return ((r = result) instanceof AltResult) && 2491 (((AltResult)r).ex instanceof CancellationException); 2492 } 2493 2494 /** 2495 * Returns {@code true} if this CompletableFuture completed 2496 * exceptionally, in any way. Possible causes include 2497 * cancellation, explicit invocation of {@code 2498 * completeExceptionally}, and abrupt termination of a 2499 * CompletionStage action. 2500 * 2501 * @return {@code true} if this CompletableFuture completed 2502 * exceptionally 2503 */ isCompletedExceptionally()2504 public boolean isCompletedExceptionally() { 2505 Object r; 2506 return ((r = result) instanceof AltResult) && r != NIL; 2507 } 2508 2509 /** 2510 * Forcibly sets or resets the value subsequently returned by 2511 * method {@link #get()} and related methods, whether or not 2512 * already completed. This method is designed for use only in 2513 * error recovery actions, and even in such situations may result 2514 * in ongoing dependent completions using established versus 2515 * overwritten outcomes. 2516 * 2517 * @param value the completion value 2518 */ obtrudeValue(T value)2519 public void obtrudeValue(T value) { 2520 result = (value == null) ? NIL : value; 2521 postComplete(); 2522 } 2523 2524 /** 2525 * Forcibly causes subsequent invocations of method {@link #get()} 2526 * and related methods to throw the given exception, whether or 2527 * not already completed. This method is designed for use only in 2528 * error recovery actions, and even in such situations may result 2529 * in ongoing dependent completions using established versus 2530 * overwritten outcomes. 2531 * 2532 * @param ex the exception 2533 * @throws NullPointerException if the exception is null 2534 */ obtrudeException(Throwable ex)2535 public void obtrudeException(Throwable ex) { 2536 if (ex == null) throw new NullPointerException(); 2537 result = new AltResult(ex); 2538 postComplete(); 2539 } 2540 2541 /** 2542 * Returns the estimated number of CompletableFutures whose 2543 * completions are awaiting completion of this CompletableFuture. 2544 * This method is designed for use in monitoring system state, not 2545 * for synchronization control. 2546 * 2547 * @return the number of dependent CompletableFutures 2548 */ getNumberOfDependents()2549 public int getNumberOfDependents() { 2550 int count = 0; 2551 for (Completion p = stack; p != null; p = p.next) 2552 ++count; 2553 return count; 2554 } 2555 2556 /** 2557 * Returns a string identifying this CompletableFuture, as well as 2558 * its completion state. The state, in brackets, contains the 2559 * String {@code "Completed Normally"} or the String {@code 2560 * "Completed Exceptionally"}, or the String {@code "Not 2561 * completed"} followed by the number of CompletableFutures 2562 * dependent upon its completion, if any. 2563 * 2564 * @return a string identifying this CompletableFuture, as well as its state 2565 */ toString()2566 public String toString() { 2567 Object r = result; 2568 int count = 0; // avoid call to getNumberOfDependents in case disabled 2569 for (Completion p = stack; p != null; p = p.next) 2570 ++count; 2571 return super.toString() + 2572 ((r == null) 2573 ? ((count == 0) 2574 ? "[Not completed]" 2575 : "[Not completed, " + count + " dependents]") 2576 : (((r instanceof AltResult) && ((AltResult)r).ex != null) 2577 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]" 2578 : "[Completed normally]")); 2579 } 2580 2581 // jdk9 additions 2582 2583 /** 2584 * Returns a new incomplete CompletableFuture of the type to be 2585 * returned by a CompletionStage method. Subclasses should 2586 * normally override this method to return an instance of the same 2587 * class as this CompletableFuture. The default implementation 2588 * returns an instance of class CompletableFuture. 2589 * 2590 * @param <U> the type of the value 2591 * @return a new CompletableFuture 2592 * @since 9 2593 */ newIncompleteFuture()2594 public <U> CompletableFuture<U> newIncompleteFuture() { 2595 return new CompletableFuture<U>(); 2596 } 2597 2598 /** 2599 * Returns the default Executor used for async methods that do not 2600 * specify an Executor. This class uses the {@link 2601 * ForkJoinPool#commonPool()} if it supports more than one 2602 * parallel thread, or else an Executor using one thread per async 2603 * task. This method may be overridden in subclasses to return 2604 * an Executor that provides at least one independent thread. 2605 * 2606 * @return the executor 2607 * @since 9 2608 */ defaultExecutor()2609 public Executor defaultExecutor() { 2610 return ASYNC_POOL; 2611 } 2612 2613 /** 2614 * Returns a new CompletableFuture that is completed normally with 2615 * the same value as this CompletableFuture when it completes 2616 * normally. If this CompletableFuture completes exceptionally, 2617 * then the returned CompletableFuture completes exceptionally 2618 * with a CompletionException with this exception as cause. The 2619 * behavior is equivalent to {@code thenApply(x -> x)}. This 2620 * method may be useful as a form of "defensive copying", to 2621 * prevent clients from completing, while still being able to 2622 * arrange dependent actions. 2623 * 2624 * @return the new CompletableFuture 2625 * @since 9 2626 */ copy()2627 public CompletableFuture<T> copy() { 2628 return uniCopyStage(this); 2629 } 2630 2631 /** 2632 * Returns a new CompletionStage that is completed normally with 2633 * the same value as this CompletableFuture when it completes 2634 * normally, and cannot be independently completed or otherwise 2635 * used in ways not defined by the methods of interface {@link 2636 * CompletionStage}. If this CompletableFuture completes 2637 * exceptionally, then the returned CompletionStage completes 2638 * exceptionally with a CompletionException with this exception as 2639 * cause. 2640 * 2641 * <p>Unless overridden by a subclass, a new non-minimal 2642 * CompletableFuture with all methods available can be obtained from 2643 * a minimal CompletionStage via {@link #toCompletableFuture()}. 2644 * For example, completion of a minimal stage can be awaited by 2645 * 2646 * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre> 2647 * 2648 * @return the new CompletionStage 2649 * @since 9 2650 */ minimalCompletionStage()2651 public CompletionStage<T> minimalCompletionStage() { 2652 return uniAsMinimalStage(); 2653 } 2654 2655 /** 2656 * Completes this CompletableFuture with the result of 2657 * the given Supplier function invoked from an asynchronous 2658 * task using the given executor. 2659 * 2660 * @param supplier a function returning the value to be used 2661 * to complete this CompletableFuture 2662 * @param executor the executor to use for asynchronous execution 2663 * @return this CompletableFuture 2664 * @since 9 2665 */ completeAsync(Supplier<? extends T> supplier, Executor executor)2666 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2667 Executor executor) { 2668 if (supplier == null || executor == null) 2669 throw new NullPointerException(); 2670 executor.execute(new AsyncSupply<T>(this, supplier)); 2671 return this; 2672 } 2673 2674 /** 2675 * Completes this CompletableFuture with the result of the given 2676 * Supplier function invoked from an asynchronous task using the 2677 * default executor. 2678 * 2679 * @param supplier a function returning the value to be used 2680 * to complete this CompletableFuture 2681 * @return this CompletableFuture 2682 * @since 9 2683 */ completeAsync(Supplier<? extends T> supplier)2684 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { 2685 return completeAsync(supplier, defaultExecutor()); 2686 } 2687 2688 /** 2689 * Exceptionally completes this CompletableFuture with 2690 * a {@link TimeoutException} if not otherwise completed 2691 * before the given timeout. 2692 * 2693 * @param timeout how long to wait before completing exceptionally 2694 * with a TimeoutException, in units of {@code unit} 2695 * @param unit a {@code TimeUnit} determining how to interpret the 2696 * {@code timeout} parameter 2697 * @return this CompletableFuture 2698 * @since 9 2699 */ orTimeout(long timeout, TimeUnit unit)2700 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2701 if (unit == null) 2702 throw new NullPointerException(); 2703 if (result == null) 2704 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2705 timeout, unit))); 2706 return this; 2707 } 2708 2709 /** 2710 * Completes this CompletableFuture with the given value if not 2711 * otherwise completed before the given timeout. 2712 * 2713 * @param value the value to use upon timeout 2714 * @param timeout how long to wait before completing normally 2715 * with the given value, in units of {@code unit} 2716 * @param unit a {@code TimeUnit} determining how to interpret the 2717 * {@code timeout} parameter 2718 * @return this CompletableFuture 2719 * @since 9 2720 */ completeOnTimeout(T value, long timeout, TimeUnit unit)2721 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2722 TimeUnit unit) { 2723 if (unit == null) 2724 throw new NullPointerException(); 2725 if (result == null) 2726 whenComplete(new Canceller(Delayer.delay( 2727 new DelayedCompleter<T>(this, value), 2728 timeout, unit))); 2729 return this; 2730 } 2731 2732 /** 2733 * Returns a new Executor that submits a task to the given base 2734 * executor after the given delay (or no delay if non-positive). 2735 * Each delay commences upon invocation of the returned executor's 2736 * {@code execute} method. 2737 * 2738 * @param delay how long to delay, in units of {@code unit} 2739 * @param unit a {@code TimeUnit} determining how to interpret the 2740 * {@code delay} parameter 2741 * @param executor the base executor 2742 * @return the new delayed executor 2743 * @since 9 2744 */ delayedExecutor(long delay, TimeUnit unit, Executor executor)2745 public static Executor delayedExecutor(long delay, TimeUnit unit, 2746 Executor executor) { 2747 if (unit == null || executor == null) 2748 throw new NullPointerException(); 2749 return new DelayedExecutor(delay, unit, executor); 2750 } 2751 2752 /** 2753 * Returns a new Executor that submits a task to the default 2754 * executor after the given delay (or no delay if non-positive). 2755 * Each delay commences upon invocation of the returned executor's 2756 * {@code execute} method. 2757 * 2758 * @param delay how long to delay, in units of {@code unit} 2759 * @param unit a {@code TimeUnit} determining how to interpret the 2760 * {@code delay} parameter 2761 * @return the new delayed executor 2762 * @since 9 2763 */ delayedExecutor(long delay, TimeUnit unit)2764 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2765 if (unit == null) 2766 throw new NullPointerException(); 2767 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2768 } 2769 2770 /** 2771 * Returns a new CompletionStage that is already completed with 2772 * the given value and supports only those methods in 2773 * interface {@link CompletionStage}. 2774 * 2775 * @param value the value 2776 * @param <U> the type of the value 2777 * @return the completed CompletionStage 2778 * @since 9 2779 */ completedStage(U value)2780 public static <U> CompletionStage<U> completedStage(U value) { 2781 return new MinimalStage<U>((value == null) ? NIL : value); 2782 } 2783 2784 /** 2785 * Returns a new CompletableFuture that is already completed 2786 * exceptionally with the given exception. 2787 * 2788 * @param ex the exception 2789 * @param <U> the type of the value 2790 * @return the exceptionally completed CompletableFuture 2791 * @since 9 2792 */ failedFuture(Throwable ex)2793 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2794 if (ex == null) throw new NullPointerException(); 2795 return new CompletableFuture<U>(new AltResult(ex)); 2796 } 2797 2798 /** 2799 * Returns a new CompletionStage that is already completed 2800 * exceptionally with the given exception and supports only those 2801 * methods in interface {@link CompletionStage}. 2802 * 2803 * @param ex the exception 2804 * @param <U> the type of the value 2805 * @return the exceptionally completed CompletionStage 2806 * @since 9 2807 */ failedStage(Throwable ex)2808 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2809 if (ex == null) throw new NullPointerException(); 2810 return new MinimalStage<U>(new AltResult(ex)); 2811 } 2812 2813 /** 2814 * Singleton delay scheduler, used only for starting and 2815 * cancelling tasks. 2816 */ 2817 static final class Delayer { delay(Runnable command, long delay, TimeUnit unit)2818 static ScheduledFuture<?> delay(Runnable command, long delay, 2819 TimeUnit unit) { 2820 return delayer.schedule(command, delay, unit); 2821 } 2822 2823 static final class DaemonThreadFactory implements ThreadFactory { newThread(Runnable r)2824 public Thread newThread(Runnable r) { 2825 Thread t = new Thread(r); 2826 t.setDaemon(true); 2827 t.setName("CompletableFutureDelayScheduler"); 2828 return t; 2829 } 2830 } 2831 2832 static final ScheduledThreadPoolExecutor delayer; 2833 static { 2834 (delayer = new ScheduledThreadPoolExecutor( 2835 1, new DaemonThreadFactory())). 2836 setRemoveOnCancelPolicy(true); 2837 } 2838 } 2839 2840 // Little class-ified lambdas to better support monitoring 2841 2842 static final class DelayedExecutor implements Executor { 2843 final long delay; 2844 final TimeUnit unit; 2845 final Executor executor; DelayedExecutor(long delay, TimeUnit unit, Executor executor)2846 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2847 this.delay = delay; this.unit = unit; this.executor = executor; 2848 } execute(Runnable r)2849 public void execute(Runnable r) { 2850 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2851 } 2852 } 2853 2854 /** Action to submit user task */ 2855 static final class TaskSubmitter implements Runnable { 2856 final Executor executor; 2857 final Runnable action; TaskSubmitter(Executor executor, Runnable action)2858 TaskSubmitter(Executor executor, Runnable action) { 2859 this.executor = executor; 2860 this.action = action; 2861 } run()2862 public void run() { executor.execute(action); } 2863 } 2864 2865 /** Action to completeExceptionally on timeout */ 2866 static final class Timeout implements Runnable { 2867 final CompletableFuture<?> f; Timeout(CompletableFuture<?> f)2868 Timeout(CompletableFuture<?> f) { this.f = f; } run()2869 public void run() { 2870 if (f != null && !f.isDone()) 2871 f.completeExceptionally(new TimeoutException()); 2872 } 2873 } 2874 2875 /** Action to complete on timeout */ 2876 static final class DelayedCompleter<U> implements Runnable { 2877 final CompletableFuture<U> f; 2878 final U u; DelayedCompleter(CompletableFuture<U> f, U u)2879 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } run()2880 public void run() { 2881 if (f != null) 2882 f.complete(u); 2883 } 2884 } 2885 2886 /** Action to cancel unneeded timeouts */ 2887 static final class Canceller implements BiConsumer<Object, Throwable> { 2888 final Future<?> f; Canceller(Future<?> f)2889 Canceller(Future<?> f) { this.f = f; } accept(Object ignore, Throwable ex)2890 public void accept(Object ignore, Throwable ex) { 2891 if (ex == null && f != null && !f.isDone()) 2892 f.cancel(false); 2893 } 2894 } 2895 2896 /** 2897 * A subclass that just throws UOE for most non-CompletionStage methods. 2898 */ 2899 static final class MinimalStage<T> extends CompletableFuture<T> { MinimalStage()2900 MinimalStage() { } MinimalStage(Object r)2901 MinimalStage(Object r) { super(r); } newIncompleteFuture()2902 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2903 return new MinimalStage<U>(); } get()2904 @Override public T get() { 2905 throw new UnsupportedOperationException(); } get(long timeout, TimeUnit unit)2906 @Override public T get(long timeout, TimeUnit unit) { 2907 throw new UnsupportedOperationException(); } getNow(T valueIfAbsent)2908 @Override public T getNow(T valueIfAbsent) { 2909 throw new UnsupportedOperationException(); } join()2910 @Override public T join() { 2911 throw new UnsupportedOperationException(); } complete(T value)2912 @Override public boolean complete(T value) { 2913 throw new UnsupportedOperationException(); } completeExceptionally(Throwable ex)2914 @Override public boolean completeExceptionally(Throwable ex) { 2915 throw new UnsupportedOperationException(); } cancel(boolean mayInterruptIfRunning)2916 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2917 throw new UnsupportedOperationException(); } obtrudeValue(T value)2918 @Override public void obtrudeValue(T value) { 2919 throw new UnsupportedOperationException(); } obtrudeException(Throwable ex)2920 @Override public void obtrudeException(Throwable ex) { 2921 throw new UnsupportedOperationException(); } isDone()2922 @Override public boolean isDone() { 2923 throw new UnsupportedOperationException(); } isCancelled()2924 @Override public boolean isCancelled() { 2925 throw new UnsupportedOperationException(); } isCompletedExceptionally()2926 @Override public boolean isCompletedExceptionally() { 2927 throw new UnsupportedOperationException(); } getNumberOfDependents()2928 @Override public int getNumberOfDependents() { 2929 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier, Executor executor)2930 @Override public CompletableFuture<T> completeAsync 2931 (Supplier<? extends T> supplier, Executor executor) { 2932 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier)2933 @Override public CompletableFuture<T> completeAsync 2934 (Supplier<? extends T> supplier) { 2935 throw new UnsupportedOperationException(); } orTimeout(long timeout, TimeUnit unit)2936 @Override public CompletableFuture<T> orTimeout 2937 (long timeout, TimeUnit unit) { 2938 throw new UnsupportedOperationException(); } completeOnTimeout(T value, long timeout, TimeUnit unit)2939 @Override public CompletableFuture<T> completeOnTimeout 2940 (T value, long timeout, TimeUnit unit) { 2941 throw new UnsupportedOperationException(); } toCompletableFuture()2942 @Override public CompletableFuture<T> toCompletableFuture() { 2943 Object r; 2944 if ((r = result) != null) 2945 return new CompletableFuture<T>(encodeRelay(r)); 2946 else { 2947 CompletableFuture<T> d = new CompletableFuture<>(); 2948 unipush(new UniRelay<T,T>(d, this)); 2949 return d; 2950 } 2951 } 2952 } 2953 2954 // VarHandle mechanics 2955 private static final VarHandle RESULT; 2956 private static final VarHandle STACK; 2957 private static final VarHandle NEXT; 2958 static { 2959 try { 2960 MethodHandles.Lookup l = MethodHandles.lookup(); 2961 RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); 2962 STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class); 2963 NEXT = l.findVarHandle(Completion.class, "next", Completion.class); 2964 } catch (ReflectiveOperationException e) { 2965 throw new ExceptionInInitializerError(e); 2966 } 2967 2968 // Reduce the risk of rare disastrous classloading in first call to 2969 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2970 Class<?> ensureLoaded = LockSupport.class; 2971 } 2972 } 2973