1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.function.BiConsumer; 42 import java.util.function.BiFunction; 43 import java.util.function.Consumer; 44 import java.util.function.Function; 45 import java.util.function.Supplier; 46 import java.util.Objects; 47 48 /** 49 * A {@link Future} that may be explicitly completed (setting its 50 * value and status), and may be used as a {@link CompletionStage}, 51 * supporting dependent functions and actions that trigger upon its 52 * completion. 53 * 54 * <p>When two or more threads attempt to 55 * {@link #complete complete}, 56 * {@link #completeExceptionally completeExceptionally}, or 57 * {@link #cancel cancel} 58 * a CompletableFuture, only one of them succeeds. 59 * 60 * <p>In addition to these and related methods for directly 61 * manipulating status and results, CompletableFuture implements 62 * interface {@link CompletionStage} with the following policies: <ul> 63 * 64 * <li>Actions supplied for dependent completions of 65 * <em>non-async</em> methods may be performed by the thread that 66 * completes the current CompletableFuture, or by any other caller of 67 * a completion method. 68 * 69 * <li>All <em>async</em> methods without an explicit Executor 70 * argument are performed using the {@link ForkJoinPool#commonPool()} 71 * (unless it does not support a parallelism level of at least two, in 72 * which case, a new Thread is created to run each task). This may be 73 * overridden for non-static methods in subclasses by defining method 74 * {@link #defaultExecutor()}. To simplify monitoring, debugging, 75 * and tracking, all generated asynchronous tasks are instances of the 76 * marker interface {@link AsynchronousCompletionTask}. Operations 77 * with time-delays can use adapter methods defined in this class, for 78 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 79 * timeUnit))}. To support methods with delays and timeouts, this 80 * class maintains at most one daemon thread for triggering and 81 * cancelling actions, not for running them. 82 * 83 * <li>All CompletionStage methods are implemented independently of 84 * other public methods, so the behavior of one method is not impacted 85 * by overrides of others in subclasses. 86 * 87 * <li>All CompletionStage methods return CompletableFutures. To 88 * restrict usages to only those methods defined in interface 89 * CompletionStage, use method {@link #minimalCompletionStage}. Or to 90 * ensure only that clients do not themselves modify a future, use 91 * method {@link #copy}. 92 * </ul> 93 * 94 * <p>CompletableFuture also implements {@link Future} with the following 95 * policies: <ul> 96 * 97 * <li>Since (unlike {@link FutureTask}) this class has no direct 98 * control over the computation that causes it to be completed, 99 * cancellation is treated as just another form of exceptional 100 * completion. Method {@link #cancel cancel} has the same effect as 101 * {@code completeExceptionally(new CancellationException())}. Method 102 * {@link #isCompletedExceptionally} can be used to determine if a 103 * CompletableFuture completed in any exceptional fashion. 104 * 105 * <li>In case of exceptional completion with a CompletionException, 106 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 107 * {@link ExecutionException} with the same cause as held in the 108 * corresponding CompletionException. To simplify usage in most 109 * contexts, this class also defines methods {@link #join()} and 110 * {@link #getNow} that instead throw the CompletionException directly 111 * in these cases. 112 * </ul> 113 * 114 * <p>Arguments used to pass a completion result (that is, for 115 * parameters of type {@code T}) for methods accepting them may be 116 * null, but passing a null value for any other parameter will result 117 * in a {@link NullPointerException} being thrown. 118 * 119 * <p>Subclasses of this class should normally override the "virtual 120 * constructor" method {@link #newIncompleteFuture}, which establishes 121 * the concrete type returned by CompletionStage methods. For example, 122 * here is a class that substitutes a different default Executor and 123 * disables the {@code obtrude} methods: 124 * 125 * <pre> {@code 126 * class MyCompletableFuture<T> extends CompletableFuture<T> { 127 * static final Executor myExecutor = ...; 128 * public MyCompletableFuture() { } 129 * public <U> CompletableFuture<U> newIncompleteFuture() { 130 * return new MyCompletableFuture<U>(); } 131 * public Executor defaultExecutor() { 132 * return myExecutor; } 133 * public void obtrudeValue(T value) { 134 * throw new UnsupportedOperationException(); } 135 * public void obtrudeException(Throwable ex) { 136 * throw new UnsupportedOperationException(); } 137 * }}</pre> 138 * 139 * @author Doug Lea 140 * @param <T> The result type returned by this future's {@code join} 141 * and {@code get} methods 142 * @since 1.8 143 */ 144 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 145 146 /* 147 * Overview: 148 * 149 * A CompletableFuture may have dependent completion actions, 150 * collected in a linked stack. It atomically completes by CASing 151 * a result field, and then pops off and runs those actions. This 152 * applies across normal vs exceptional outcomes, sync vs async 153 * actions, binary triggers, and various forms of completions. 154 * 155 * Non-nullness of volatile field "result" indicates done. It may 156 * be set directly if known to be thread-confined, else via CAS. 157 * An AltResult is used to box null as a result, as well as to 158 * hold exceptions. Using a single field makes completion simple 159 * to detect and trigger. Result encoding and decoding is 160 * straightforward but tedious and adds to the sprawl of trapping 161 * and associating exceptions with targets. Minor simplifications 162 * rely on (static) NIL (to box null results) being the only 163 * AltResult with a null exception field, so we don't usually need 164 * explicit comparisons. Even though some of the generics casts 165 * are unchecked (see SuppressWarnings annotations), they are 166 * placed to be appropriate even if checked. 167 * 168 * Dependent actions are represented by Completion objects linked 169 * as Treiber stacks headed by field "stack". There are Completion 170 * classes for each kind of action, grouped into: 171 * - single-input (UniCompletion), 172 * - two-input (BiCompletion), 173 * - projected (BiCompletions using exactly one of two inputs), 174 * - shared (CoCompletion, used by the second of two sources), 175 * - zero-input source actions, 176 * - Signallers that unblock waiters. 177 * Class Completion extends ForkJoinTask to enable async execution 178 * (adding no space overhead because we exploit its "tag" methods 179 * to maintain claims). It is also declared as Runnable to allow 180 * usage with arbitrary executors. 181 * 182 * Support for each kind of CompletionStage relies on a separate 183 * class, along with two CompletableFuture methods: 184 * 185 * * A Completion class with name X corresponding to function, 186 * prefaced with "Uni", "Bi", or "Or". Each class contains 187 * fields for source(s), actions, and dependent. They are 188 * boringly similar, differing from others only with respect to 189 * underlying functional forms. We do this so that users don't 190 * encounter layers of adapters in common usages. 191 * 192 * * Boolean CompletableFuture method x(...) (for example 193 * biApply) takes all of the arguments needed to check that an 194 * action is triggerable, and then either runs the action or 195 * arranges its async execution by executing its Completion 196 * argument, if present. The method returns true if known to be 197 * complete. 198 * 199 * * Completion method tryFire(int mode) invokes the associated x 200 * method with its held arguments, and on success cleans up. 201 * The mode argument allows tryFire to be called twice (SYNC, 202 * then ASYNC); the first to screen and trap exceptions while 203 * arranging to execute, and the second when called from a task. 204 * (A few classes are not used async so take slightly different 205 * forms.) The claim() callback suppresses function invocation 206 * if already claimed by another thread. 207 * 208 * * Some classes (for example UniApply) have separate handling 209 * code for when known to be thread-confined ("now" methods) and 210 * for when shared (in tryFire), for efficiency. 211 * 212 * * CompletableFuture method xStage(...) is called from a public 213 * stage method of CompletableFuture f. It screens user 214 * arguments and invokes and/or creates the stage object. If 215 * not async and already triggerable, the action is run 216 * immediately. Otherwise a Completion c is created, and 217 * submitted to the executor if triggerable, or pushed onto f's 218 * stack if not. Completion actions are started via c.tryFire. 219 * We recheck after pushing to a source future's stack to cover 220 * possible races if the source completes while pushing. 221 * Classes with two inputs (for example BiApply) deal with races 222 * across both while pushing actions. The second completion is 223 * a CoCompletion pointing to the first, shared so that at most 224 * one performs the action. The multiple-arity methods allOf 225 * does this pairwise to form trees of completions. Method 226 * anyOf is handled differently from allOf because completion of 227 * any source should trigger a cleanStack of other sources. 228 * Each AnyOf completion can reach others via a shared array. 229 * 230 * Note that the generic type parameters of methods vary according 231 * to whether "this" is a source, dependent, or completion. 232 * 233 * Method postComplete is called upon completion unless the target 234 * is guaranteed not to be observable (i.e., not yet returned or 235 * linked). Multiple threads can call postComplete, which 236 * atomically pops each dependent action, and tries to trigger it 237 * via method tryFire, in NESTED mode. Triggering can propagate 238 * recursively, so NESTED mode returns its completed dependent (if 239 * one exists) for further processing by its caller (see method 240 * postFire). 241 * 242 * Blocking methods get() and join() rely on Signaller Completions 243 * that wake up waiting threads. The mechanics are similar to 244 * Treiber stack wait-nodes used in FutureTask, Phaser, and 245 * SynchronousQueue. See their internal documentation for 246 * algorithmic details. 247 * 248 * Without precautions, CompletableFutures would be prone to 249 * garbage accumulation as chains of Completions build up, each 250 * pointing back to its sources. So we null out fields as soon as 251 * possible. The screening checks needed anyway harmlessly ignore 252 * null arguments that may have been obtained during races with 253 * threads nulling out fields. We also try to unlink non-isLive 254 * (fired or cancelled) Completions from stacks that might 255 * otherwise never be popped: Method cleanStack always unlinks non 256 * isLive completions from the head of stack; others may 257 * occasionally remain if racing with other cancellations or 258 * removals. 259 * 260 * Completion fields need not be declared as final or volatile 261 * because they are only visible to other threads upon safe 262 * publication. 263 */ 264 265 volatile Object result; // Either the result or boxed AltResult 266 volatile Completion stack; // Top of Treiber stack of dependent actions 267 internalComplete(Object r)268 final boolean internalComplete(Object r) { // CAS from null to r 269 return RESULT.compareAndSet(this, null, r); 270 } 271 272 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)273 final boolean tryPushStack(Completion c) { 274 Completion h = stack; 275 NEXT.set(c, h); // CAS piggyback 276 return STACK.compareAndSet(this, h, c); 277 } 278 279 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)280 final void pushStack(Completion c) { 281 do {} while (!tryPushStack(c)); 282 } 283 284 /* ------------- Encoding and decoding outcomes -------------- */ 285 286 static final class AltResult { // See above 287 final Throwable ex; // null only for NIL AltResult(Throwable x)288 AltResult(Throwable x) { this.ex = x; } 289 } 290 291 /** The encoding of the null value. */ 292 static final AltResult NIL = new AltResult(null); 293 294 /** Completes with the null value, unless already completed. */ completeNull()295 final boolean completeNull() { 296 return RESULT.compareAndSet(this, null, NIL); 297 } 298 299 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)300 final Object encodeValue(T t) { 301 return (t == null) ? NIL : t; 302 } 303 304 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)305 final boolean completeValue(T t) { 306 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); 307 } 308 309 /** 310 * Returns the encoding of the given (non-null) exception as a 311 * wrapped CompletionException unless it is one already. 312 */ encodeThrowable(Throwable x)313 static AltResult encodeThrowable(Throwable x) { 314 return new AltResult((x instanceof CompletionException) ? x : 315 new CompletionException(x)); 316 } 317 318 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)319 final boolean completeThrowable(Throwable x) { 320 return RESULT.compareAndSet(this, null, encodeThrowable(x)); 321 } 322 323 /** 324 * Returns the encoding of the given (non-null) exception as a 325 * wrapped CompletionException unless it is one already. May 326 * return the given Object r (which must have been the result of a 327 * source future) if it is equivalent, i.e. if this is a simple 328 * relay of an existing CompletionException. 329 */ encodeThrowable(Throwable x, Object r)330 static Object encodeThrowable(Throwable x, Object r) { 331 if (!(x instanceof CompletionException)) 332 x = new CompletionException(x); 333 else if (r instanceof AltResult && x == ((AltResult)r).ex) 334 return r; 335 return new AltResult(x); 336 } 337 338 /** 339 * Completes with the given (non-null) exceptional result as a 340 * wrapped CompletionException unless it is one already, unless 341 * already completed. May complete with the given Object r 342 * (which must have been the result of a source future) if it is 343 * equivalent, i.e. if this is a simple propagation of an 344 * existing CompletionException. 345 */ completeThrowable(Throwable x, Object r)346 final boolean completeThrowable(Throwable x, Object r) { 347 return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); 348 } 349 350 /** 351 * Returns the encoding of the given arguments: if the exception 352 * is non-null, encodes as AltResult. Otherwise uses the given 353 * value, boxed as NIL if null. 354 */ encodeOutcome(T t, Throwable x)355 Object encodeOutcome(T t, Throwable x) { 356 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 357 } 358 359 /** 360 * Returns the encoding of a copied outcome; if exceptional, 361 * rewraps as a CompletionException, else returns argument. 362 */ encodeRelay(Object r)363 static Object encodeRelay(Object r) { 364 Throwable x; 365 if (r instanceof AltResult 366 && (x = ((AltResult)r).ex) != null 367 && !(x instanceof CompletionException)) 368 r = new AltResult(new CompletionException(x)); 369 return r; 370 } 371 372 /** 373 * Completes with r or a copy of r, unless already completed. 374 * If exceptional, r is first coerced to a CompletionException. 375 */ completeRelay(Object r)376 final boolean completeRelay(Object r) { 377 return RESULT.compareAndSet(this, null, encodeRelay(r)); 378 } 379 380 /** 381 * Reports result using Future.get conventions. 382 */ reportGet(Object r)383 private static Object reportGet(Object r) 384 throws InterruptedException, ExecutionException { 385 if (r == null) // by convention below, null means interrupted 386 throw new InterruptedException(); 387 if (r instanceof AltResult) { 388 Throwable x, cause; 389 if ((x = ((AltResult)r).ex) == null) 390 return null; 391 if (x instanceof CancellationException) 392 throw (CancellationException)x; 393 if ((x instanceof CompletionException) && 394 (cause = x.getCause()) != null) 395 x = cause; 396 throw new ExecutionException(x); 397 } 398 return r; 399 } 400 401 /** 402 * Decodes outcome to return result or throw unchecked exception. 403 */ reportJoin(Object r)404 private static Object reportJoin(Object r) { 405 if (r instanceof AltResult) { 406 Throwable x; 407 if ((x = ((AltResult)r).ex) == null) 408 return null; 409 if (x instanceof CancellationException) 410 throw (CancellationException)x; 411 if (x instanceof CompletionException) 412 throw (CompletionException)x; 413 throw new CompletionException(x); 414 } 415 return r; 416 } 417 418 /* ------------- Async task preliminaries -------------- */ 419 420 /** 421 * A marker interface identifying asynchronous tasks produced by 422 * {@code async} methods. This may be useful for monitoring, 423 * debugging, and tracking asynchronous activities. 424 * 425 * @since 1.8 426 */ 427 public static interface AsynchronousCompletionTask { 428 } 429 430 private static final boolean USE_COMMON_POOL = 431 (ForkJoinPool.getCommonPoolParallelism() > 1); 432 433 /** 434 * Default executor -- ForkJoinPool.commonPool() unless it cannot 435 * support parallelism. 436 */ 437 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 438 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 439 440 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 441 static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)442 public void execute(Runnable r) { 443 Objects.requireNonNull(r); 444 new Thread(r).start(); 445 } 446 } 447 448 /** 449 * Null-checks user executor argument, and translates uses of 450 * commonPool to ASYNC_POOL in case parallelism disabled. 451 */ screenExecutor(Executor e)452 static Executor screenExecutor(Executor e) { 453 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 454 return ASYNC_POOL; 455 if (e == null) throw new NullPointerException(); 456 return e; 457 } 458 459 // Modes for Completion.tryFire. Signedness matters. 460 static final int SYNC = 0; 461 static final int ASYNC = 1; 462 static final int NESTED = -1; 463 464 /* ------------- Base Completion classes and operations -------------- */ 465 466 @SuppressWarnings("serial") 467 abstract static class Completion extends ForkJoinTask<Void> 468 implements Runnable, AsynchronousCompletionTask { 469 volatile Completion next; // Treiber stack link 470 471 /** 472 * Performs completion action if triggered, returning a 473 * dependent that may need propagation, if one exists. 474 * 475 * @param mode SYNC, ASYNC, or NESTED 476 */ tryFire(int mode)477 abstract CompletableFuture<?> tryFire(int mode); 478 479 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()480 abstract boolean isLive(); 481 run()482 public final void run() { tryFire(ASYNC); } exec()483 public final boolean exec() { tryFire(ASYNC); return false; } getRawResult()484 public final Void getRawResult() { return null; } setRawResult(Void v)485 public final void setRawResult(Void v) {} 486 } 487 488 /** 489 * Pops and tries to trigger all reachable dependents. Call only 490 * when known to be done. 491 */ postComplete()492 final void postComplete() { 493 /* 494 * On each step, variable f holds current dependents to pop 495 * and run. It is extended along only one path at a time, 496 * pushing others to avoid unbounded recursion. 497 */ 498 CompletableFuture<?> f = this; Completion h; 499 while ((h = f.stack) != null || 500 (f != this && (h = (f = this).stack) != null)) { 501 CompletableFuture<?> d; Completion t; 502 if (STACK.compareAndSet(f, h, t = h.next)) { 503 if (t != null) { 504 if (f != this) { 505 pushStack(h); 506 continue; 507 } 508 NEXT.compareAndSet(h, t, null); // try to detach 509 } 510 f = (d = h.tryFire(NESTED)) == null ? this : d; 511 } 512 } 513 } 514 515 /** Traverses stack and unlinks one or more dead Completions, if found. */ cleanStack()516 final void cleanStack() { 517 Completion p = stack; 518 // ensure head of stack live 519 for (boolean unlinked = false;;) { 520 if (p == null) 521 return; 522 else if (p.isLive()) { 523 if (unlinked) 524 return; 525 else 526 break; 527 } 528 else if (STACK.weakCompareAndSet(this, p, (p = p.next))) 529 unlinked = true; 530 else 531 p = stack; 532 } 533 // try to unlink first non-live 534 for (Completion q = p.next; q != null;) { 535 Completion s = q.next; 536 if (q.isLive()) { 537 p = q; 538 q = s; 539 } else if (NEXT.weakCompareAndSet(p, q, s)) 540 break; 541 else 542 q = p.next; 543 } 544 } 545 546 /* ------------- One-input Completions -------------- */ 547 548 /** A Completion with a source, dependent, and executor. */ 549 @SuppressWarnings("serial") 550 abstract static class UniCompletion<T,V> extends Completion { 551 Executor executor; // executor to use (null if none) 552 CompletableFuture<V> dep; // the dependent to complete 553 CompletableFuture<T> src; // source for action 554 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)555 UniCompletion(Executor executor, CompletableFuture<V> dep, 556 CompletableFuture<T> src) { 557 this.executor = executor; this.dep = dep; this.src = src; 558 } 559 560 /** 561 * Returns true if action can be run. Call only when known to 562 * be triggerable. Uses FJ tag bit to ensure that only one 563 * thread claims ownership. If async, starts as task -- a 564 * later call to tryFire will run action. 565 */ claim()566 final boolean claim() { 567 Executor e = executor; 568 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 569 if (e == null) 570 return true; 571 executor = null; // disable 572 e.execute(this); 573 } 574 return false; 575 } 576 isLive()577 final boolean isLive() { return dep != null; } 578 } 579 580 /** 581 * Pushes the given completion unless it completes while trying. 582 * Caller should first check that result is null. 583 */ unipush(Completion c)584 final void unipush(Completion c) { 585 if (c != null) { 586 while (!tryPushStack(c)) { 587 if (result != null) { 588 NEXT.set(c, null); 589 break; 590 } 591 } 592 if (result != null) 593 c.tryFire(SYNC); 594 } 595 } 596 597 /** 598 * Post-processing by dependent after successful UniCompletion tryFire. 599 * Tries to clean stack of source a, and then either runs postComplete 600 * or returns this to caller, depending on mode. 601 */ postFire(CompletableFuture<?> a, int mode)602 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 603 if (a != null && a.stack != null) { 604 Object r; 605 if ((r = a.result) == null) 606 a.cleanStack(); 607 if (mode >= 0 && (r != null || a.result != null)) 608 a.postComplete(); 609 } 610 if (result != null && stack != null) { 611 if (mode < 0) 612 return this; 613 else 614 postComplete(); 615 } 616 return null; 617 } 618 619 @SuppressWarnings("serial") 620 static final class UniApply<T,V> extends UniCompletion<T,V> { 621 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)622 UniApply(Executor executor, CompletableFuture<V> dep, 623 CompletableFuture<T> src, 624 Function<? super T,? extends V> fn) { 625 super(executor, dep, src); this.fn = fn; 626 } tryFire(int mode)627 final CompletableFuture<V> tryFire(int mode) { 628 CompletableFuture<V> d; CompletableFuture<T> a; 629 Object r; Throwable x; Function<? super T,? extends V> f; 630 if ((a = src) == null || (r = a.result) == null 631 || (d = dep) == null || (f = fn) == null) 632 return null; 633 tryComplete: if (d.result == null) { 634 if (r instanceof AltResult) { 635 if ((x = ((AltResult)r).ex) != null) { 636 d.completeThrowable(x, r); 637 break tryComplete; 638 } 639 r = null; 640 } 641 try { 642 if (mode <= 0 && !claim()) 643 return null; 644 else { 645 @SuppressWarnings("unchecked") T t = (T) r; 646 d.completeValue(f.apply(t)); 647 } 648 } catch (Throwable ex) { 649 d.completeThrowable(ex); 650 } 651 } 652 src = null; dep = null; fn = null; 653 return d.postFire(a, mode); 654 } 655 } 656 uniApplyStage( Executor e, Function<? super T,? extends V> f)657 private <V> CompletableFuture<V> uniApplyStage( 658 Executor e, Function<? super T,? extends V> f) { 659 if (f == null) throw new NullPointerException(); 660 Object r; 661 if ((r = result) != null) 662 return uniApplyNow(r, e, f); 663 CompletableFuture<V> d = newIncompleteFuture(); 664 unipush(new UniApply<T,V>(e, d, this, f)); 665 return d; 666 } 667 uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)668 private <V> CompletableFuture<V> uniApplyNow( 669 Object r, Executor e, Function<? super T,? extends V> f) { 670 Throwable x; 671 CompletableFuture<V> d = newIncompleteFuture(); 672 if (r instanceof AltResult) { 673 if ((x = ((AltResult)r).ex) != null) { 674 d.result = encodeThrowable(x, r); 675 return d; 676 } 677 r = null; 678 } 679 try { 680 if (e != null) { 681 e.execute(new UniApply<T,V>(null, d, this, f)); 682 } else { 683 @SuppressWarnings("unchecked") T t = (T) r; 684 d.result = d.encodeValue(f.apply(t)); 685 } 686 } catch (Throwable ex) { 687 d.result = encodeThrowable(ex); 688 } 689 return d; 690 } 691 692 @SuppressWarnings("serial") 693 static final class UniAccept<T> extends UniCompletion<T,Void> { 694 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)695 UniAccept(Executor executor, CompletableFuture<Void> dep, 696 CompletableFuture<T> src, Consumer<? super T> fn) { 697 super(executor, dep, src); this.fn = fn; 698 } tryFire(int mode)699 final CompletableFuture<Void> tryFire(int mode) { 700 CompletableFuture<Void> d; CompletableFuture<T> a; 701 Object r; Throwable x; Consumer<? super T> f; 702 if ((a = src) == null || (r = a.result) == null 703 || (d = dep) == null || (f = fn) == null) 704 return null; 705 tryComplete: if (d.result == null) { 706 if (r instanceof AltResult) { 707 if ((x = ((AltResult)r).ex) != null) { 708 d.completeThrowable(x, r); 709 break tryComplete; 710 } 711 r = null; 712 } 713 try { 714 if (mode <= 0 && !claim()) 715 return null; 716 else { 717 @SuppressWarnings("unchecked") T t = (T) r; 718 f.accept(t); 719 d.completeNull(); 720 } 721 } catch (Throwable ex) { 722 d.completeThrowable(ex); 723 } 724 } 725 src = null; dep = null; fn = null; 726 return d.postFire(a, mode); 727 } 728 } 729 uniAcceptStage(Executor e, Consumer<? super T> f)730 private CompletableFuture<Void> uniAcceptStage(Executor e, 731 Consumer<? super T> f) { 732 if (f == null) throw new NullPointerException(); 733 Object r; 734 if ((r = result) != null) 735 return uniAcceptNow(r, e, f); 736 CompletableFuture<Void> d = newIncompleteFuture(); 737 unipush(new UniAccept<T>(e, d, this, f)); 738 return d; 739 } 740 uniAcceptNow( Object r, Executor e, Consumer<? super T> f)741 private CompletableFuture<Void> uniAcceptNow( 742 Object r, Executor e, Consumer<? super T> f) { 743 Throwable x; 744 CompletableFuture<Void> d = newIncompleteFuture(); 745 if (r instanceof AltResult) { 746 if ((x = ((AltResult)r).ex) != null) { 747 d.result = encodeThrowable(x, r); 748 return d; 749 } 750 r = null; 751 } 752 try { 753 if (e != null) { 754 e.execute(new UniAccept<T>(null, d, this, f)); 755 } else { 756 @SuppressWarnings("unchecked") T t = (T) r; 757 f.accept(t); 758 d.result = NIL; 759 } 760 } catch (Throwable ex) { 761 d.result = encodeThrowable(ex); 762 } 763 return d; 764 } 765 766 @SuppressWarnings("serial") 767 static final class UniRun<T> extends UniCompletion<T,Void> { 768 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)769 UniRun(Executor executor, CompletableFuture<Void> dep, 770 CompletableFuture<T> src, Runnable fn) { 771 super(executor, dep, src); this.fn = fn; 772 } tryFire(int mode)773 final CompletableFuture<Void> tryFire(int mode) { 774 CompletableFuture<Void> d; CompletableFuture<T> a; 775 Object r; Throwable x; Runnable f; 776 if ((a = src) == null || (r = a.result) == null 777 || (d = dep) == null || (f = fn) == null) 778 return null; 779 if (d.result == null) { 780 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 781 d.completeThrowable(x, r); 782 else 783 try { 784 if (mode <= 0 && !claim()) 785 return null; 786 else { 787 f.run(); 788 d.completeNull(); 789 } 790 } catch (Throwable ex) { 791 d.completeThrowable(ex); 792 } 793 } 794 src = null; dep = null; fn = null; 795 return d.postFire(a, mode); 796 } 797 } 798 uniRunStage(Executor e, Runnable f)799 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 800 if (f == null) throw new NullPointerException(); 801 Object r; 802 if ((r = result) != null) 803 return uniRunNow(r, e, f); 804 CompletableFuture<Void> d = newIncompleteFuture(); 805 unipush(new UniRun<T>(e, d, this, f)); 806 return d; 807 } 808 uniRunNow(Object r, Executor e, Runnable f)809 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { 810 Throwable x; 811 CompletableFuture<Void> d = newIncompleteFuture(); 812 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 813 d.result = encodeThrowable(x, r); 814 else 815 try { 816 if (e != null) { 817 e.execute(new UniRun<T>(null, d, this, f)); 818 } else { 819 f.run(); 820 d.result = NIL; 821 } 822 } catch (Throwable ex) { 823 d.result = encodeThrowable(ex); 824 } 825 return d; 826 } 827 828 @SuppressWarnings("serial") 829 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 830 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)831 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 832 CompletableFuture<T> src, 833 BiConsumer<? super T, ? super Throwable> fn) { 834 super(executor, dep, src); this.fn = fn; 835 } tryFire(int mode)836 final CompletableFuture<T> tryFire(int mode) { 837 CompletableFuture<T> d; CompletableFuture<T> a; 838 Object r; BiConsumer<? super T, ? super Throwable> f; 839 if ((a = src) == null || (r = a.result) == null 840 || (d = dep) == null || (f = fn) == null 841 || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) 842 return null; 843 src = null; dep = null; fn = null; 844 return d.postFire(a, mode); 845 } 846 } 847 uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)848 final boolean uniWhenComplete(Object r, 849 BiConsumer<? super T,? super Throwable> f, 850 UniWhenComplete<T> c) { 851 T t; Throwable x = null; 852 if (result == null) { 853 try { 854 if (c != null && !c.claim()) 855 return false; 856 if (r instanceof AltResult) { 857 x = ((AltResult)r).ex; 858 t = null; 859 } else { 860 @SuppressWarnings("unchecked") T tr = (T) r; 861 t = tr; 862 } 863 f.accept(t, x); 864 if (x == null) { 865 internalComplete(r); 866 return true; 867 } 868 } catch (Throwable ex) { 869 if (x == null) 870 x = ex; 871 else if (x != ex) 872 x.addSuppressed(ex); 873 } 874 completeThrowable(x, r); 875 } 876 return true; 877 } 878 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)879 private CompletableFuture<T> uniWhenCompleteStage( 880 Executor e, BiConsumer<? super T, ? super Throwable> f) { 881 if (f == null) throw new NullPointerException(); 882 CompletableFuture<T> d = newIncompleteFuture(); 883 Object r; 884 if ((r = result) == null) 885 unipush(new UniWhenComplete<T>(e, d, this, f)); 886 else if (e == null) 887 d.uniWhenComplete(r, f, null); 888 else { 889 try { 890 e.execute(new UniWhenComplete<T>(null, d, this, f)); 891 } catch (Throwable ex) { 892 d.result = encodeThrowable(ex); 893 } 894 } 895 return d; 896 } 897 898 @SuppressWarnings("serial") 899 static final class UniHandle<T,V> extends UniCompletion<T,V> { 900 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)901 UniHandle(Executor executor, CompletableFuture<V> dep, 902 CompletableFuture<T> src, 903 BiFunction<? super T, Throwable, ? extends V> fn) { 904 super(executor, dep, src); this.fn = fn; 905 } tryFire(int mode)906 final CompletableFuture<V> tryFire(int mode) { 907 CompletableFuture<V> d; CompletableFuture<T> a; 908 Object r; BiFunction<? super T, Throwable, ? extends V> f; 909 if ((a = src) == null || (r = a.result) == null 910 || (d = dep) == null || (f = fn) == null 911 || !d.uniHandle(r, f, mode > 0 ? null : this)) 912 return null; 913 src = null; dep = null; fn = null; 914 return d.postFire(a, mode); 915 } 916 } 917 uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)918 final <S> boolean uniHandle(Object r, 919 BiFunction<? super S, Throwable, ? extends T> f, 920 UniHandle<S,T> c) { 921 S s; Throwable x; 922 if (result == null) { 923 try { 924 if (c != null && !c.claim()) 925 return false; 926 if (r instanceof AltResult) { 927 x = ((AltResult)r).ex; 928 s = null; 929 } else { 930 x = null; 931 @SuppressWarnings("unchecked") S ss = (S) r; 932 s = ss; 933 } 934 completeValue(f.apply(s, x)); 935 } catch (Throwable ex) { 936 completeThrowable(ex); 937 } 938 } 939 return true; 940 } 941 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)942 private <V> CompletableFuture<V> uniHandleStage( 943 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 944 if (f == null) throw new NullPointerException(); 945 CompletableFuture<V> d = newIncompleteFuture(); 946 Object r; 947 if ((r = result) == null) 948 unipush(new UniHandle<T,V>(e, d, this, f)); 949 else if (e == null) 950 d.uniHandle(r, f, null); 951 else { 952 try { 953 e.execute(new UniHandle<T,V>(null, d, this, f)); 954 } catch (Throwable ex) { 955 d.result = encodeThrowable(ex); 956 } 957 } 958 return d; 959 } 960 961 @SuppressWarnings("serial") 962 static final class UniExceptionally<T> extends UniCompletion<T,T> { 963 Function<? super Throwable, ? extends T> fn; UniExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)964 UniExceptionally(Executor executor, 965 CompletableFuture<T> dep, CompletableFuture<T> src, 966 Function<? super Throwable, ? extends T> fn) { 967 super(executor, dep, src); this.fn = fn; 968 } tryFire(int mode)969 final CompletableFuture<T> tryFire(int mode) { 970 CompletableFuture<T> d; CompletableFuture<T> a; 971 Object r; Function<? super Throwable, ? extends T> f; 972 if ((a = src) == null || (r = a.result) == null 973 || (d = dep) == null || (f = fn) == null 974 || !d.uniExceptionally(r, f, mode > 0 ? null : this)) 975 return null; 976 src = null; dep = null; fn = null; 977 return d.postFire(a, mode); 978 } 979 } 980 uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)981 final boolean uniExceptionally(Object r, 982 Function<? super Throwable, ? extends T> f, 983 UniExceptionally<T> c) { 984 Throwable x; 985 if (result == null) { 986 try { 987 if (c != null && !c.claim()) 988 return false; 989 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 990 completeValue(f.apply(x)); 991 else 992 internalComplete(r); 993 } catch (Throwable ex) { 994 completeThrowable(ex); 995 } 996 } 997 return true; 998 } 999 uniExceptionallyStage( Executor e, Function<Throwable, ? extends T> f)1000 private CompletableFuture<T> uniExceptionallyStage( 1001 Executor e, Function<Throwable, ? extends T> f) { 1002 if (f == null) throw new NullPointerException(); 1003 CompletableFuture<T> d = newIncompleteFuture(); 1004 Object r; 1005 if ((r = result) == null) 1006 unipush(new UniExceptionally<T>(e, d, this, f)); 1007 else if (e == null) 1008 d.uniExceptionally(r, f, null); 1009 else { 1010 try { 1011 e.execute(new UniExceptionally<T>(null, d, this, f)); 1012 } catch (Throwable ex) { 1013 d.result = encodeThrowable(ex); 1014 } 1015 } 1016 return d; 1017 } 1018 1019 @SuppressWarnings("serial") 1020 static final class UniComposeExceptionally<T> extends UniCompletion<T,T> { 1021 Function<Throwable, ? extends CompletionStage<T>> fn; UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<Throwable, ? extends CompletionStage<T>> fn)1022 UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, 1023 CompletableFuture<T> src, 1024 Function<Throwable, ? extends CompletionStage<T>> fn) { 1025 super(executor, dep, src); this.fn = fn; 1026 } tryFire(int mode)1027 final CompletableFuture<T> tryFire(int mode) { 1028 CompletableFuture<T> d; CompletableFuture<T> a; 1029 Function<Throwable, ? extends CompletionStage<T>> f; 1030 Object r; Throwable x; 1031 if ((a = src) == null || (r = a.result) == null 1032 || (d = dep) == null || (f = fn) == null) 1033 return null; 1034 if (d.result == null) { 1035 if ((r instanceof AltResult) && 1036 (x = ((AltResult)r).ex) != null) { 1037 try { 1038 if (mode <= 0 && !claim()) 1039 return null; 1040 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1041 if ((r = g.result) != null) 1042 d.completeRelay(r); 1043 else { 1044 g.unipush(new UniRelay<T,T>(d, g)); 1045 if (d.result == null) 1046 return null; 1047 } 1048 } catch (Throwable ex) { 1049 d.completeThrowable(ex); 1050 } 1051 } 1052 else 1053 d.internalComplete(r); 1054 } 1055 src = null; dep = null; fn = null; 1056 return d.postFire(a, mode); 1057 } 1058 } 1059 uniComposeExceptionallyStage( Executor e, Function<Throwable, ? extends CompletionStage<T>> f)1060 private CompletableFuture<T> uniComposeExceptionallyStage( 1061 Executor e, Function<Throwable, ? extends CompletionStage<T>> f) { 1062 if (f == null) throw new NullPointerException(); 1063 CompletableFuture<T> d = newIncompleteFuture(); 1064 Object r, s; Throwable x; 1065 if ((r = result) == null) 1066 unipush(new UniComposeExceptionally<T>(e, d, this, f)); 1067 else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null) 1068 d.internalComplete(r); 1069 else 1070 try { 1071 if (e != null) 1072 e.execute(new UniComposeExceptionally<T>(null, d, this, f)); 1073 else { 1074 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1075 if ((s = g.result) != null) 1076 d.result = encodeRelay(s); 1077 else 1078 g.unipush(new UniRelay<T,T>(d, g)); 1079 } 1080 } catch (Throwable ex) { 1081 d.result = encodeThrowable(ex); 1082 } 1083 return d; 1084 } 1085 1086 @SuppressWarnings("serial") 1087 static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)1088 UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) { 1089 super(null, dep, src); 1090 } tryFire(int mode)1091 final CompletableFuture<U> tryFire(int mode) { 1092 CompletableFuture<U> d; CompletableFuture<T> a; Object r; 1093 if ((a = src) == null || (r = a.result) == null 1094 || (d = dep) == null) 1095 return null; 1096 if (d.result == null) 1097 d.completeRelay(r); 1098 src = null; dep = null; 1099 return d.postFire(a, mode); 1100 } 1101 } 1102 uniCopyStage( CompletableFuture<T> src)1103 private static <U, T extends U> CompletableFuture<U> uniCopyStage( 1104 CompletableFuture<T> src) { 1105 Object r; 1106 CompletableFuture<U> d = src.newIncompleteFuture(); 1107 if ((r = src.result) != null) 1108 d.result = encodeRelay(r); 1109 else 1110 src.unipush(new UniRelay<U,T>(d, src)); 1111 return d; 1112 } 1113 uniAsMinimalStage()1114 private MinimalStage<T> uniAsMinimalStage() { 1115 Object r; 1116 if ((r = result) != null) 1117 return new MinimalStage<T>(encodeRelay(r)); 1118 MinimalStage<T> d = new MinimalStage<T>(); 1119 unipush(new UniRelay<T,T>(d, this)); 1120 return d; 1121 } 1122 1123 @SuppressWarnings("serial") 1124 static final class UniCompose<T,V> extends UniCompletion<T,V> { 1125 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1126 UniCompose(Executor executor, CompletableFuture<V> dep, 1127 CompletableFuture<T> src, 1128 Function<? super T, ? extends CompletionStage<V>> fn) { 1129 super(executor, dep, src); this.fn = fn; 1130 } tryFire(int mode)1131 final CompletableFuture<V> tryFire(int mode) { 1132 CompletableFuture<V> d; CompletableFuture<T> a; 1133 Function<? super T, ? extends CompletionStage<V>> f; 1134 Object r; Throwable x; 1135 if ((a = src) == null || (r = a.result) == null 1136 || (d = dep) == null || (f = fn) == null) 1137 return null; 1138 tryComplete: if (d.result == null) { 1139 if (r instanceof AltResult) { 1140 if ((x = ((AltResult)r).ex) != null) { 1141 d.completeThrowable(x, r); 1142 break tryComplete; 1143 } 1144 r = null; 1145 } 1146 try { 1147 if (mode <= 0 && !claim()) 1148 return null; 1149 @SuppressWarnings("unchecked") T t = (T) r; 1150 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1151 if ((r = g.result) != null) 1152 d.completeRelay(r); 1153 else { 1154 g.unipush(new UniRelay<V,V>(d, g)); 1155 if (d.result == null) 1156 return null; 1157 } 1158 } catch (Throwable ex) { 1159 d.completeThrowable(ex); 1160 } 1161 } 1162 src = null; dep = null; fn = null; 1163 return d.postFire(a, mode); 1164 } 1165 } 1166 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1167 private <V> CompletableFuture<V> uniComposeStage( 1168 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 1169 if (f == null) throw new NullPointerException(); 1170 CompletableFuture<V> d = newIncompleteFuture(); 1171 Object r, s; Throwable x; 1172 if ((r = result) == null) 1173 unipush(new UniCompose<T,V>(e, d, this, f)); 1174 else { 1175 if (r instanceof AltResult) { 1176 if ((x = ((AltResult)r).ex) != null) { 1177 d.result = encodeThrowable(x, r); 1178 return d; 1179 } 1180 r = null; 1181 } 1182 try { 1183 if (e != null) 1184 e.execute(new UniCompose<T,V>(null, d, this, f)); 1185 else { 1186 @SuppressWarnings("unchecked") T t = (T) r; 1187 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1188 if ((s = g.result) != null) 1189 d.result = encodeRelay(s); 1190 else 1191 g.unipush(new UniRelay<V,V>(d, g)); 1192 } 1193 } catch (Throwable ex) { 1194 d.result = encodeThrowable(ex); 1195 } 1196 } 1197 return d; 1198 } 1199 1200 /* ------------- Two-input Completions -------------- */ 1201 1202 /** A Completion for an action with two sources */ 1203 @SuppressWarnings("serial") 1204 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1205 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1206 BiCompletion(Executor executor, CompletableFuture<V> dep, 1207 CompletableFuture<T> src, CompletableFuture<U> snd) { 1208 super(executor, dep, src); this.snd = snd; 1209 } 1210 } 1211 1212 /** A Completion delegating to a BiCompletion */ 1213 @SuppressWarnings("serial") 1214 static final class CoCompletion extends Completion { 1215 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1216 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1217 final CompletableFuture<?> tryFire(int mode) { 1218 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1219 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1220 return null; 1221 base = null; // detach 1222 return d; 1223 } isLive()1224 final boolean isLive() { 1225 BiCompletion<?,?,?> c; 1226 return (c = base) != null 1227 // && c.isLive() 1228 && c.dep != null; 1229 } 1230 } 1231 1232 /** 1233 * Pushes completion to this and b unless both done. 1234 * Caller should first check that either result or b.result is null. 1235 */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1236 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1237 if (c != null) { 1238 while (result == null) { 1239 if (tryPushStack(c)) { 1240 if (b.result == null) 1241 b.unipush(new CoCompletion(c)); 1242 else if (result != null) 1243 c.tryFire(SYNC); 1244 return; 1245 } 1246 } 1247 b.unipush(c); 1248 } 1249 } 1250 1251 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1252 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1253 CompletableFuture<?> b, int mode) { 1254 if (b != null && b.stack != null) { // clean second source 1255 Object r; 1256 if ((r = b.result) == null) 1257 b.cleanStack(); 1258 if (mode >= 0 && (r != null || b.result != null)) 1259 b.postComplete(); 1260 } 1261 return postFire(a, mode); 1262 } 1263 1264 @SuppressWarnings("serial") 1265 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1266 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1267 BiApply(Executor executor, CompletableFuture<V> dep, 1268 CompletableFuture<T> src, CompletableFuture<U> snd, 1269 BiFunction<? super T,? super U,? extends V> fn) { 1270 super(executor, dep, src, snd); this.fn = fn; 1271 } tryFire(int mode)1272 final CompletableFuture<V> tryFire(int mode) { 1273 CompletableFuture<V> d; 1274 CompletableFuture<T> a; 1275 CompletableFuture<U> b; 1276 Object r, s; BiFunction<? super T,? super U,? extends V> f; 1277 if ( (a = src) == null || (r = a.result) == null 1278 || (b = snd) == null || (s = b.result) == null 1279 || (d = dep) == null || (f = fn) == null 1280 || !d.biApply(r, s, f, mode > 0 ? null : this)) 1281 return null; 1282 src = null; snd = null; dep = null; fn = null; 1283 return d.postFire(a, b, mode); 1284 } 1285 } 1286 biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1287 final <R,S> boolean biApply(Object r, Object s, 1288 BiFunction<? super R,? super S,? extends T> f, 1289 BiApply<R,S,T> c) { 1290 Throwable x; 1291 tryComplete: if (result == null) { 1292 if (r instanceof AltResult) { 1293 if ((x = ((AltResult)r).ex) != null) { 1294 completeThrowable(x, r); 1295 break tryComplete; 1296 } 1297 r = null; 1298 } 1299 if (s instanceof AltResult) { 1300 if ((x = ((AltResult)s).ex) != null) { 1301 completeThrowable(x, s); 1302 break tryComplete; 1303 } 1304 s = null; 1305 } 1306 try { 1307 if (c != null && !c.claim()) 1308 return false; 1309 @SuppressWarnings("unchecked") R rr = (R) r; 1310 @SuppressWarnings("unchecked") S ss = (S) s; 1311 completeValue(f.apply(rr, ss)); 1312 } catch (Throwable ex) { 1313 completeThrowable(ex); 1314 } 1315 } 1316 return true; 1317 } 1318 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1319 private <U,V> CompletableFuture<V> biApplyStage( 1320 Executor e, CompletionStage<U> o, 1321 BiFunction<? super T,? super U,? extends V> f) { 1322 CompletableFuture<U> b; Object r, s; 1323 if (f == null || (b = o.toCompletableFuture()) == null) 1324 throw new NullPointerException(); 1325 CompletableFuture<V> d = newIncompleteFuture(); 1326 if ((r = result) == null || (s = b.result) == null) 1327 bipush(b, new BiApply<T,U,V>(e, d, this, b, f)); 1328 else if (e == null) 1329 d.biApply(r, s, f, null); 1330 else 1331 try { 1332 e.execute(new BiApply<T,U,V>(null, d, this, b, f)); 1333 } catch (Throwable ex) { 1334 d.result = encodeThrowable(ex); 1335 } 1336 return d; 1337 } 1338 1339 @SuppressWarnings("serial") 1340 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1341 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1342 BiAccept(Executor executor, CompletableFuture<Void> dep, 1343 CompletableFuture<T> src, CompletableFuture<U> snd, 1344 BiConsumer<? super T,? super U> fn) { 1345 super(executor, dep, src, snd); this.fn = fn; 1346 } tryFire(int mode)1347 final CompletableFuture<Void> tryFire(int mode) { 1348 CompletableFuture<Void> d; 1349 CompletableFuture<T> a; 1350 CompletableFuture<U> b; 1351 Object r, s; BiConsumer<? super T,? super U> f; 1352 if ( (a = src) == null || (r = a.result) == null 1353 || (b = snd) == null || (s = b.result) == null 1354 || (d = dep) == null || (f = fn) == null 1355 || !d.biAccept(r, s, f, mode > 0 ? null : this)) 1356 return null; 1357 src = null; snd = null; dep = null; fn = null; 1358 return d.postFire(a, b, mode); 1359 } 1360 } 1361 biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1362 final <R,S> boolean biAccept(Object r, Object s, 1363 BiConsumer<? super R,? super S> f, 1364 BiAccept<R,S> c) { 1365 Throwable x; 1366 tryComplete: if (result == null) { 1367 if (r instanceof AltResult) { 1368 if ((x = ((AltResult)r).ex) != null) { 1369 completeThrowable(x, r); 1370 break tryComplete; 1371 } 1372 r = null; 1373 } 1374 if (s instanceof AltResult) { 1375 if ((x = ((AltResult)s).ex) != null) { 1376 completeThrowable(x, s); 1377 break tryComplete; 1378 } 1379 s = null; 1380 } 1381 try { 1382 if (c != null && !c.claim()) 1383 return false; 1384 @SuppressWarnings("unchecked") R rr = (R) r; 1385 @SuppressWarnings("unchecked") S ss = (S) s; 1386 f.accept(rr, ss); 1387 completeNull(); 1388 } catch (Throwable ex) { 1389 completeThrowable(ex); 1390 } 1391 } 1392 return true; 1393 } 1394 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1395 private <U> CompletableFuture<Void> biAcceptStage( 1396 Executor e, CompletionStage<U> o, 1397 BiConsumer<? super T,? super U> f) { 1398 CompletableFuture<U> b; Object r, s; 1399 if (f == null || (b = o.toCompletableFuture()) == null) 1400 throw new NullPointerException(); 1401 CompletableFuture<Void> d = newIncompleteFuture(); 1402 if ((r = result) == null || (s = b.result) == null) 1403 bipush(b, new BiAccept<T,U>(e, d, this, b, f)); 1404 else if (e == null) 1405 d.biAccept(r, s, f, null); 1406 else 1407 try { 1408 e.execute(new BiAccept<T,U>(null, d, this, b, f)); 1409 } catch (Throwable ex) { 1410 d.result = encodeThrowable(ex); 1411 } 1412 return d; 1413 } 1414 1415 @SuppressWarnings("serial") 1416 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1417 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1418 BiRun(Executor executor, CompletableFuture<Void> dep, 1419 CompletableFuture<T> src, CompletableFuture<U> snd, 1420 Runnable fn) { 1421 super(executor, dep, src, snd); this.fn = fn; 1422 } tryFire(int mode)1423 final CompletableFuture<Void> tryFire(int mode) { 1424 CompletableFuture<Void> d; 1425 CompletableFuture<T> a; 1426 CompletableFuture<U> b; 1427 Object r, s; Runnable f; 1428 if ( (a = src) == null || (r = a.result) == null 1429 || (b = snd) == null || (s = b.result) == null 1430 || (d = dep) == null || (f = fn) == null 1431 || !d.biRun(r, s, f, mode > 0 ? null : this)) 1432 return null; 1433 src = null; snd = null; dep = null; fn = null; 1434 return d.postFire(a, b, mode); 1435 } 1436 } 1437 biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1438 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) { 1439 Throwable x; Object z; 1440 if (result == null) { 1441 if ((r instanceof AltResult 1442 && (x = ((AltResult)(z = r)).ex) != null) || 1443 (s instanceof AltResult 1444 && (x = ((AltResult)(z = s)).ex) != null)) 1445 completeThrowable(x, z); 1446 else 1447 try { 1448 if (c != null && !c.claim()) 1449 return false; 1450 f.run(); 1451 completeNull(); 1452 } catch (Throwable ex) { 1453 completeThrowable(ex); 1454 } 1455 } 1456 return true; 1457 } 1458 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1459 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1460 Runnable f) { 1461 CompletableFuture<?> b; Object r, s; 1462 if (f == null || (b = o.toCompletableFuture()) == null) 1463 throw new NullPointerException(); 1464 CompletableFuture<Void> d = newIncompleteFuture(); 1465 if ((r = result) == null || (s = b.result) == null) 1466 bipush(b, new BiRun<>(e, d, this, b, f)); 1467 else if (e == null) 1468 d.biRun(r, s, f, null); 1469 else 1470 try { 1471 e.execute(new BiRun<>(null, d, this, b, f)); 1472 } catch (Throwable ex) { 1473 d.result = encodeThrowable(ex); 1474 } 1475 return d; 1476 } 1477 1478 @SuppressWarnings("serial") 1479 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1480 BiRelay(CompletableFuture<Void> dep, 1481 CompletableFuture<T> src, CompletableFuture<U> snd) { 1482 super(null, dep, src, snd); 1483 } tryFire(int mode)1484 final CompletableFuture<Void> tryFire(int mode) { 1485 CompletableFuture<Void> d; 1486 CompletableFuture<T> a; 1487 CompletableFuture<U> b; 1488 Object r, s, z; Throwable x; 1489 if ( (a = src) == null || (r = a.result) == null 1490 || (b = snd) == null || (s = b.result) == null 1491 || (d = dep) == null) 1492 return null; 1493 if (d.result == null) { 1494 if ((r instanceof AltResult 1495 && (x = ((AltResult)(z = r)).ex) != null) || 1496 (s instanceof AltResult 1497 && (x = ((AltResult)(z = s)).ex) != null)) 1498 d.completeThrowable(x, z); 1499 else 1500 d.completeNull(); 1501 } 1502 src = null; snd = null; dep = null; 1503 return d.postFire(a, b, mode); 1504 } 1505 } 1506 1507 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1508 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1509 int lo, int hi) { 1510 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1511 if (lo > hi) // empty 1512 d.result = NIL; 1513 else { 1514 CompletableFuture<?> a, b; Object r, s, z; Throwable x; 1515 int mid = (lo + hi) >>> 1; 1516 if ((a = (lo == mid ? cfs[lo] : 1517 andTree(cfs, lo, mid))) == null || 1518 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1519 andTree(cfs, mid+1, hi))) == null) 1520 throw new NullPointerException(); 1521 if ((r = a.result) == null || (s = b.result) == null) 1522 a.bipush(b, new BiRelay<>(d, a, b)); 1523 else if ((r instanceof AltResult 1524 && (x = ((AltResult)(z = r)).ex) != null) || 1525 (s instanceof AltResult 1526 && (x = ((AltResult)(z = s)).ex) != null)) 1527 d.result = encodeThrowable(x, z); 1528 else 1529 d.result = NIL; 1530 } 1531 return d; 1532 } 1533 1534 /* ------------- Projected (Ored) BiCompletions -------------- */ 1535 1536 /** 1537 * Pushes completion to this and b unless either done. 1538 * Caller should first check that result and b.result are both null. 1539 */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1540 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1541 if (c != null) { 1542 while (!tryPushStack(c)) { 1543 if (result != null) { 1544 NEXT.set(c, null); 1545 break; 1546 } 1547 } 1548 if (result != null) 1549 c.tryFire(SYNC); 1550 else 1551 b.unipush(new CoCompletion(c)); 1552 } 1553 } 1554 1555 @SuppressWarnings("serial") 1556 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1557 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1558 OrApply(Executor executor, CompletableFuture<V> dep, 1559 CompletableFuture<T> src, CompletableFuture<U> snd, 1560 Function<? super T,? extends V> fn) { 1561 super(executor, dep, src, snd); this.fn = fn; 1562 } tryFire(int mode)1563 final CompletableFuture<V> tryFire(int mode) { 1564 CompletableFuture<V> d; CompletableFuture<? extends T> a, b; 1565 Object r; Throwable x; Function<? super T,? extends V> f; 1566 if ((a = src) == null || (b = snd) == null 1567 || ((r = a.result) == null && (r = b.result) == null) 1568 || (d = dep) == null || (f = fn) == null) 1569 return null; 1570 tryComplete: if (d.result == null) { 1571 try { 1572 if (mode <= 0 && !claim()) 1573 return null; 1574 if (r instanceof AltResult) { 1575 if ((x = ((AltResult)r).ex) != null) { 1576 d.completeThrowable(x, r); 1577 break tryComplete; 1578 } 1579 r = null; 1580 } 1581 @SuppressWarnings("unchecked") T t = (T) r; 1582 d.completeValue(f.apply(t)); 1583 } catch (Throwable ex) { 1584 d.completeThrowable(ex); 1585 } 1586 } 1587 src = null; snd = null; dep = null; fn = null; 1588 return d.postFire(a, b, mode); 1589 } 1590 } 1591 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1592 private <U extends T,V> CompletableFuture<V> orApplyStage( 1593 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) { 1594 CompletableFuture<U> b; 1595 if (f == null || (b = o.toCompletableFuture()) == null) 1596 throw new NullPointerException(); 1597 1598 Object r; CompletableFuture<? extends T> z; 1599 if ((r = (z = this).result) != null || 1600 (r = (z = b).result) != null) 1601 return z.uniApplyNow(r, e, f); 1602 1603 CompletableFuture<V> d = newIncompleteFuture(); 1604 orpush(b, new OrApply<T,U,V>(e, d, this, b, f)); 1605 return d; 1606 } 1607 1608 @SuppressWarnings("serial") 1609 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1610 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1611 OrAccept(Executor executor, CompletableFuture<Void> dep, 1612 CompletableFuture<T> src, CompletableFuture<U> snd, 1613 Consumer<? super T> fn) { 1614 super(executor, dep, src, snd); this.fn = fn; 1615 } tryFire(int mode)1616 final CompletableFuture<Void> tryFire(int mode) { 1617 CompletableFuture<Void> d; CompletableFuture<? extends T> a, b; 1618 Object r; Throwable x; Consumer<? super T> f; 1619 if ((a = src) == null || (b = snd) == null 1620 || ((r = a.result) == null && (r = b.result) == null) 1621 || (d = dep) == null || (f = fn) == null) 1622 return null; 1623 tryComplete: if (d.result == null) { 1624 try { 1625 if (mode <= 0 && !claim()) 1626 return null; 1627 if (r instanceof AltResult) { 1628 if ((x = ((AltResult)r).ex) != null) { 1629 d.completeThrowable(x, r); 1630 break tryComplete; 1631 } 1632 r = null; 1633 } 1634 @SuppressWarnings("unchecked") T t = (T) r; 1635 f.accept(t); 1636 d.completeNull(); 1637 } catch (Throwable ex) { 1638 d.completeThrowable(ex); 1639 } 1640 } 1641 src = null; snd = null; dep = null; fn = null; 1642 return d.postFire(a, b, mode); 1643 } 1644 } 1645 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1646 private <U extends T> CompletableFuture<Void> orAcceptStage( 1647 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1648 CompletableFuture<U> b; 1649 if (f == null || (b = o.toCompletableFuture()) == null) 1650 throw new NullPointerException(); 1651 1652 Object r; CompletableFuture<? extends T> z; 1653 if ((r = (z = this).result) != null || 1654 (r = (z = b).result) != null) 1655 return z.uniAcceptNow(r, e, f); 1656 1657 CompletableFuture<Void> d = newIncompleteFuture(); 1658 orpush(b, new OrAccept<T,U>(e, d, this, b, f)); 1659 return d; 1660 } 1661 1662 @SuppressWarnings("serial") 1663 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1664 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1665 OrRun(Executor executor, CompletableFuture<Void> dep, 1666 CompletableFuture<T> src, CompletableFuture<U> snd, 1667 Runnable fn) { 1668 super(executor, dep, src, snd); this.fn = fn; 1669 } tryFire(int mode)1670 final CompletableFuture<Void> tryFire(int mode) { 1671 CompletableFuture<Void> d; CompletableFuture<?> a, b; 1672 Object r; Throwable x; Runnable f; 1673 if ((a = src) == null || (b = snd) == null 1674 || ((r = a.result) == null && (r = b.result) == null) 1675 || (d = dep) == null || (f = fn) == null) 1676 return null; 1677 if (d.result == null) { 1678 try { 1679 if (mode <= 0 && !claim()) 1680 return null; 1681 else if (r instanceof AltResult 1682 && (x = ((AltResult)r).ex) != null) 1683 d.completeThrowable(x, r); 1684 else { 1685 f.run(); 1686 d.completeNull(); 1687 } 1688 } catch (Throwable ex) { 1689 d.completeThrowable(ex); 1690 } 1691 } 1692 src = null; snd = null; dep = null; fn = null; 1693 return d.postFire(a, b, mode); 1694 } 1695 } 1696 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1697 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1698 Runnable f) { 1699 CompletableFuture<?> b; 1700 if (f == null || (b = o.toCompletableFuture()) == null) 1701 throw new NullPointerException(); 1702 1703 Object r; CompletableFuture<?> z; 1704 if ((r = (z = this).result) != null || 1705 (r = (z = b).result) != null) 1706 return z.uniRunNow(r, e, f); 1707 1708 CompletableFuture<Void> d = newIncompleteFuture(); 1709 orpush(b, new OrRun<>(e, d, this, b, f)); 1710 return d; 1711 } 1712 1713 /** Completion for an anyOf input future. */ 1714 @SuppressWarnings("serial") 1715 static class AnyOf extends Completion { 1716 CompletableFuture<Object> dep; CompletableFuture<?> src; 1717 CompletableFuture<?>[] srcs; AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1718 AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, 1719 CompletableFuture<?>[] srcs) { 1720 this.dep = dep; this.src = src; this.srcs = srcs; 1721 } tryFire(int mode)1722 final CompletableFuture<Object> tryFire(int mode) { 1723 // assert mode != ASYNC; 1724 CompletableFuture<Object> d; CompletableFuture<?> a; 1725 CompletableFuture<?>[] as; 1726 Object r; 1727 if ((a = src) == null || (r = a.result) == null 1728 || (d = dep) == null || (as = srcs) == null) 1729 return null; 1730 src = null; dep = null; srcs = null; 1731 if (d.completeRelay(r)) { 1732 for (CompletableFuture<?> b : as) 1733 if (b != a) 1734 b.cleanStack(); 1735 if (mode < 0) 1736 return d; 1737 else 1738 d.postComplete(); 1739 } 1740 return null; 1741 } isLive()1742 final boolean isLive() { 1743 CompletableFuture<Object> d; 1744 return (d = dep) != null && d.result == null; 1745 } 1746 } 1747 1748 /* ------------- Zero-input Async forms -------------- */ 1749 1750 @SuppressWarnings("serial") 1751 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1752 implements Runnable, AsynchronousCompletionTask { 1753 CompletableFuture<T> dep; Supplier<? extends T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1754 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1755 this.dep = dep; this.fn = fn; 1756 } 1757 getRawResult()1758 public final Void getRawResult() { return null; } setRawResult(Void v)1759 public final void setRawResult(Void v) {} exec()1760 public final boolean exec() { run(); return false; } 1761 run()1762 public void run() { 1763 CompletableFuture<T> d; Supplier<? extends T> f; 1764 if ((d = dep) != null && (f = fn) != null) { 1765 dep = null; fn = null; 1766 if (d.result == null) { 1767 try { 1768 d.completeValue(f.get()); 1769 } catch (Throwable ex) { 1770 d.completeThrowable(ex); 1771 } 1772 } 1773 d.postComplete(); 1774 } 1775 } 1776 } 1777 asyncSupplyStage(Executor e, Supplier<U> f)1778 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1779 Supplier<U> f) { 1780 if (f == null) throw new NullPointerException(); 1781 CompletableFuture<U> d = new CompletableFuture<U>(); 1782 e.execute(new AsyncSupply<U>(d, f)); 1783 return d; 1784 } 1785 1786 @SuppressWarnings("serial") 1787 static final class AsyncRun extends ForkJoinTask<Void> 1788 implements Runnable, AsynchronousCompletionTask { 1789 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1790 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1791 this.dep = dep; this.fn = fn; 1792 } 1793 getRawResult()1794 public final Void getRawResult() { return null; } setRawResult(Void v)1795 public final void setRawResult(Void v) {} exec()1796 public final boolean exec() { run(); return false; } 1797 run()1798 public void run() { 1799 CompletableFuture<Void> d; Runnable f; 1800 if ((d = dep) != null && (f = fn) != null) { 1801 dep = null; fn = null; 1802 if (d.result == null) { 1803 try { 1804 f.run(); 1805 d.completeNull(); 1806 } catch (Throwable ex) { 1807 d.completeThrowable(ex); 1808 } 1809 } 1810 d.postComplete(); 1811 } 1812 } 1813 } 1814 asyncRunStage(Executor e, Runnable f)1815 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1816 if (f == null) throw new NullPointerException(); 1817 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1818 e.execute(new AsyncRun(d, f)); 1819 return d; 1820 } 1821 1822 /* ------------- Signallers -------------- */ 1823 1824 /** 1825 * Completion for recording and releasing a waiting thread. This 1826 * class implements ManagedBlocker to avoid starvation when 1827 * blocking actions pile up in ForkJoinPools. 1828 */ 1829 @SuppressWarnings("serial") 1830 static final class Signaller extends Completion 1831 implements ForkJoinPool.ManagedBlocker { 1832 long nanos; // remaining wait time if timed 1833 final long deadline; // non-zero if timed 1834 final boolean interruptible; 1835 boolean interrupted; 1836 volatile Thread thread; 1837 Signaller(boolean interruptible, long nanos, long deadline)1838 Signaller(boolean interruptible, long nanos, long deadline) { 1839 this.thread = Thread.currentThread(); 1840 this.interruptible = interruptible; 1841 this.nanos = nanos; 1842 this.deadline = deadline; 1843 } tryFire(int ignore)1844 final CompletableFuture<?> tryFire(int ignore) { 1845 Thread w; // no need to atomically claim 1846 if ((w = thread) != null) { 1847 thread = null; 1848 LockSupport.unpark(w); 1849 } 1850 return null; 1851 } isReleasable()1852 public boolean isReleasable() { 1853 if (Thread.interrupted()) 1854 interrupted = true; 1855 return ((interrupted && interruptible) || 1856 (deadline != 0L && 1857 (nanos <= 0L || 1858 (nanos = deadline - System.nanoTime()) <= 0L)) || 1859 thread == null); 1860 } block()1861 public boolean block() { 1862 while (!isReleasable()) { 1863 if (deadline == 0L) 1864 LockSupport.park(this); 1865 else 1866 LockSupport.parkNanos(this, nanos); 1867 } 1868 return true; 1869 } isLive()1870 final boolean isLive() { return thread != null; } 1871 } 1872 1873 /** 1874 * Returns raw result after waiting, or null if interruptible and 1875 * interrupted. 1876 */ waitingGet(boolean interruptible)1877 private Object waitingGet(boolean interruptible) { 1878 if (interruptible && Thread.interrupted()) 1879 return null; 1880 Signaller q = null; 1881 boolean queued = false; 1882 Object r; 1883 while ((r = result) == null) { 1884 if (q == null) { 1885 q = new Signaller(interruptible, 0L, 0L); 1886 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1887 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1888 } 1889 else if (!queued) 1890 queued = tryPushStack(q); 1891 else if (interruptible && q.interrupted) { 1892 q.thread = null; 1893 cleanStack(); 1894 return null; 1895 } 1896 else { 1897 try { 1898 ForkJoinPool.managedBlock(q); 1899 } catch (InterruptedException ie) { // currently cannot happen 1900 q.interrupted = true; 1901 } 1902 } 1903 } 1904 if (q != null) { 1905 q.thread = null; 1906 if (q.interrupted) 1907 Thread.currentThread().interrupt(); 1908 } 1909 postComplete(); 1910 return r; 1911 } 1912 1913 /** 1914 * Returns raw result after waiting, or null if interrupted, or 1915 * throws TimeoutException on timeout. 1916 */ timedGet(long nanos)1917 private Object timedGet(long nanos) throws TimeoutException { 1918 long d = System.nanoTime() + nanos; 1919 long deadline = (d == 0L) ? 1L : d; // avoid 0 1920 boolean interrupted = false, queued = false; 1921 Signaller q = null; 1922 Object r = null; 1923 for (;;) { // order of checking interrupt, result, timeout matters 1924 if (interrupted || (interrupted = Thread.interrupted())) 1925 break; 1926 else if ((r = result) != null) 1927 break; 1928 else if (nanos <= 0L) 1929 break; 1930 else if (q == null) { 1931 q = new Signaller(true, nanos, deadline); 1932 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1933 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1934 } 1935 else if (!queued) 1936 queued = tryPushStack(q); 1937 else { 1938 try { 1939 ForkJoinPool.managedBlock(q); 1940 interrupted = q.interrupted; 1941 nanos = q.nanos; 1942 } catch (InterruptedException ie) { 1943 interrupted = true; 1944 } 1945 } 1946 } 1947 if (q != null) { 1948 q.thread = null; 1949 if (r == null) 1950 cleanStack(); 1951 } 1952 if (r != null) { 1953 if (interrupted) 1954 Thread.currentThread().interrupt(); 1955 postComplete(); 1956 return r; 1957 } else if (interrupted) 1958 return null; 1959 else 1960 throw new TimeoutException(); 1961 } 1962 1963 /* ------------- public methods -------------- */ 1964 1965 /** 1966 * Creates a new incomplete CompletableFuture. 1967 */ CompletableFuture()1968 public CompletableFuture() { 1969 } 1970 1971 /** 1972 * Creates a new complete CompletableFuture with given encoded result. 1973 */ CompletableFuture(Object r)1974 CompletableFuture(Object r) { 1975 RESULT.setRelease(this, r); 1976 } 1977 1978 /** 1979 * Returns a new CompletableFuture that is asynchronously completed 1980 * by a task running in the {@link ForkJoinPool#commonPool()} with 1981 * the value obtained by calling the given Supplier. 1982 * 1983 * @param supplier a function returning the value to be used 1984 * to complete the returned CompletableFuture 1985 * @param <U> the function's return type 1986 * @return the new CompletableFuture 1987 */ supplyAsync(Supplier<U> supplier)1988 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1989 return asyncSupplyStage(ASYNC_POOL, supplier); 1990 } 1991 1992 /** 1993 * Returns a new CompletableFuture that is asynchronously completed 1994 * by a task running in the given executor with the value obtained 1995 * by calling the given Supplier. 1996 * 1997 * @param supplier a function returning the value to be used 1998 * to complete the returned CompletableFuture 1999 * @param executor the executor to use for asynchronous execution 2000 * @param <U> the function's return type 2001 * @return the new CompletableFuture 2002 */ supplyAsync(Supplier<U> supplier, Executor executor)2003 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 2004 Executor executor) { 2005 return asyncSupplyStage(screenExecutor(executor), supplier); 2006 } 2007 2008 /** 2009 * Returns a new CompletableFuture that is asynchronously completed 2010 * by a task running in the {@link ForkJoinPool#commonPool()} after 2011 * it runs the given action. 2012 * 2013 * @param runnable the action to run before completing the 2014 * returned CompletableFuture 2015 * @return the new CompletableFuture 2016 */ runAsync(Runnable runnable)2017 public static CompletableFuture<Void> runAsync(Runnable runnable) { 2018 return asyncRunStage(ASYNC_POOL, runnable); 2019 } 2020 2021 /** 2022 * Returns a new CompletableFuture that is asynchronously completed 2023 * by a task running in the given executor after it runs the given 2024 * action. 2025 * 2026 * @param runnable the action to run before completing the 2027 * returned CompletableFuture 2028 * @param executor the executor to use for asynchronous execution 2029 * @return the new CompletableFuture 2030 */ runAsync(Runnable runnable, Executor executor)2031 public static CompletableFuture<Void> runAsync(Runnable runnable, 2032 Executor executor) { 2033 return asyncRunStage(screenExecutor(executor), runnable); 2034 } 2035 2036 /** 2037 * Returns a new CompletableFuture that is already completed with 2038 * the given value. 2039 * 2040 * @param value the value 2041 * @param <U> the type of the value 2042 * @return the completed CompletableFuture 2043 */ completedFuture(U value)2044 public static <U> CompletableFuture<U> completedFuture(U value) { 2045 return new CompletableFuture<U>((value == null) ? NIL : value); 2046 } 2047 2048 /** 2049 * Returns {@code true} if completed in any fashion: normally, 2050 * exceptionally, or via cancellation. 2051 * 2052 * @return {@code true} if completed 2053 */ isDone()2054 public boolean isDone() { 2055 return result != null; 2056 } 2057 2058 /** 2059 * Waits if necessary for this future to complete, and then 2060 * returns its result. 2061 * 2062 * @return the result value 2063 * @throws CancellationException if this future was cancelled 2064 * @throws ExecutionException if this future completed exceptionally 2065 * @throws InterruptedException if the current thread was interrupted 2066 * while waiting 2067 */ 2068 @SuppressWarnings("unchecked") get()2069 public T get() throws InterruptedException, ExecutionException { 2070 Object r; 2071 if ((r = result) == null) 2072 r = waitingGet(true); 2073 return (T) reportGet(r); 2074 } 2075 2076 /** 2077 * Waits if necessary for at most the given time for this future 2078 * to complete, and then returns its result, if available. 2079 * 2080 * @param timeout the maximum time to wait 2081 * @param unit the time unit of the timeout argument 2082 * @return the result value 2083 * @throws CancellationException if this future was cancelled 2084 * @throws ExecutionException if this future completed exceptionally 2085 * @throws InterruptedException if the current thread was interrupted 2086 * while waiting 2087 * @throws TimeoutException if the wait timed out 2088 */ 2089 @SuppressWarnings("unchecked") get(long timeout, TimeUnit unit)2090 public T get(long timeout, TimeUnit unit) 2091 throws InterruptedException, ExecutionException, TimeoutException { 2092 long nanos = unit.toNanos(timeout); 2093 Object r; 2094 if ((r = result) == null) 2095 r = timedGet(nanos); 2096 return (T) reportGet(r); 2097 } 2098 2099 /** 2100 * Returns the result value when complete, or throws an 2101 * (unchecked) exception if completed exceptionally. To better 2102 * conform with the use of common functional forms, if a 2103 * computation involved in the completion of this 2104 * CompletableFuture threw an exception, this method throws an 2105 * (unchecked) {@link CompletionException} with the underlying 2106 * exception as its cause. 2107 * 2108 * @return the result value 2109 * @throws CancellationException if the computation was cancelled 2110 * @throws CompletionException if this future completed 2111 * exceptionally or a completion computation threw an exception 2112 */ 2113 @SuppressWarnings("unchecked") join()2114 public T join() { 2115 Object r; 2116 if ((r = result) == null) 2117 r = waitingGet(false); 2118 return (T) reportJoin(r); 2119 } 2120 2121 /** 2122 * Returns the result value (or throws any encountered exception) 2123 * if completed, else returns the given valueIfAbsent. 2124 * 2125 * @param valueIfAbsent the value to return if not completed 2126 * @return the result value, if completed, else the given valueIfAbsent 2127 * @throws CancellationException if the computation was cancelled 2128 * @throws CompletionException if this future completed 2129 * exceptionally or a completion computation threw an exception 2130 */ 2131 @SuppressWarnings("unchecked") getNow(T valueIfAbsent)2132 public T getNow(T valueIfAbsent) { 2133 Object r; 2134 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r); 2135 } 2136 2137 /** 2138 * If not already completed, sets the value returned by {@link 2139 * #get()} and related methods to the given value. 2140 * 2141 * @param value the result value 2142 * @return {@code true} if this invocation caused this CompletableFuture 2143 * to transition to a completed state, else {@code false} 2144 */ complete(T value)2145 public boolean complete(T value) { 2146 boolean triggered = completeValue(value); 2147 postComplete(); 2148 return triggered; 2149 } 2150 2151 /** 2152 * If not already completed, causes invocations of {@link #get()} 2153 * and related methods to throw the given exception. 2154 * 2155 * @param ex the exception 2156 * @return {@code true} if this invocation caused this CompletableFuture 2157 * to transition to a completed state, else {@code false} 2158 */ completeExceptionally(Throwable ex)2159 public boolean completeExceptionally(Throwable ex) { 2160 if (ex == null) throw new NullPointerException(); 2161 boolean triggered = internalComplete(new AltResult(ex)); 2162 postComplete(); 2163 return triggered; 2164 } 2165 thenApply( Function<? super T,? extends U> fn)2166 public <U> CompletableFuture<U> thenApply( 2167 Function<? super T,? extends U> fn) { 2168 return uniApplyStage(null, fn); 2169 } 2170 thenApplyAsync( Function<? super T,? extends U> fn)2171 public <U> CompletableFuture<U> thenApplyAsync( 2172 Function<? super T,? extends U> fn) { 2173 return uniApplyStage(defaultExecutor(), fn); 2174 } 2175 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2176 public <U> CompletableFuture<U> thenApplyAsync( 2177 Function<? super T,? extends U> fn, Executor executor) { 2178 return uniApplyStage(screenExecutor(executor), fn); 2179 } 2180 thenAccept(Consumer<? super T> action)2181 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2182 return uniAcceptStage(null, action); 2183 } 2184 thenAcceptAsync(Consumer<? super T> action)2185 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2186 return uniAcceptStage(defaultExecutor(), action); 2187 } 2188 thenAcceptAsync(Consumer<? super T> action, Executor executor)2189 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2190 Executor executor) { 2191 return uniAcceptStage(screenExecutor(executor), action); 2192 } 2193 thenRun(Runnable action)2194 public CompletableFuture<Void> thenRun(Runnable action) { 2195 return uniRunStage(null, action); 2196 } 2197 thenRunAsync(Runnable action)2198 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2199 return uniRunStage(defaultExecutor(), action); 2200 } 2201 thenRunAsync(Runnable action, Executor executor)2202 public CompletableFuture<Void> thenRunAsync(Runnable action, 2203 Executor executor) { 2204 return uniRunStage(screenExecutor(executor), action); 2205 } 2206 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2207 public <U,V> CompletableFuture<V> thenCombine( 2208 CompletionStage<? extends U> other, 2209 BiFunction<? super T,? super U,? extends V> fn) { 2210 return biApplyStage(null, other, fn); 2211 } 2212 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2213 public <U,V> CompletableFuture<V> thenCombineAsync( 2214 CompletionStage<? extends U> other, 2215 BiFunction<? super T,? super U,? extends V> fn) { 2216 return biApplyStage(defaultExecutor(), other, fn); 2217 } 2218 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2219 public <U,V> CompletableFuture<V> thenCombineAsync( 2220 CompletionStage<? extends U> other, 2221 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2222 return biApplyStage(screenExecutor(executor), other, fn); 2223 } 2224 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2225 public <U> CompletableFuture<Void> thenAcceptBoth( 2226 CompletionStage<? extends U> other, 2227 BiConsumer<? super T, ? super U> action) { 2228 return biAcceptStage(null, other, action); 2229 } 2230 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2231 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2232 CompletionStage<? extends U> other, 2233 BiConsumer<? super T, ? super U> action) { 2234 return biAcceptStage(defaultExecutor(), other, action); 2235 } 2236 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2237 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2238 CompletionStage<? extends U> other, 2239 BiConsumer<? super T, ? super U> action, Executor executor) { 2240 return biAcceptStage(screenExecutor(executor), other, action); 2241 } 2242 runAfterBoth(CompletionStage<?> other, Runnable action)2243 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2244 Runnable action) { 2245 return biRunStage(null, other, action); 2246 } 2247 runAfterBothAsync(CompletionStage<?> other, Runnable action)2248 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2249 Runnable action) { 2250 return biRunStage(defaultExecutor(), other, action); 2251 } 2252 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2253 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2254 Runnable action, 2255 Executor executor) { 2256 return biRunStage(screenExecutor(executor), other, action); 2257 } 2258 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2259 public <U> CompletableFuture<U> applyToEither( 2260 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2261 return orApplyStage(null, other, fn); 2262 } 2263 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2264 public <U> CompletableFuture<U> applyToEitherAsync( 2265 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2266 return orApplyStage(defaultExecutor(), other, fn); 2267 } 2268 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2269 public <U> CompletableFuture<U> applyToEitherAsync( 2270 CompletionStage<? extends T> other, Function<? super T, U> fn, 2271 Executor executor) { 2272 return orApplyStage(screenExecutor(executor), other, fn); 2273 } 2274 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2275 public CompletableFuture<Void> acceptEither( 2276 CompletionStage<? extends T> other, Consumer<? super T> action) { 2277 return orAcceptStage(null, other, action); 2278 } 2279 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2280 public CompletableFuture<Void> acceptEitherAsync( 2281 CompletionStage<? extends T> other, Consumer<? super T> action) { 2282 return orAcceptStage(defaultExecutor(), other, action); 2283 } 2284 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2285 public CompletableFuture<Void> acceptEitherAsync( 2286 CompletionStage<? extends T> other, Consumer<? super T> action, 2287 Executor executor) { 2288 return orAcceptStage(screenExecutor(executor), other, action); 2289 } 2290 runAfterEither(CompletionStage<?> other, Runnable action)2291 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2292 Runnable action) { 2293 return orRunStage(null, other, action); 2294 } 2295 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2296 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2297 Runnable action) { 2298 return orRunStage(defaultExecutor(), other, action); 2299 } 2300 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2301 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2302 Runnable action, 2303 Executor executor) { 2304 return orRunStage(screenExecutor(executor), other, action); 2305 } 2306 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2307 public <U> CompletableFuture<U> thenCompose( 2308 Function<? super T, ? extends CompletionStage<U>> fn) { 2309 return uniComposeStage(null, fn); 2310 } 2311 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2312 public <U> CompletableFuture<U> thenComposeAsync( 2313 Function<? super T, ? extends CompletionStage<U>> fn) { 2314 return uniComposeStage(defaultExecutor(), fn); 2315 } 2316 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2317 public <U> CompletableFuture<U> thenComposeAsync( 2318 Function<? super T, ? extends CompletionStage<U>> fn, 2319 Executor executor) { 2320 return uniComposeStage(screenExecutor(executor), fn); 2321 } 2322 whenComplete( BiConsumer<? super T, ? super Throwable> action)2323 public CompletableFuture<T> whenComplete( 2324 BiConsumer<? super T, ? super Throwable> action) { 2325 return uniWhenCompleteStage(null, action); 2326 } 2327 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2328 public CompletableFuture<T> whenCompleteAsync( 2329 BiConsumer<? super T, ? super Throwable> action) { 2330 return uniWhenCompleteStage(defaultExecutor(), action); 2331 } 2332 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2333 public CompletableFuture<T> whenCompleteAsync( 2334 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2335 return uniWhenCompleteStage(screenExecutor(executor), action); 2336 } 2337 handle( BiFunction<? super T, Throwable, ? extends U> fn)2338 public <U> CompletableFuture<U> handle( 2339 BiFunction<? super T, Throwable, ? extends U> fn) { 2340 return uniHandleStage(null, fn); 2341 } 2342 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2343 public <U> CompletableFuture<U> handleAsync( 2344 BiFunction<? super T, Throwable, ? extends U> fn) { 2345 return uniHandleStage(defaultExecutor(), fn); 2346 } 2347 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2348 public <U> CompletableFuture<U> handleAsync( 2349 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2350 return uniHandleStage(screenExecutor(executor), fn); 2351 } 2352 2353 /** 2354 * Returns this CompletableFuture. 2355 * 2356 * @return this CompletableFuture 2357 */ toCompletableFuture()2358 public CompletableFuture<T> toCompletableFuture() { 2359 return this; 2360 } 2361 exceptionally( Function<Throwable, ? extends T> fn)2362 public CompletableFuture<T> exceptionally( 2363 Function<Throwable, ? extends T> fn) { 2364 return uniExceptionallyStage(null, fn); 2365 } 2366 exceptionallyAsync( Function<Throwable, ? extends T> fn)2367 public CompletableFuture<T> exceptionallyAsync( 2368 Function<Throwable, ? extends T> fn) { 2369 return uniExceptionallyStage(defaultExecutor(), fn); 2370 } 2371 exceptionallyAsync( Function<Throwable, ? extends T> fn, Executor executor)2372 public CompletableFuture<T> exceptionallyAsync( 2373 Function<Throwable, ? extends T> fn, Executor executor) { 2374 return uniExceptionallyStage(screenExecutor(executor), fn); 2375 } 2376 exceptionallyCompose( Function<Throwable, ? extends CompletionStage<T>> fn)2377 public CompletableFuture<T> exceptionallyCompose( 2378 Function<Throwable, ? extends CompletionStage<T>> fn) { 2379 return uniComposeExceptionallyStage(null, fn); 2380 } 2381 exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn)2382 public CompletableFuture<T> exceptionallyComposeAsync( 2383 Function<Throwable, ? extends CompletionStage<T>> fn) { 2384 return uniComposeExceptionallyStage(defaultExecutor(), fn); 2385 } 2386 exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)2387 public CompletableFuture<T> exceptionallyComposeAsync( 2388 Function<Throwable, ? extends CompletionStage<T>> fn, 2389 Executor executor) { 2390 return uniComposeExceptionallyStage(screenExecutor(executor), fn); 2391 } 2392 2393 /* ------------- Arbitrary-arity constructions -------------- */ 2394 2395 /** 2396 * Returns a new CompletableFuture that is completed when all of 2397 * the given CompletableFutures complete. If any of the given 2398 * CompletableFutures complete exceptionally, then the returned 2399 * CompletableFuture also does so, with a CompletionException 2400 * holding this exception as its cause. Otherwise, the results, 2401 * if any, of the given CompletableFutures are not reflected in 2402 * the returned CompletableFuture, but may be obtained by 2403 * inspecting them individually. If no CompletableFutures are 2404 * provided, returns a CompletableFuture completed with the value 2405 * {@code null}. 2406 * 2407 * <p>Among the applications of this method is to await completion 2408 * of a set of independent CompletableFutures before continuing a 2409 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2410 * c3).join();}. 2411 * 2412 * @param cfs the CompletableFutures 2413 * @return a new CompletableFuture that is completed when all of the 2414 * given CompletableFutures complete 2415 * @throws NullPointerException if the array or any of its elements are 2416 * {@code null} 2417 */ allOf(CompletableFuture<?>.... cfs)2418 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2419 return andTree(cfs, 0, cfs.length - 1); 2420 } 2421 2422 /** 2423 * Returns a new CompletableFuture that is completed when any of 2424 * the given CompletableFutures complete, with the same result. 2425 * Otherwise, if it completed exceptionally, the returned 2426 * CompletableFuture also does so, with a CompletionException 2427 * holding this exception as its cause. If no CompletableFutures 2428 * are provided, returns an incomplete CompletableFuture. 2429 * 2430 * @param cfs the CompletableFutures 2431 * @return a new CompletableFuture that is completed with the 2432 * result or exception of any of the given CompletableFutures when 2433 * one completes 2434 * @throws NullPointerException if the array or any of its elements are 2435 * {@code null} 2436 */ anyOf(CompletableFuture<?>.... cfs)2437 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2438 int n; Object r; 2439 if ((n = cfs.length) <= 1) 2440 return (n == 0) 2441 ? new CompletableFuture<Object>() 2442 : uniCopyStage(cfs[0]); 2443 for (CompletableFuture<?> cf : cfs) 2444 if ((r = cf.result) != null) 2445 return new CompletableFuture<Object>(encodeRelay(r)); 2446 cfs = cfs.clone(); 2447 CompletableFuture<Object> d = new CompletableFuture<>(); 2448 for (CompletableFuture<?> cf : cfs) 2449 cf.unipush(new AnyOf(d, cf, cfs)); 2450 // If d was completed while we were adding completions, we should 2451 // clean the stack of any sources that may have had completions 2452 // pushed on their stack after d was completed. 2453 if (d.result != null) 2454 for (int i = 0, len = cfs.length; i < len; i++) 2455 if (cfs[i].result != null) 2456 for (i++; i < len; i++) 2457 if (cfs[i].result == null) 2458 cfs[i].cleanStack(); 2459 return d; 2460 } 2461 2462 /* ------------- Control and status methods -------------- */ 2463 2464 /** 2465 * If not already completed, completes this CompletableFuture with 2466 * a {@link CancellationException}. Dependent CompletableFutures 2467 * that have not already completed will also complete 2468 * exceptionally, with a {@link CompletionException} caused by 2469 * this {@code CancellationException}. 2470 * 2471 * @param mayInterruptIfRunning this value has no effect in this 2472 * implementation because interrupts are not used to control 2473 * processing. 2474 * 2475 * @return {@code true} if this task is now cancelled 2476 */ cancel(boolean mayInterruptIfRunning)2477 public boolean cancel(boolean mayInterruptIfRunning) { 2478 boolean cancelled = (result == null) && 2479 internalComplete(new AltResult(new CancellationException())); 2480 postComplete(); 2481 return cancelled || isCancelled(); 2482 } 2483 2484 /** 2485 * Returns {@code true} if this CompletableFuture was cancelled 2486 * before it completed normally. 2487 * 2488 * @return {@code true} if this CompletableFuture was cancelled 2489 * before it completed normally 2490 */ isCancelled()2491 public boolean isCancelled() { 2492 Object r; 2493 return ((r = result) instanceof AltResult) && 2494 (((AltResult)r).ex instanceof CancellationException); 2495 } 2496 2497 /** 2498 * Returns {@code true} if this CompletableFuture completed 2499 * exceptionally, in any way. Possible causes include 2500 * cancellation, explicit invocation of {@code 2501 * completeExceptionally}, and abrupt termination of a 2502 * CompletionStage action. 2503 * 2504 * @return {@code true} if this CompletableFuture completed 2505 * exceptionally 2506 */ isCompletedExceptionally()2507 public boolean isCompletedExceptionally() { 2508 Object r; 2509 return ((r = result) instanceof AltResult) && r != NIL; 2510 } 2511 2512 /** 2513 * Forcibly sets or resets the value subsequently returned by 2514 * method {@link #get()} and related methods, whether or not 2515 * already completed. This method is designed for use only in 2516 * error recovery actions, and even in such situations may result 2517 * in ongoing dependent completions using established versus 2518 * overwritten outcomes. 2519 * 2520 * @param value the completion value 2521 */ obtrudeValue(T value)2522 public void obtrudeValue(T value) { 2523 result = (value == null) ? NIL : value; 2524 postComplete(); 2525 } 2526 2527 /** 2528 * Forcibly causes subsequent invocations of method {@link #get()} 2529 * and related methods to throw the given exception, whether or 2530 * not already completed. This method is designed for use only in 2531 * error recovery actions, and even in such situations may result 2532 * in ongoing dependent completions using established versus 2533 * overwritten outcomes. 2534 * 2535 * @param ex the exception 2536 * @throws NullPointerException if the exception is null 2537 */ obtrudeException(Throwable ex)2538 public void obtrudeException(Throwable ex) { 2539 if (ex == null) throw new NullPointerException(); 2540 result = new AltResult(ex); 2541 postComplete(); 2542 } 2543 2544 /** 2545 * Returns the estimated number of CompletableFutures whose 2546 * completions are awaiting completion of this CompletableFuture. 2547 * This method is designed for use in monitoring system state, not 2548 * for synchronization control. 2549 * 2550 * @return the number of dependent CompletableFutures 2551 */ getNumberOfDependents()2552 public int getNumberOfDependents() { 2553 int count = 0; 2554 for (Completion p = stack; p != null; p = p.next) 2555 ++count; 2556 return count; 2557 } 2558 2559 /** 2560 * Returns a string identifying this CompletableFuture, as well as 2561 * its completion state. The state, in brackets, contains the 2562 * String {@code "Completed Normally"} or the String {@code 2563 * "Completed Exceptionally"}, or the String {@code "Not 2564 * completed"} followed by the number of CompletableFutures 2565 * dependent upon its completion, if any. 2566 * 2567 * @return a string identifying this CompletableFuture, as well as its state 2568 */ toString()2569 public String toString() { 2570 Object r = result; 2571 int count = 0; // avoid call to getNumberOfDependents in case disabled 2572 for (Completion p = stack; p != null; p = p.next) 2573 ++count; 2574 return super.toString() + 2575 ((r == null) 2576 ? ((count == 0) 2577 ? "[Not completed]" 2578 : "[Not completed, " + count + " dependents]") 2579 : (((r instanceof AltResult) && ((AltResult)r).ex != null) 2580 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]" 2581 : "[Completed normally]")); 2582 } 2583 2584 // jdk9 additions 2585 2586 /** 2587 * Returns a new incomplete CompletableFuture of the type to be 2588 * returned by a CompletionStage method. Subclasses should 2589 * normally override this method to return an instance of the same 2590 * class as this CompletableFuture. The default implementation 2591 * returns an instance of class CompletableFuture. 2592 * 2593 * @param <U> the type of the value 2594 * @return a new CompletableFuture 2595 * @since 9 2596 */ newIncompleteFuture()2597 public <U> CompletableFuture<U> newIncompleteFuture() { 2598 return new CompletableFuture<U>(); 2599 } 2600 2601 /** 2602 * Returns the default Executor used for async methods that do not 2603 * specify an Executor. This class uses the {@link 2604 * ForkJoinPool#commonPool()} if it supports more than one 2605 * parallel thread, or else an Executor using one thread per async 2606 * task. This method may be overridden in subclasses to return 2607 * an Executor that provides at least one independent thread. 2608 * 2609 * @return the executor 2610 * @since 9 2611 */ defaultExecutor()2612 public Executor defaultExecutor() { 2613 return ASYNC_POOL; 2614 } 2615 2616 /** 2617 * Returns a new CompletableFuture that is completed normally with 2618 * the same value as this CompletableFuture when it completes 2619 * normally. If this CompletableFuture completes exceptionally, 2620 * then the returned CompletableFuture completes exceptionally 2621 * with a CompletionException with this exception as cause. The 2622 * behavior is equivalent to {@code thenApply(x -> x)}. This 2623 * method may be useful as a form of "defensive copying", to 2624 * prevent clients from completing, while still being able to 2625 * arrange dependent actions. 2626 * 2627 * @return the new CompletableFuture 2628 * @since 9 2629 */ copy()2630 public CompletableFuture<T> copy() { 2631 return uniCopyStage(this); 2632 } 2633 2634 /** 2635 * Returns a new CompletionStage that is completed normally with 2636 * the same value as this CompletableFuture when it completes 2637 * normally, and cannot be independently completed or otherwise 2638 * used in ways not defined by the methods of interface {@link 2639 * CompletionStage}. If this CompletableFuture completes 2640 * exceptionally, then the returned CompletionStage completes 2641 * exceptionally with a CompletionException with this exception as 2642 * cause. 2643 * 2644 * <p>Unless overridden by a subclass, a new non-minimal 2645 * CompletableFuture with all methods available can be obtained from 2646 * a minimal CompletionStage via {@link #toCompletableFuture()}. 2647 * For example, completion of a minimal stage can be awaited by 2648 * 2649 * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre> 2650 * 2651 * @return the new CompletionStage 2652 * @since 9 2653 */ minimalCompletionStage()2654 public CompletionStage<T> minimalCompletionStage() { 2655 return uniAsMinimalStage(); 2656 } 2657 2658 /** 2659 * Completes this CompletableFuture with the result of 2660 * the given Supplier function invoked from an asynchronous 2661 * task using the given executor. 2662 * 2663 * @param supplier a function returning the value to be used 2664 * to complete this CompletableFuture 2665 * @param executor the executor to use for asynchronous execution 2666 * @return this CompletableFuture 2667 * @since 9 2668 */ completeAsync(Supplier<? extends T> supplier, Executor executor)2669 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2670 Executor executor) { 2671 if (supplier == null || executor == null) 2672 throw new NullPointerException(); 2673 executor.execute(new AsyncSupply<T>(this, supplier)); 2674 return this; 2675 } 2676 2677 /** 2678 * Completes this CompletableFuture with the result of the given 2679 * Supplier function invoked from an asynchronous task using the 2680 * default executor. 2681 * 2682 * @param supplier a function returning the value to be used 2683 * to complete this CompletableFuture 2684 * @return this CompletableFuture 2685 * @since 9 2686 */ completeAsync(Supplier<? extends T> supplier)2687 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { 2688 return completeAsync(supplier, defaultExecutor()); 2689 } 2690 2691 /** 2692 * Exceptionally completes this CompletableFuture with 2693 * a {@link TimeoutException} if not otherwise completed 2694 * before the given timeout. 2695 * 2696 * @param timeout how long to wait before completing exceptionally 2697 * with a TimeoutException, in units of {@code unit} 2698 * @param unit a {@code TimeUnit} determining how to interpret the 2699 * {@code timeout} parameter 2700 * @return this CompletableFuture 2701 * @since 9 2702 */ orTimeout(long timeout, TimeUnit unit)2703 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2704 if (unit == null) 2705 throw new NullPointerException(); 2706 if (result == null) 2707 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2708 timeout, unit))); 2709 return this; 2710 } 2711 2712 /** 2713 * Completes this CompletableFuture with the given value if not 2714 * otherwise completed before the given timeout. 2715 * 2716 * @param value the value to use upon timeout 2717 * @param timeout how long to wait before completing normally 2718 * with the given value, in units of {@code unit} 2719 * @param unit a {@code TimeUnit} determining how to interpret the 2720 * {@code timeout} parameter 2721 * @return this CompletableFuture 2722 * @since 9 2723 */ completeOnTimeout(T value, long timeout, TimeUnit unit)2724 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2725 TimeUnit unit) { 2726 if (unit == null) 2727 throw new NullPointerException(); 2728 if (result == null) 2729 whenComplete(new Canceller(Delayer.delay( 2730 new DelayedCompleter<T>(this, value), 2731 timeout, unit))); 2732 return this; 2733 } 2734 2735 /** 2736 * Returns a new Executor that submits a task to the given base 2737 * executor after the given delay (or no delay if non-positive). 2738 * Each delay commences upon invocation of the returned executor's 2739 * {@code execute} method. 2740 * 2741 * @param delay how long to delay, in units of {@code unit} 2742 * @param unit a {@code TimeUnit} determining how to interpret the 2743 * {@code delay} parameter 2744 * @param executor the base executor 2745 * @return the new delayed executor 2746 * @since 9 2747 */ delayedExecutor(long delay, TimeUnit unit, Executor executor)2748 public static Executor delayedExecutor(long delay, TimeUnit unit, 2749 Executor executor) { 2750 if (unit == null || executor == null) 2751 throw new NullPointerException(); 2752 return new DelayedExecutor(delay, unit, executor); 2753 } 2754 2755 /** 2756 * Returns a new Executor that submits a task to the default 2757 * executor after the given delay (or no delay if non-positive). 2758 * Each delay commences upon invocation of the returned executor's 2759 * {@code execute} method. 2760 * 2761 * @param delay how long to delay, in units of {@code unit} 2762 * @param unit a {@code TimeUnit} determining how to interpret the 2763 * {@code delay} parameter 2764 * @return the new delayed executor 2765 * @since 9 2766 */ delayedExecutor(long delay, TimeUnit unit)2767 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2768 if (unit == null) 2769 throw new NullPointerException(); 2770 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2771 } 2772 2773 /** 2774 * Returns a new CompletionStage that is already completed with 2775 * the given value and supports only those methods in 2776 * interface {@link CompletionStage}. 2777 * 2778 * @param value the value 2779 * @param <U> the type of the value 2780 * @return the completed CompletionStage 2781 * @since 9 2782 */ completedStage(U value)2783 public static <U> CompletionStage<U> completedStage(U value) { 2784 return new MinimalStage<U>((value == null) ? NIL : value); 2785 } 2786 2787 /** 2788 * Returns a new CompletableFuture that is already completed 2789 * exceptionally with the given exception. 2790 * 2791 * @param ex the exception 2792 * @param <U> the type of the value 2793 * @return the exceptionally completed CompletableFuture 2794 * @since 9 2795 */ failedFuture(Throwable ex)2796 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2797 if (ex == null) throw new NullPointerException(); 2798 return new CompletableFuture<U>(new AltResult(ex)); 2799 } 2800 2801 /** 2802 * Returns a new CompletionStage that is already completed 2803 * exceptionally with the given exception and supports only those 2804 * methods in interface {@link CompletionStage}. 2805 * 2806 * @param ex the exception 2807 * @param <U> the type of the value 2808 * @return the exceptionally completed CompletionStage 2809 * @since 9 2810 */ failedStage(Throwable ex)2811 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2812 if (ex == null) throw new NullPointerException(); 2813 return new MinimalStage<U>(new AltResult(ex)); 2814 } 2815 2816 /** 2817 * Singleton delay scheduler, used only for starting and 2818 * cancelling tasks. 2819 */ 2820 static final class Delayer { delay(Runnable command, long delay, TimeUnit unit)2821 static ScheduledFuture<?> delay(Runnable command, long delay, 2822 TimeUnit unit) { 2823 return delayer.schedule(command, delay, unit); 2824 } 2825 2826 static final class DaemonThreadFactory implements ThreadFactory { newThread(Runnable r)2827 public Thread newThread(Runnable r) { 2828 Thread t = new Thread(r); 2829 t.setDaemon(true); 2830 t.setName("CompletableFutureDelayScheduler"); 2831 return t; 2832 } 2833 } 2834 2835 static final ScheduledThreadPoolExecutor delayer; 2836 static { 2837 (delayer = new ScheduledThreadPoolExecutor( 2838 1, new DaemonThreadFactory())). 2839 setRemoveOnCancelPolicy(true); 2840 } 2841 } 2842 2843 // Little class-ified lambdas to better support monitoring 2844 2845 static final class DelayedExecutor implements Executor { 2846 final long delay; 2847 final TimeUnit unit; 2848 final Executor executor; DelayedExecutor(long delay, TimeUnit unit, Executor executor)2849 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2850 this.delay = delay; this.unit = unit; this.executor = executor; 2851 } execute(Runnable r)2852 public void execute(Runnable r) { 2853 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2854 } 2855 } 2856 2857 /** Action to submit user task */ 2858 static final class TaskSubmitter implements Runnable { 2859 final Executor executor; 2860 final Runnable action; TaskSubmitter(Executor executor, Runnable action)2861 TaskSubmitter(Executor executor, Runnable action) { 2862 this.executor = executor; 2863 this.action = action; 2864 } run()2865 public void run() { executor.execute(action); } 2866 } 2867 2868 /** Action to completeExceptionally on timeout */ 2869 static final class Timeout implements Runnable { 2870 final CompletableFuture<?> f; Timeout(CompletableFuture<?> f)2871 Timeout(CompletableFuture<?> f) { this.f = f; } run()2872 public void run() { 2873 if (f != null && !f.isDone()) 2874 f.completeExceptionally(new TimeoutException()); 2875 } 2876 } 2877 2878 /** Action to complete on timeout */ 2879 static final class DelayedCompleter<U> implements Runnable { 2880 final CompletableFuture<U> f; 2881 final U u; DelayedCompleter(CompletableFuture<U> f, U u)2882 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } run()2883 public void run() { 2884 if (f != null) 2885 f.complete(u); 2886 } 2887 } 2888 2889 /** Action to cancel unneeded timeouts */ 2890 static final class Canceller implements BiConsumer<Object, Throwable> { 2891 final Future<?> f; Canceller(Future<?> f)2892 Canceller(Future<?> f) { this.f = f; } accept(Object ignore, Throwable ex)2893 public void accept(Object ignore, Throwable ex) { 2894 if (ex == null && f != null && !f.isDone()) 2895 f.cancel(false); 2896 } 2897 } 2898 2899 /** 2900 * A subclass that just throws UOE for most non-CompletionStage methods. 2901 */ 2902 static final class MinimalStage<T> extends CompletableFuture<T> { MinimalStage()2903 MinimalStage() { } MinimalStage(Object r)2904 MinimalStage(Object r) { super(r); } newIncompleteFuture()2905 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2906 return new MinimalStage<U>(); } get()2907 @Override public T get() { 2908 throw new UnsupportedOperationException(); } get(long timeout, TimeUnit unit)2909 @Override public T get(long timeout, TimeUnit unit) { 2910 throw new UnsupportedOperationException(); } getNow(T valueIfAbsent)2911 @Override public T getNow(T valueIfAbsent) { 2912 throw new UnsupportedOperationException(); } join()2913 @Override public T join() { 2914 throw new UnsupportedOperationException(); } complete(T value)2915 @Override public boolean complete(T value) { 2916 throw new UnsupportedOperationException(); } completeExceptionally(Throwable ex)2917 @Override public boolean completeExceptionally(Throwable ex) { 2918 throw new UnsupportedOperationException(); } cancel(boolean mayInterruptIfRunning)2919 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2920 throw new UnsupportedOperationException(); } obtrudeValue(T value)2921 @Override public void obtrudeValue(T value) { 2922 throw new UnsupportedOperationException(); } obtrudeException(Throwable ex)2923 @Override public void obtrudeException(Throwable ex) { 2924 throw new UnsupportedOperationException(); } isDone()2925 @Override public boolean isDone() { 2926 throw new UnsupportedOperationException(); } isCancelled()2927 @Override public boolean isCancelled() { 2928 throw new UnsupportedOperationException(); } isCompletedExceptionally()2929 @Override public boolean isCompletedExceptionally() { 2930 throw new UnsupportedOperationException(); } getNumberOfDependents()2931 @Override public int getNumberOfDependents() { 2932 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier, Executor executor)2933 @Override public CompletableFuture<T> completeAsync 2934 (Supplier<? extends T> supplier, Executor executor) { 2935 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier)2936 @Override public CompletableFuture<T> completeAsync 2937 (Supplier<? extends T> supplier) { 2938 throw new UnsupportedOperationException(); } orTimeout(long timeout, TimeUnit unit)2939 @Override public CompletableFuture<T> orTimeout 2940 (long timeout, TimeUnit unit) { 2941 throw new UnsupportedOperationException(); } completeOnTimeout(T value, long timeout, TimeUnit unit)2942 @Override public CompletableFuture<T> completeOnTimeout 2943 (T value, long timeout, TimeUnit unit) { 2944 throw new UnsupportedOperationException(); } toCompletableFuture()2945 @Override public CompletableFuture<T> toCompletableFuture() { 2946 Object r; 2947 if ((r = result) != null) 2948 return new CompletableFuture<T>(encodeRelay(r)); 2949 else { 2950 CompletableFuture<T> d = new CompletableFuture<>(); 2951 unipush(new UniRelay<T,T>(d, this)); 2952 return d; 2953 } 2954 } 2955 } 2956 2957 // VarHandle mechanics 2958 private static final VarHandle RESULT; 2959 private static final VarHandle STACK; 2960 private static final VarHandle NEXT; 2961 static { 2962 try { 2963 MethodHandles.Lookup l = MethodHandles.lookup(); 2964 RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); 2965 STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class); 2966 NEXT = l.findVarHandle(Completion.class, "next", Completion.class); 2967 } catch (ReflectiveOperationException e) { 2968 throw new ExceptionInInitializerError(e); 2969 } 2970 2971 // Reduce the risk of rare disastrous classloading in first call to 2972 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2973 Class<?> ensureLoaded = LockSupport.class; 2974 } 2975 } 2976