1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 import java.util.function.Supplier; 38 import java.util.function.Consumer; 39 import java.util.function.BiConsumer; 40 import java.util.function.Function; 41 import java.util.function.BiFunction; 42 import java.util.concurrent.Future; 43 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.ForkJoinPool; 45 import java.util.concurrent.ForkJoinTask; 46 import java.util.concurrent.Executor; 47 import java.util.concurrent.ThreadLocalRandom; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.TimeoutException; 50 import java.util.concurrent.CancellationException; 51 import java.util.concurrent.CompletionException; 52 import java.util.concurrent.CompletionStage; 53 import java.util.concurrent.locks.LockSupport; 54 55 /** 56 * A {@link Future} that may be explicitly completed (setting its 57 * value and status), and may be used as a {@link CompletionStage}, 58 * supporting dependent functions and actions that trigger upon its 59 * completion. 60 * 61 * <p>When two or more threads attempt to 62 * {@link #complete complete}, 63 * {@link #completeExceptionally completeExceptionally}, or 64 * {@link #cancel cancel} 65 * a CompletableFuture, only one of them succeeds. 66 * 67 * <p>In addition to these and related methods for directly 68 * manipulating status and results, CompletableFuture implements 69 * interface {@link CompletionStage} with the following policies: <ul> 70 * 71 * <li>Actions supplied for dependent completions of 72 * <em>non-async</em> methods may be performed by the thread that 73 * completes the current CompletableFuture, or by any other caller of 74 * a completion method.</li> 75 * 76 * <li>All <em>async</em> methods without an explicit Executor 77 * argument are performed using the {@link ForkJoinPool#commonPool()} 78 * (unless it does not support a parallelism level of at least two, in 79 * which case, a new Thread is created to run each task). To simplify 80 * monitoring, debugging, and tracking, all generated asynchronous 81 * tasks are instances of the marker interface {@link 82 * AsynchronousCompletionTask}. </li> 83 * 84 * <li>All CompletionStage methods are implemented independently of 85 * other public methods, so the behavior of one method is not impacted 86 * by overrides of others in subclasses. </li> </ul> 87 * 88 * <p>CompletableFuture also implements {@link Future} with the following 89 * policies: <ul> 90 * 91 * <li>Since (unlike {@link FutureTask}) this class has no direct 92 * control over the computation that causes it to be completed, 93 * cancellation is treated as just another form of exceptional 94 * completion. Method {@link #cancel cancel} has the same effect as 95 * {@code completeExceptionally(new CancellationException())}. Method 96 * {@link #isCompletedExceptionally} can be used to determine if a 97 * CompletableFuture completed in any exceptional fashion.</li> 98 * 99 * <li>In case of exceptional completion with a CompletionException, 100 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 101 * {@link ExecutionException} with the same cause as held in the 102 * corresponding CompletionException. To simplify usage in most 103 * contexts, this class also defines methods {@link #join()} and 104 * {@link #getNow} that instead throw the CompletionException directly 105 * in these cases.</li> </ul> 106 * 107 * @author Doug Lea 108 * @since 1.8 109 */ 110 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 111 112 /* 113 * Overview: 114 * 115 * A CompletableFuture may have dependent completion actions, 116 * collected in a linked stack. It atomically completes by CASing 117 * a result field, and then pops off and runs those actions. This 118 * applies across normal vs exceptional outcomes, sync vs async 119 * actions, binary triggers, and various forms of completions. 120 * 121 * Non-nullness of field result (set via CAS) indicates done. An 122 * AltResult is used to box null as a result, as well as to hold 123 * exceptions. Using a single field makes completion simple to 124 * detect and trigger. Encoding and decoding is straightforward 125 * but adds to the sprawl of trapping and associating exceptions 126 * with targets. Minor simplifications rely on (static) NIL (to 127 * box null results) being the only AltResult with a null 128 * exception field, so we don't usually need explicit comparisons. 129 * Even though some of the generics casts are unchecked (see 130 * SuppressWarnings annotations), they are placed to be 131 * appropriate even if checked. 132 * 133 * Dependent actions are represented by Completion objects linked 134 * as Treiber stacks headed by field "stack". There are Completion 135 * classes for each kind of action, grouped into single-input 136 * (UniCompletion), two-input (BiCompletion), projected 137 * (BiCompletions using either (not both) of two inputs), shared 138 * (CoCompletion, used by the second of two sources), zero-input 139 * source actions, and Signallers that unblock waiters. Class 140 * Completion extends ForkJoinTask to enable async execution 141 * (adding no space overhead because we exploit its "tag" methods 142 * to maintain claims). It is also declared as Runnable to allow 143 * usage with arbitrary executors. 144 * 145 * Support for each kind of CompletionStage relies on a separate 146 * class, along with two CompletableFuture methods: 147 * 148 * * A Completion class with name X corresponding to function, 149 * prefaced with "Uni", "Bi", or "Or". Each class contains 150 * fields for source(s), actions, and dependent. They are 151 * boringly similar, differing from others only with respect to 152 * underlying functional forms. We do this so that users don't 153 * encounter layers of adaptors in common usages. We also 154 * include "Relay" classes/methods that don't correspond to user 155 * methods; they copy results from one stage to another. 156 * 157 * * Boolean CompletableFuture method x(...) (for example 158 * uniApply) takes all of the arguments needed to check that an 159 * action is triggerable, and then either runs the action or 160 * arranges its async execution by executing its Completion 161 * argument, if present. The method returns true if known to be 162 * complete. 163 * 164 * * Completion method tryFire(int mode) invokes the associated x 165 * method with its held arguments, and on success cleans up. 166 * The mode argument allows tryFire to be called twice (SYNC, 167 * then ASYNC); the first to screen and trap exceptions while 168 * arranging to execute, and the second when called from a 169 * task. (A few classes are not used async so take slightly 170 * different forms.) The claim() callback suppresses function 171 * invocation if already claimed by another thread. 172 * 173 * * CompletableFuture method xStage(...) is called from a public 174 * stage method of CompletableFuture x. It screens user 175 * arguments and invokes and/or creates the stage object. If 176 * not async and x is already complete, the action is run 177 * immediately. Otherwise a Completion c is created, pushed to 178 * x's stack (unless done), and started or triggered via 179 * c.tryFire. This also covers races possible if x completes 180 * while pushing. Classes with two inputs (for example BiApply) 181 * deal with races across both while pushing actions. The 182 * second completion is a CoCompletion pointing to the first, 183 * shared so that at most one performs the action. The 184 * multiple-arity methods allOf and anyOf do this pairwise to 185 * form trees of completions. 186 * 187 * Note that the generic type parameters of methods vary according 188 * to whether "this" is a source, dependent, or completion. 189 * 190 * Method postComplete is called upon completion unless the target 191 * is guaranteed not to be observable (i.e., not yet returned or 192 * linked). Multiple threads can call postComplete, which 193 * atomically pops each dependent action, and tries to trigger it 194 * via method tryFire, in NESTED mode. Triggering can propagate 195 * recursively, so NESTED mode returns its completed dependent (if 196 * one exists) for further processing by its caller (see method 197 * postFire). 198 * 199 * Blocking methods get() and join() rely on Signaller Completions 200 * that wake up waiting threads. The mechanics are similar to 201 * Treiber stack wait-nodes used in FutureTask, Phaser, and 202 * SynchronousQueue. See their internal documentation for 203 * algorithmic details. 204 * 205 * Without precautions, CompletableFutures would be prone to 206 * garbage accumulation as chains of Completions build up, each 207 * pointing back to its sources. So we null out fields as soon as 208 * possible (see especially method Completion.detach). The 209 * screening checks needed anyway harmlessly ignore null arguments 210 * that may have been obtained during races with threads nulling 211 * out fields. We also try to unlink fired Completions from 212 * stacks that might never be popped (see method postFire). 213 * Completion fields need not be declared as final or volatile 214 * because they are only visible to other threads upon safe 215 * publication. 216 */ 217 218 volatile Object result; // Either the result or boxed AltResult 219 volatile Completion stack; // Top of Treiber stack of dependent actions 220 internalComplete(Object r)221 final boolean internalComplete(Object r) { // CAS from null to r 222 return UNSAFE.compareAndSwapObject(this, RESULT, null, r); 223 } 224 casStack(Completion cmp, Completion val)225 final boolean casStack(Completion cmp, Completion val) { 226 return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); 227 } 228 229 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)230 final boolean tryPushStack(Completion c) { 231 Completion h = stack; 232 lazySetNext(c, h); 233 return UNSAFE.compareAndSwapObject(this, STACK, h, c); 234 } 235 236 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)237 final void pushStack(Completion c) { 238 do {} while (!tryPushStack(c)); 239 } 240 241 /* ------------- Encoding and decoding outcomes -------------- */ 242 243 static final class AltResult { // See above 244 final Throwable ex; // null only for NIL AltResult(Throwable x)245 AltResult(Throwable x) { this.ex = x; } 246 } 247 248 /** The encoding of the null value. */ 249 static final AltResult NIL = new AltResult(null); 250 251 /** Completes with the null value, unless already completed. */ completeNull()252 final boolean completeNull() { 253 return UNSAFE.compareAndSwapObject(this, RESULT, null, 254 NIL); 255 } 256 257 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)258 final Object encodeValue(T t) { 259 return (t == null) ? NIL : t; 260 } 261 262 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)263 final boolean completeValue(T t) { 264 return UNSAFE.compareAndSwapObject(this, RESULT, null, 265 (t == null) ? NIL : t); 266 } 267 268 /** 269 * Returns the encoding of the given (non-null) exception as a 270 * wrapped CompletionException unless it is one already. 271 */ encodeThrowable(Throwable x)272 static AltResult encodeThrowable(Throwable x) { 273 return new AltResult((x instanceof CompletionException) ? x : 274 new CompletionException(x)); 275 } 276 277 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)278 final boolean completeThrowable(Throwable x) { 279 return UNSAFE.compareAndSwapObject(this, RESULT, null, 280 encodeThrowable(x)); 281 } 282 283 /** 284 * Returns the encoding of the given (non-null) exception as a 285 * wrapped CompletionException unless it is one already. May 286 * return the given Object r (which must have been the result of a 287 * source future) if it is equivalent, i.e. if this is a simple 288 * relay of an existing CompletionException. 289 */ encodeThrowable(Throwable x, Object r)290 static Object encodeThrowable(Throwable x, Object r) { 291 if (!(x instanceof CompletionException)) 292 x = new CompletionException(x); 293 else if (r instanceof AltResult && x == ((AltResult)r).ex) 294 return r; 295 return new AltResult(x); 296 } 297 298 /** 299 * Completes with the given (non-null) exceptional result as a 300 * wrapped CompletionException unless it is one already, unless 301 * already completed. May complete with the given Object r 302 * (which must have been the result of a source future) if it is 303 * equivalent, i.e. if this is a simple propagation of an 304 * existing CompletionException. 305 */ completeThrowable(Throwable x, Object r)306 final boolean completeThrowable(Throwable x, Object r) { 307 return UNSAFE.compareAndSwapObject(this, RESULT, null, 308 encodeThrowable(x, r)); 309 } 310 311 /** 312 * Returns the encoding of the given arguments: if the exception 313 * is non-null, encodes as AltResult. Otherwise uses the given 314 * value, boxed as NIL if null. 315 */ encodeOutcome(T t, Throwable x)316 Object encodeOutcome(T t, Throwable x) { 317 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 318 } 319 320 /** 321 * Returns the encoding of a copied outcome; if exceptional, 322 * rewraps as a CompletionException, else returns argument. 323 */ encodeRelay(Object r)324 static Object encodeRelay(Object r) { 325 Throwable x; 326 return (((r instanceof AltResult) && 327 (x = ((AltResult)r).ex) != null && 328 !(x instanceof CompletionException)) ? 329 new AltResult(new CompletionException(x)) : r); 330 } 331 332 /** 333 * Completes with r or a copy of r, unless already completed. 334 * If exceptional, r is first coerced to a CompletionException. 335 */ completeRelay(Object r)336 final boolean completeRelay(Object r) { 337 return UNSAFE.compareAndSwapObject(this, RESULT, null, 338 encodeRelay(r)); 339 } 340 341 /** 342 * Reports result using Future.get conventions. 343 */ reportGet(Object r)344 private static <T> T reportGet(Object r) 345 throws InterruptedException, ExecutionException { 346 if (r == null) // by convention below, null means interrupted 347 throw new InterruptedException(); 348 if (r instanceof AltResult) { 349 Throwable x, cause; 350 if ((x = ((AltResult)r).ex) == null) 351 return null; 352 if (x instanceof CancellationException) 353 throw (CancellationException)x; 354 if ((x instanceof CompletionException) && 355 (cause = x.getCause()) != null) 356 x = cause; 357 throw new ExecutionException(x); 358 } 359 @SuppressWarnings("unchecked") T t = (T) r; 360 return t; 361 } 362 363 /** 364 * Decodes outcome to return result or throw unchecked exception. 365 */ reportJoin(Object r)366 private static <T> T reportJoin(Object r) { 367 if (r instanceof AltResult) { 368 Throwable x; 369 if ((x = ((AltResult)r).ex) == null) 370 return null; 371 if (x instanceof CancellationException) 372 throw (CancellationException)x; 373 if (x instanceof CompletionException) 374 throw (CompletionException)x; 375 throw new CompletionException(x); 376 } 377 @SuppressWarnings("unchecked") T t = (T) r; 378 return t; 379 } 380 381 /* ------------- Async task preliminaries -------------- */ 382 383 /** 384 * A marker interface identifying asynchronous tasks produced by 385 * {@code async} methods. This may be useful for monitoring, 386 * debugging, and tracking asynchronous activities. 387 * 388 * @since 1.8 389 */ 390 public static interface AsynchronousCompletionTask { 391 } 392 393 private static final boolean useCommonPool = 394 (ForkJoinPool.getCommonPoolParallelism() > 1); 395 396 /** 397 * Default executor -- ForkJoinPool.commonPool() unless it cannot 398 * support parallelism. 399 */ 400 private static final Executor asyncPool = useCommonPool ? 401 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 402 403 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 404 static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)405 public void execute(Runnable r) { new Thread(r).start(); } 406 } 407 408 /** 409 * Null-checks user executor argument, and translates uses of 410 * commonPool to asyncPool in case parallelism disabled. 411 */ screenExecutor(Executor e)412 static Executor screenExecutor(Executor e) { 413 if (!useCommonPool && e == ForkJoinPool.commonPool()) 414 return asyncPool; 415 if (e == null) throw new NullPointerException(); 416 return e; 417 } 418 419 // Modes for Completion.tryFire. Signedness matters. 420 static final int SYNC = 0; 421 static final int ASYNC = 1; 422 static final int NESTED = -1; 423 424 /** 425 * Spins before blocking in waitingGet. 426 * There is no need to spin on uniprocessors. 427 * 428 * Call to Runtime.availableProcessors is expensive, cache the value here. 429 * This unfortunately relies on the number of available CPUs during first 430 * initialization. This affects the case when MP system would report only 431 * one CPU available at startup, initialize SPINS to 0, and then make more 432 * CPUs online. This would incur some performance penalty due to less spins 433 * than would otherwise happen. 434 */ 435 private static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 436 1 << 8 : 0); 437 438 /* ------------- Base Completion classes and operations -------------- */ 439 440 @SuppressWarnings("serial") 441 abstract static class Completion extends ForkJoinTask<Void> 442 implements Runnable, AsynchronousCompletionTask { 443 volatile Completion next; // Treiber stack link 444 445 /** 446 * Performs completion action if triggered, returning a 447 * dependent that may need propagation, if one exists. 448 * 449 * @param mode SYNC, ASYNC, or NESTED 450 */ tryFire(int mode)451 abstract CompletableFuture<?> tryFire(int mode); 452 453 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()454 abstract boolean isLive(); 455 run()456 public final void run() { tryFire(ASYNC); } exec()457 public final boolean exec() { tryFire(ASYNC); return true; } getRawResult()458 public final Void getRawResult() { return null; } setRawResult(Void v)459 public final void setRawResult(Void v) {} 460 } 461 lazySetNext(Completion c, Completion next)462 static void lazySetNext(Completion c, Completion next) { 463 UNSAFE.putOrderedObject(c, NEXT, next); 464 } 465 466 /** 467 * Pops and tries to trigger all reachable dependents. Call only 468 * when known to be done. 469 */ postComplete()470 final void postComplete() { 471 /* 472 * On each step, variable f holds current dependents to pop 473 * and run. It is extended along only one path at a time, 474 * pushing others to avoid unbounded recursion. 475 */ 476 CompletableFuture<?> f = this; Completion h; 477 while ((h = f.stack) != null || 478 (f != this && (h = (f = this).stack) != null)) { 479 CompletableFuture<?> d; Completion t; 480 if (f.casStack(h, t = h.next)) { 481 if (t != null) { 482 if (f != this) { 483 pushStack(h); 484 continue; 485 } 486 h.next = null; // detach 487 } 488 f = (d = h.tryFire(NESTED)) == null ? this : d; 489 } 490 } 491 } 492 493 /** Traverses stack and unlinks dead Completions. */ cleanStack()494 final void cleanStack() { 495 for (Completion p = null, q = stack; q != null;) { 496 Completion s = q.next; 497 if (q.isLive()) { 498 p = q; 499 q = s; 500 } 501 else if (p == null) { 502 casStack(q, s); 503 q = stack; 504 } 505 else { 506 p.next = s; 507 if (p.isLive()) 508 q = s; 509 else { 510 p = null; // restart 511 q = stack; 512 } 513 } 514 } 515 } 516 517 /* ------------- One-input Completions -------------- */ 518 519 /** A Completion with a source, dependent, and executor. */ 520 @SuppressWarnings("serial") 521 abstract static class UniCompletion<T,V> extends Completion { 522 Executor executor; // executor to use (null if none) 523 CompletableFuture<V> dep; // the dependent to complete 524 CompletableFuture<T> src; // source for action 525 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)526 UniCompletion(Executor executor, CompletableFuture<V> dep, 527 CompletableFuture<T> src) { 528 this.executor = executor; this.dep = dep; this.src = src; 529 } 530 531 /** 532 * Returns true if action can be run. Call only when known to 533 * be triggerable. Uses FJ tag bit to ensure that only one 534 * thread claims ownership. If async, starts as task -- a 535 * later call to tryFire will run action. 536 */ claim()537 final boolean claim() { 538 Executor e = executor; 539 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 540 if (e == null) 541 return true; 542 executor = null; // disable 543 e.execute(this); 544 } 545 return false; 546 } 547 isLive()548 final boolean isLive() { return dep != null; } 549 } 550 551 /** Pushes the given completion (if it exists) unless done. */ push(UniCompletion<?,?> c)552 final void push(UniCompletion<?,?> c) { 553 if (c != null) { 554 while (result == null && !tryPushStack(c)) 555 lazySetNext(c, null); // clear on failure 556 } 557 } 558 559 /** 560 * Post-processing by dependent after successful UniCompletion 561 * tryFire. Tries to clean stack of source a, and then either runs 562 * postComplete or returns this to caller, depending on mode. 563 */ postFire(CompletableFuture<?> a, int mode)564 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 565 if (a != null && a.stack != null) { 566 if (mode < 0 || a.result == null) 567 a.cleanStack(); 568 else 569 a.postComplete(); 570 } 571 if (result != null && stack != null) { 572 if (mode < 0) 573 return this; 574 else 575 postComplete(); 576 } 577 return null; 578 } 579 580 @SuppressWarnings("serial") 581 static final class UniApply<T,V> extends UniCompletion<T,V> { 582 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)583 UniApply(Executor executor, CompletableFuture<V> dep, 584 CompletableFuture<T> src, 585 Function<? super T,? extends V> fn) { 586 super(executor, dep, src); this.fn = fn; 587 } tryFire(int mode)588 final CompletableFuture<V> tryFire(int mode) { 589 CompletableFuture<V> d; CompletableFuture<T> a; 590 if ((d = dep) == null || 591 !d.uniApply(a = src, fn, mode > 0 ? null : this)) 592 return null; 593 dep = null; src = null; fn = null; 594 return d.postFire(a, mode); 595 } 596 } 597 uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c)598 final <S> boolean uniApply(CompletableFuture<S> a, 599 Function<? super S,? extends T> f, 600 UniApply<S,T> c) { 601 Object r; Throwable x; 602 if (a == null || (r = a.result) == null || f == null) 603 return false; 604 tryComplete: if (result == null) { 605 if (r instanceof AltResult) { 606 if ((x = ((AltResult)r).ex) != null) { 607 completeThrowable(x, r); 608 break tryComplete; 609 } 610 r = null; 611 } 612 try { 613 if (c != null && !c.claim()) 614 return false; 615 @SuppressWarnings("unchecked") S s = (S) r; 616 completeValue(f.apply(s)); 617 } catch (Throwable ex) { 618 completeThrowable(ex); 619 } 620 } 621 return true; 622 } 623 uniApplyStage( Executor e, Function<? super T,? extends V> f)624 private <V> CompletableFuture<V> uniApplyStage( 625 Executor e, Function<? super T,? extends V> f) { 626 if (f == null) throw new NullPointerException(); 627 CompletableFuture<V> d = new CompletableFuture<V>(); 628 if (e != null || !d.uniApply(this, f, null)) { 629 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); 630 push(c); 631 c.tryFire(SYNC); 632 } 633 return d; 634 } 635 636 @SuppressWarnings("serial") 637 static final class UniAccept<T> extends UniCompletion<T,Void> { 638 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)639 UniAccept(Executor executor, CompletableFuture<Void> dep, 640 CompletableFuture<T> src, Consumer<? super T> fn) { 641 super(executor, dep, src); this.fn = fn; 642 } tryFire(int mode)643 final CompletableFuture<Void> tryFire(int mode) { 644 CompletableFuture<Void> d; CompletableFuture<T> a; 645 if ((d = dep) == null || 646 !d.uniAccept(a = src, fn, mode > 0 ? null : this)) 647 return null; 648 dep = null; src = null; fn = null; 649 return d.postFire(a, mode); 650 } 651 } 652 uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c)653 final <S> boolean uniAccept(CompletableFuture<S> a, 654 Consumer<? super S> f, UniAccept<S> c) { 655 Object r; Throwable x; 656 if (a == null || (r = a.result) == null || f == null) 657 return false; 658 tryComplete: if (result == null) { 659 if (r instanceof AltResult) { 660 if ((x = ((AltResult)r).ex) != null) { 661 completeThrowable(x, r); 662 break tryComplete; 663 } 664 r = null; 665 } 666 try { 667 if (c != null && !c.claim()) 668 return false; 669 @SuppressWarnings("unchecked") S s = (S) r; 670 f.accept(s); 671 completeNull(); 672 } catch (Throwable ex) { 673 completeThrowable(ex); 674 } 675 } 676 return true; 677 } 678 uniAcceptStage(Executor e, Consumer<? super T> f)679 private CompletableFuture<Void> uniAcceptStage(Executor e, 680 Consumer<? super T> f) { 681 if (f == null) throw new NullPointerException(); 682 CompletableFuture<Void> d = new CompletableFuture<Void>(); 683 if (e != null || !d.uniAccept(this, f, null)) { 684 UniAccept<T> c = new UniAccept<T>(e, d, this, f); 685 push(c); 686 c.tryFire(SYNC); 687 } 688 return d; 689 } 690 691 @SuppressWarnings("serial") 692 static final class UniRun<T> extends UniCompletion<T,Void> { 693 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)694 UniRun(Executor executor, CompletableFuture<Void> dep, 695 CompletableFuture<T> src, Runnable fn) { 696 super(executor, dep, src); this.fn = fn; 697 } tryFire(int mode)698 final CompletableFuture<Void> tryFire(int mode) { 699 CompletableFuture<Void> d; CompletableFuture<T> a; 700 if ((d = dep) == null || 701 !d.uniRun(a = src, fn, mode > 0 ? null : this)) 702 return null; 703 dep = null; src = null; fn = null; 704 return d.postFire(a, mode); 705 } 706 } 707 uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c)708 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { 709 Object r; Throwable x; 710 if (a == null || (r = a.result) == null || f == null) 711 return false; 712 if (result == null) { 713 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 714 completeThrowable(x, r); 715 else 716 try { 717 if (c != null && !c.claim()) 718 return false; 719 f.run(); 720 completeNull(); 721 } catch (Throwable ex) { 722 completeThrowable(ex); 723 } 724 } 725 return true; 726 } 727 uniRunStage(Executor e, Runnable f)728 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 729 if (f == null) throw new NullPointerException(); 730 CompletableFuture<Void> d = new CompletableFuture<Void>(); 731 if (e != null || !d.uniRun(this, f, null)) { 732 UniRun<T> c = new UniRun<T>(e, d, this, f); 733 push(c); 734 c.tryFire(SYNC); 735 } 736 return d; 737 } 738 739 @SuppressWarnings("serial") 740 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 741 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)742 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 743 CompletableFuture<T> src, 744 BiConsumer<? super T, ? super Throwable> fn) { 745 super(executor, dep, src); this.fn = fn; 746 } tryFire(int mode)747 final CompletableFuture<T> tryFire(int mode) { 748 CompletableFuture<T> d; CompletableFuture<T> a; 749 if ((d = dep) == null || 750 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) 751 return null; 752 dep = null; src = null; fn = null; 753 return d.postFire(a, mode); 754 } 755 } 756 uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)757 final boolean uniWhenComplete(CompletableFuture<T> a, 758 BiConsumer<? super T,? super Throwable> f, 759 UniWhenComplete<T> c) { 760 Object r; T t; Throwable x = null; 761 if (a == null || (r = a.result) == null || f == null) 762 return false; 763 if (result == null) { 764 try { 765 if (c != null && !c.claim()) 766 return false; 767 if (r instanceof AltResult) { 768 x = ((AltResult)r).ex; 769 t = null; 770 } else { 771 @SuppressWarnings("unchecked") T tr = (T) r; 772 t = tr; 773 } 774 f.accept(t, x); 775 if (x == null) { 776 internalComplete(r); 777 return true; 778 } 779 } catch (Throwable ex) { 780 if (x == null) 781 x = ex; 782 } 783 completeThrowable(x, r); 784 } 785 return true; 786 } 787 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)788 private CompletableFuture<T> uniWhenCompleteStage( 789 Executor e, BiConsumer<? super T, ? super Throwable> f) { 790 if (f == null) throw new NullPointerException(); 791 CompletableFuture<T> d = new CompletableFuture<T>(); 792 if (e != null || !d.uniWhenComplete(this, f, null)) { 793 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); 794 push(c); 795 c.tryFire(SYNC); 796 } 797 return d; 798 } 799 800 @SuppressWarnings("serial") 801 static final class UniHandle<T,V> extends UniCompletion<T,V> { 802 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)803 UniHandle(Executor executor, CompletableFuture<V> dep, 804 CompletableFuture<T> src, 805 BiFunction<? super T, Throwable, ? extends V> fn) { 806 super(executor, dep, src); this.fn = fn; 807 } tryFire(int mode)808 final CompletableFuture<V> tryFire(int mode) { 809 CompletableFuture<V> d; CompletableFuture<T> a; 810 if ((d = dep) == null || 811 !d.uniHandle(a = src, fn, mode > 0 ? null : this)) 812 return null; 813 dep = null; src = null; fn = null; 814 return d.postFire(a, mode); 815 } 816 } 817 uniHandle(CompletableFuture<S> a, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)818 final <S> boolean uniHandle(CompletableFuture<S> a, 819 BiFunction<? super S, Throwable, ? extends T> f, 820 UniHandle<S,T> c) { 821 Object r; S s; Throwable x; 822 if (a == null || (r = a.result) == null || f == null) 823 return false; 824 if (result == null) { 825 try { 826 if (c != null && !c.claim()) 827 return false; 828 if (r instanceof AltResult) { 829 x = ((AltResult)r).ex; 830 s = null; 831 } else { 832 x = null; 833 @SuppressWarnings("unchecked") S ss = (S) r; 834 s = ss; 835 } 836 completeValue(f.apply(s, x)); 837 } catch (Throwable ex) { 838 completeThrowable(ex); 839 } 840 } 841 return true; 842 } 843 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)844 private <V> CompletableFuture<V> uniHandleStage( 845 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 846 if (f == null) throw new NullPointerException(); 847 CompletableFuture<V> d = new CompletableFuture<V>(); 848 if (e != null || !d.uniHandle(this, f, null)) { 849 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); 850 push(c); 851 c.tryFire(SYNC); 852 } 853 return d; 854 } 855 856 @SuppressWarnings("serial") 857 static final class UniExceptionally<T> extends UniCompletion<T,T> { 858 Function<? super Throwable, ? extends T> fn; UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)859 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, 860 Function<? super Throwable, ? extends T> fn) { 861 super(null, dep, src); this.fn = fn; 862 } tryFire(int mode)863 final CompletableFuture<T> tryFire(int mode) { // never ASYNC 864 // assert mode != ASYNC; 865 CompletableFuture<T> d; CompletableFuture<T> a; 866 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) 867 return null; 868 dep = null; src = null; fn = null; 869 return d.postFire(a, mode); 870 } 871 } 872 uniExceptionally(CompletableFuture<T> a, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)873 final boolean uniExceptionally(CompletableFuture<T> a, 874 Function<? super Throwable, ? extends T> f, 875 UniExceptionally<T> c) { 876 Object r; Throwable x; 877 if (a == null || (r = a.result) == null || f == null) 878 return false; 879 if (result == null) { 880 try { 881 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { 882 if (c != null && !c.claim()) 883 return false; 884 completeValue(f.apply(x)); 885 } else 886 internalComplete(r); 887 } catch (Throwable ex) { 888 completeThrowable(ex); 889 } 890 } 891 return true; 892 } 893 uniExceptionallyStage( Function<Throwable, ? extends T> f)894 private CompletableFuture<T> uniExceptionallyStage( 895 Function<Throwable, ? extends T> f) { 896 if (f == null) throw new NullPointerException(); 897 CompletableFuture<T> d = new CompletableFuture<T>(); 898 if (!d.uniExceptionally(this, f, null)) { 899 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); 900 push(c); 901 c.tryFire(SYNC); 902 } 903 return d; 904 } 905 906 @SuppressWarnings("serial") 907 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src)908 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { 909 super(null, dep, src); 910 } tryFire(int mode)911 final CompletableFuture<T> tryFire(int mode) { 912 CompletableFuture<T> d; CompletableFuture<T> a; 913 if ((d = dep) == null || !d.uniRelay(a = src)) 914 return null; 915 src = null; dep = null; 916 return d.postFire(a, mode); 917 } 918 } 919 uniRelay(CompletableFuture<T> a)920 final boolean uniRelay(CompletableFuture<T> a) { 921 Object r; 922 if (a == null || (r = a.result) == null) 923 return false; 924 if (result == null) // no need to claim 925 completeRelay(r); 926 return true; 927 } 928 929 @SuppressWarnings("serial") 930 static final class UniCompose<T,V> extends UniCompletion<T,V> { 931 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)932 UniCompose(Executor executor, CompletableFuture<V> dep, 933 CompletableFuture<T> src, 934 Function<? super T, ? extends CompletionStage<V>> fn) { 935 super(executor, dep, src); this.fn = fn; 936 } tryFire(int mode)937 final CompletableFuture<V> tryFire(int mode) { 938 CompletableFuture<V> d; CompletableFuture<T> a; 939 if ((d = dep) == null || 940 !d.uniCompose(a = src, fn, mode > 0 ? null : this)) 941 return null; 942 dep = null; src = null; fn = null; 943 return d.postFire(a, mode); 944 } 945 } 946 uniCompose( CompletableFuture<S> a, Function<? super S, ? extends CompletionStage<T>> f, UniCompose<S,T> c)947 final <S> boolean uniCompose( 948 CompletableFuture<S> a, 949 Function<? super S, ? extends CompletionStage<T>> f, 950 UniCompose<S,T> c) { 951 Object r; Throwable x; 952 if (a == null || (r = a.result) == null || f == null) 953 return false; 954 tryComplete: if (result == null) { 955 if (r instanceof AltResult) { 956 if ((x = ((AltResult)r).ex) != null) { 957 completeThrowable(x, r); 958 break tryComplete; 959 } 960 r = null; 961 } 962 try { 963 if (c != null && !c.claim()) 964 return false; 965 @SuppressWarnings("unchecked") S s = (S) r; 966 CompletableFuture<T> g = f.apply(s).toCompletableFuture(); 967 if (g.result == null || !uniRelay(g)) { 968 UniRelay<T> copy = new UniRelay<T>(this, g); 969 g.push(copy); 970 copy.tryFire(SYNC); 971 if (result == null) 972 return false; 973 } 974 } catch (Throwable ex) { 975 completeThrowable(ex); 976 } 977 } 978 return true; 979 } 980 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)981 private <V> CompletableFuture<V> uniComposeStage( 982 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 983 if (f == null) throw new NullPointerException(); 984 Object r; Throwable x; 985 if (e == null && (r = result) != null) { 986 // try to return function result directly 987 if (r instanceof AltResult) { 988 if ((x = ((AltResult)r).ex) != null) { 989 return new CompletableFuture<V>(encodeThrowable(x, r)); 990 } 991 r = null; 992 } 993 try { 994 @SuppressWarnings("unchecked") T t = (T) r; 995 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 996 Object s = g.result; 997 if (s != null) 998 return new CompletableFuture<V>(encodeRelay(s)); 999 CompletableFuture<V> d = new CompletableFuture<V>(); 1000 UniRelay<V> copy = new UniRelay<V>(d, g); 1001 g.push(copy); 1002 copy.tryFire(SYNC); 1003 return d; 1004 } catch (Throwable ex) { 1005 return new CompletableFuture<V>(encodeThrowable(ex)); 1006 } 1007 } 1008 CompletableFuture<V> d = new CompletableFuture<V>(); 1009 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); 1010 push(c); 1011 c.tryFire(SYNC); 1012 return d; 1013 } 1014 1015 /* ------------- Two-input Completions -------------- */ 1016 1017 /** A Completion for an action with two sources */ 1018 @SuppressWarnings("serial") 1019 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1020 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1021 BiCompletion(Executor executor, CompletableFuture<V> dep, 1022 CompletableFuture<T> src, CompletableFuture<U> snd) { 1023 super(executor, dep, src); this.snd = snd; 1024 } 1025 } 1026 1027 /** A Completion delegating to a BiCompletion */ 1028 @SuppressWarnings("serial") 1029 static final class CoCompletion extends Completion { 1030 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1031 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1032 final CompletableFuture<?> tryFire(int mode) { 1033 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1034 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1035 return null; 1036 base = null; // detach 1037 return d; 1038 } isLive()1039 final boolean isLive() { 1040 BiCompletion<?,?,?> c; 1041 return (c = base) != null && c.dep != null; 1042 } 1043 } 1044 1045 /** Pushes completion to this and b unless both done. */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1046 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1047 if (c != null) { 1048 Object r; 1049 while ((r = result) == null && !tryPushStack(c)) 1050 lazySetNext(c, null); // clear on failure 1051 if (b != null && b != this && b.result == null) { 1052 Completion q = (r != null) ? c : new CoCompletion(c); 1053 while (b.result == null && !b.tryPushStack(q)) 1054 lazySetNext(q, null); // clear on failure 1055 } 1056 } 1057 } 1058 1059 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1060 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1061 CompletableFuture<?> b, int mode) { 1062 if (b != null && b.stack != null) { // clean second source 1063 if (mode < 0 || b.result == null) 1064 b.cleanStack(); 1065 else 1066 b.postComplete(); 1067 } 1068 return postFire(a, mode); 1069 } 1070 1071 @SuppressWarnings("serial") 1072 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1073 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1074 BiApply(Executor executor, CompletableFuture<V> dep, 1075 CompletableFuture<T> src, CompletableFuture<U> snd, 1076 BiFunction<? super T,? super U,? extends V> fn) { 1077 super(executor, dep, src, snd); this.fn = fn; 1078 } tryFire(int mode)1079 final CompletableFuture<V> tryFire(int mode) { 1080 CompletableFuture<V> d; 1081 CompletableFuture<T> a; 1082 CompletableFuture<U> b; 1083 if ((d = dep) == null || 1084 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1085 return null; 1086 dep = null; src = null; snd = null; fn = null; 1087 return d.postFire(a, b, mode); 1088 } 1089 } 1090 biApply(CompletableFuture<R> a, CompletableFuture<S> b, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1091 final <R,S> boolean biApply(CompletableFuture<R> a, 1092 CompletableFuture<S> b, 1093 BiFunction<? super R,? super S,? extends T> f, 1094 BiApply<R,S,T> c) { 1095 Object r, s; Throwable x; 1096 if (a == null || (r = a.result) == null || 1097 b == null || (s = b.result) == null || f == null) 1098 return false; 1099 tryComplete: if (result == null) { 1100 if (r instanceof AltResult) { 1101 if ((x = ((AltResult)r).ex) != null) { 1102 completeThrowable(x, r); 1103 break tryComplete; 1104 } 1105 r = null; 1106 } 1107 if (s instanceof AltResult) { 1108 if ((x = ((AltResult)s).ex) != null) { 1109 completeThrowable(x, s); 1110 break tryComplete; 1111 } 1112 s = null; 1113 } 1114 try { 1115 if (c != null && !c.claim()) 1116 return false; 1117 @SuppressWarnings("unchecked") R rr = (R) r; 1118 @SuppressWarnings("unchecked") S ss = (S) s; 1119 completeValue(f.apply(rr, ss)); 1120 } catch (Throwable ex) { 1121 completeThrowable(ex); 1122 } 1123 } 1124 return true; 1125 } 1126 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1127 private <U,V> CompletableFuture<V> biApplyStage( 1128 Executor e, CompletionStage<U> o, 1129 BiFunction<? super T,? super U,? extends V> f) { 1130 CompletableFuture<U> b; 1131 if (f == null || (b = o.toCompletableFuture()) == null) 1132 throw new NullPointerException(); 1133 CompletableFuture<V> d = new CompletableFuture<V>(); 1134 if (e != null || !d.biApply(this, b, f, null)) { 1135 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); 1136 bipush(b, c); 1137 c.tryFire(SYNC); 1138 } 1139 return d; 1140 } 1141 1142 @SuppressWarnings("serial") 1143 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1144 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1145 BiAccept(Executor executor, CompletableFuture<Void> dep, 1146 CompletableFuture<T> src, CompletableFuture<U> snd, 1147 BiConsumer<? super T,? super U> fn) { 1148 super(executor, dep, src, snd); this.fn = fn; 1149 } tryFire(int mode)1150 final CompletableFuture<Void> tryFire(int mode) { 1151 CompletableFuture<Void> d; 1152 CompletableFuture<T> a; 1153 CompletableFuture<U> b; 1154 if ((d = dep) == null || 1155 !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 1156 return null; 1157 dep = null; src = null; snd = null; fn = null; 1158 return d.postFire(a, b, mode); 1159 } 1160 } 1161 biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1162 final <R,S> boolean biAccept(CompletableFuture<R> a, 1163 CompletableFuture<S> b, 1164 BiConsumer<? super R,? super S> f, 1165 BiAccept<R,S> c) { 1166 Object r, s; Throwable x; 1167 if (a == null || (r = a.result) == null || 1168 b == null || (s = b.result) == null || f == null) 1169 return false; 1170 tryComplete: if (result == null) { 1171 if (r instanceof AltResult) { 1172 if ((x = ((AltResult)r).ex) != null) { 1173 completeThrowable(x, r); 1174 break tryComplete; 1175 } 1176 r = null; 1177 } 1178 if (s instanceof AltResult) { 1179 if ((x = ((AltResult)s).ex) != null) { 1180 completeThrowable(x, s); 1181 break tryComplete; 1182 } 1183 s = null; 1184 } 1185 try { 1186 if (c != null && !c.claim()) 1187 return false; 1188 @SuppressWarnings("unchecked") R rr = (R) r; 1189 @SuppressWarnings("unchecked") S ss = (S) s; 1190 f.accept(rr, ss); 1191 completeNull(); 1192 } catch (Throwable ex) { 1193 completeThrowable(ex); 1194 } 1195 } 1196 return true; 1197 } 1198 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1199 private <U> CompletableFuture<Void> biAcceptStage( 1200 Executor e, CompletionStage<U> o, 1201 BiConsumer<? super T,? super U> f) { 1202 CompletableFuture<U> b; 1203 if (f == null || (b = o.toCompletableFuture()) == null) 1204 throw new NullPointerException(); 1205 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1206 if (e != null || !d.biAccept(this, b, f, null)) { 1207 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); 1208 bipush(b, c); 1209 c.tryFire(SYNC); 1210 } 1211 return d; 1212 } 1213 1214 @SuppressWarnings("serial") 1215 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1216 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1217 BiRun(Executor executor, CompletableFuture<Void> dep, 1218 CompletableFuture<T> src, 1219 CompletableFuture<U> snd, 1220 Runnable fn) { 1221 super(executor, dep, src, snd); this.fn = fn; 1222 } tryFire(int mode)1223 final CompletableFuture<Void> tryFire(int mode) { 1224 CompletableFuture<Void> d; 1225 CompletableFuture<T> a; 1226 CompletableFuture<U> b; 1227 if ((d = dep) == null || 1228 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) 1229 return null; 1230 dep = null; src = null; snd = null; fn = null; 1231 return d.postFire(a, b, mode); 1232 } 1233 } 1234 biRun(CompletableFuture<?> a, CompletableFuture<?> b, Runnable f, BiRun<?,?> c)1235 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, 1236 Runnable f, BiRun<?,?> c) { 1237 Object r, s; Throwable x; 1238 if (a == null || (r = a.result) == null || 1239 b == null || (s = b.result) == null || f == null) 1240 return false; 1241 if (result == null) { 1242 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1243 completeThrowable(x, r); 1244 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1245 completeThrowable(x, s); 1246 else 1247 try { 1248 if (c != null && !c.claim()) 1249 return false; 1250 f.run(); 1251 completeNull(); 1252 } catch (Throwable ex) { 1253 completeThrowable(ex); 1254 } 1255 } 1256 return true; 1257 } 1258 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1259 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1260 Runnable f) { 1261 CompletableFuture<?> b; 1262 if (f == null || (b = o.toCompletableFuture()) == null) 1263 throw new NullPointerException(); 1264 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1265 if (e != null || !d.biRun(this, b, f, null)) { 1266 BiRun<T,?> c = new BiRun<>(e, d, this, b, f); 1267 bipush(b, c); 1268 c.tryFire(SYNC); 1269 } 1270 return d; 1271 } 1272 1273 @SuppressWarnings("serial") 1274 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1275 BiRelay(CompletableFuture<Void> dep, 1276 CompletableFuture<T> src, 1277 CompletableFuture<U> snd) { 1278 super(null, dep, src, snd); 1279 } tryFire(int mode)1280 final CompletableFuture<Void> tryFire(int mode) { 1281 CompletableFuture<Void> d; 1282 CompletableFuture<T> a; 1283 CompletableFuture<U> b; 1284 if ((d = dep) == null || !d.biRelay(a = src, b = snd)) 1285 return null; 1286 src = null; snd = null; dep = null; 1287 return d.postFire(a, b, mode); 1288 } 1289 } 1290 biRelay(CompletableFuture<?> a, CompletableFuture<?> b)1291 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1292 Object r, s; Throwable x; 1293 if (a == null || (r = a.result) == null || 1294 b == null || (s = b.result) == null) 1295 return false; 1296 if (result == null) { 1297 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1298 completeThrowable(x, r); 1299 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1300 completeThrowable(x, s); 1301 else 1302 completeNull(); 1303 } 1304 return true; 1305 } 1306 1307 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1308 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1309 int lo, int hi) { 1310 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1311 if (lo > hi) // empty 1312 d.result = NIL; 1313 else { 1314 CompletableFuture<?> a, b; 1315 int mid = (lo + hi) >>> 1; 1316 if ((a = (lo == mid ? cfs[lo] : 1317 andTree(cfs, lo, mid))) == null || 1318 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1319 andTree(cfs, mid+1, hi))) == null) 1320 throw new NullPointerException(); 1321 if (!d.biRelay(a, b)) { 1322 BiRelay<?,?> c = new BiRelay<>(d, a, b); 1323 a.bipush(b, c); 1324 c.tryFire(SYNC); 1325 } 1326 } 1327 return d; 1328 } 1329 1330 /* ------------- Projected (Ored) BiCompletions -------------- */ 1331 1332 /** Pushes completion to this and b unless either done. */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1333 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1334 if (c != null) { 1335 while ((b == null || b.result == null) && result == null) { 1336 if (tryPushStack(c)) { 1337 if (b != null && b != this && b.result == null) { 1338 Completion q = new CoCompletion(c); 1339 while (result == null && b.result == null && 1340 !b.tryPushStack(q)) 1341 lazySetNext(q, null); // clear on failure 1342 } 1343 break; 1344 } 1345 lazySetNext(c, null); // clear on failure 1346 } 1347 } 1348 } 1349 1350 @SuppressWarnings("serial") 1351 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1352 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1353 OrApply(Executor executor, CompletableFuture<V> dep, 1354 CompletableFuture<T> src, 1355 CompletableFuture<U> snd, 1356 Function<? super T,? extends V> fn) { 1357 super(executor, dep, src, snd); this.fn = fn; 1358 } tryFire(int mode)1359 final CompletableFuture<V> tryFire(int mode) { 1360 CompletableFuture<V> d; 1361 CompletableFuture<T> a; 1362 CompletableFuture<U> b; 1363 if ((d = dep) == null || 1364 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1365 return null; 1366 dep = null; src = null; snd = null; fn = null; 1367 return d.postFire(a, b, mode); 1368 } 1369 } 1370 orApply(CompletableFuture<R> a, CompletableFuture<S> b, Function<? super R, ? extends T> f, OrApply<R,S,T> c)1371 final <R,S extends R> boolean orApply(CompletableFuture<R> a, 1372 CompletableFuture<S> b, 1373 Function<? super R, ? extends T> f, 1374 OrApply<R,S,T> c) { 1375 Object r; Throwable x; 1376 if (a == null || b == null || 1377 ((r = a.result) == null && (r = b.result) == null) || f == null) 1378 return false; 1379 tryComplete: if (result == null) { 1380 try { 1381 if (c != null && !c.claim()) 1382 return false; 1383 if (r instanceof AltResult) { 1384 if ((x = ((AltResult)r).ex) != null) { 1385 completeThrowable(x, r); 1386 break tryComplete; 1387 } 1388 r = null; 1389 } 1390 @SuppressWarnings("unchecked") R rr = (R) r; 1391 completeValue(f.apply(rr)); 1392 } catch (Throwable ex) { 1393 completeThrowable(ex); 1394 } 1395 } 1396 return true; 1397 } 1398 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1399 private <U extends T,V> CompletableFuture<V> orApplyStage( 1400 Executor e, CompletionStage<U> o, 1401 Function<? super T, ? extends V> f) { 1402 CompletableFuture<U> b; 1403 if (f == null || (b = o.toCompletableFuture()) == null) 1404 throw new NullPointerException(); 1405 CompletableFuture<V> d = new CompletableFuture<V>(); 1406 if (e != null || !d.orApply(this, b, f, null)) { 1407 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); 1408 orpush(b, c); 1409 c.tryFire(SYNC); 1410 } 1411 return d; 1412 } 1413 1414 @SuppressWarnings("serial") 1415 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1416 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1417 OrAccept(Executor executor, CompletableFuture<Void> dep, 1418 CompletableFuture<T> src, 1419 CompletableFuture<U> snd, 1420 Consumer<? super T> fn) { 1421 super(executor, dep, src, snd); this.fn = fn; 1422 } tryFire(int mode)1423 final CompletableFuture<Void> tryFire(int mode) { 1424 CompletableFuture<Void> d; 1425 CompletableFuture<T> a; 1426 CompletableFuture<U> b; 1427 if ((d = dep) == null || 1428 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 1429 return null; 1430 dep = null; src = null; snd = null; fn = null; 1431 return d.postFire(a, b, mode); 1432 } 1433 } 1434 orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f, OrAccept<R,S> c)1435 final <R,S extends R> boolean orAccept(CompletableFuture<R> a, 1436 CompletableFuture<S> b, 1437 Consumer<? super R> f, 1438 OrAccept<R,S> c) { 1439 Object r; Throwable x; 1440 if (a == null || b == null || 1441 ((r = a.result) == null && (r = b.result) == null) || f == null) 1442 return false; 1443 tryComplete: if (result == null) { 1444 try { 1445 if (c != null && !c.claim()) 1446 return false; 1447 if (r instanceof AltResult) { 1448 if ((x = ((AltResult)r).ex) != null) { 1449 completeThrowable(x, r); 1450 break tryComplete; 1451 } 1452 r = null; 1453 } 1454 @SuppressWarnings("unchecked") R rr = (R) r; 1455 f.accept(rr); 1456 completeNull(); 1457 } catch (Throwable ex) { 1458 completeThrowable(ex); 1459 } 1460 } 1461 return true; 1462 } 1463 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1464 private <U extends T> CompletableFuture<Void> orAcceptStage( 1465 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1466 CompletableFuture<U> b; 1467 if (f == null || (b = o.toCompletableFuture()) == null) 1468 throw new NullPointerException(); 1469 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1470 if (e != null || !d.orAccept(this, b, f, null)) { 1471 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); 1472 orpush(b, c); 1473 c.tryFire(SYNC); 1474 } 1475 return d; 1476 } 1477 1478 @SuppressWarnings("serial") 1479 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1480 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1481 OrRun(Executor executor, CompletableFuture<Void> dep, 1482 CompletableFuture<T> src, 1483 CompletableFuture<U> snd, 1484 Runnable fn) { 1485 super(executor, dep, src, snd); this.fn = fn; 1486 } tryFire(int mode)1487 final CompletableFuture<Void> tryFire(int mode) { 1488 CompletableFuture<Void> d; 1489 CompletableFuture<T> a; 1490 CompletableFuture<U> b; 1491 if ((d = dep) == null || 1492 !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) 1493 return null; 1494 dep = null; src = null; snd = null; fn = null; 1495 return d.postFire(a, b, mode); 1496 } 1497 } 1498 orRun(CompletableFuture<?> a, CompletableFuture<?> b, Runnable f, OrRun<?,?> c)1499 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, 1500 Runnable f, OrRun<?,?> c) { 1501 Object r; Throwable x; 1502 if (a == null || b == null || 1503 ((r = a.result) == null && (r = b.result) == null) || f == null) 1504 return false; 1505 if (result == null) { 1506 try { 1507 if (c != null && !c.claim()) 1508 return false; 1509 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1510 completeThrowable(x, r); 1511 else { 1512 f.run(); 1513 completeNull(); 1514 } 1515 } catch (Throwable ex) { 1516 completeThrowable(ex); 1517 } 1518 } 1519 return true; 1520 } 1521 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1522 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1523 Runnable f) { 1524 CompletableFuture<?> b; 1525 if (f == null || (b = o.toCompletableFuture()) == null) 1526 throw new NullPointerException(); 1527 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1528 if (e != null || !d.orRun(this, b, f, null)) { 1529 OrRun<T,?> c = new OrRun<>(e, d, this, b, f); 1530 orpush(b, c); 1531 c.tryFire(SYNC); 1532 } 1533 return d; 1534 } 1535 1536 @SuppressWarnings("serial") 1537 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1538 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, 1539 CompletableFuture<U> snd) { 1540 super(null, dep, src, snd); 1541 } tryFire(int mode)1542 final CompletableFuture<Object> tryFire(int mode) { 1543 CompletableFuture<Object> d; 1544 CompletableFuture<T> a; 1545 CompletableFuture<U> b; 1546 if ((d = dep) == null || !d.orRelay(a = src, b = snd)) 1547 return null; 1548 src = null; snd = null; dep = null; 1549 return d.postFire(a, b, mode); 1550 } 1551 } 1552 orRelay(CompletableFuture<?> a, CompletableFuture<?> b)1553 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1554 Object r; 1555 if (a == null || b == null || 1556 ((r = a.result) == null && (r = b.result) == null)) 1557 return false; 1558 if (result == null) 1559 completeRelay(r); 1560 return true; 1561 } 1562 1563 /** Recursively constructs a tree of completions. */ orTree(CompletableFuture<?>[] cfs, int lo, int hi)1564 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, 1565 int lo, int hi) { 1566 CompletableFuture<Object> d = new CompletableFuture<Object>(); 1567 if (lo <= hi) { 1568 CompletableFuture<?> a, b; 1569 int mid = (lo + hi) >>> 1; 1570 if ((a = (lo == mid ? cfs[lo] : 1571 orTree(cfs, lo, mid))) == null || 1572 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1573 orTree(cfs, mid+1, hi))) == null) 1574 throw new NullPointerException(); 1575 if (!d.orRelay(a, b)) { 1576 OrRelay<?,?> c = new OrRelay<>(d, a, b); 1577 a.orpush(b, c); 1578 c.tryFire(SYNC); 1579 } 1580 } 1581 return d; 1582 } 1583 1584 /* ------------- Zero-input Async forms -------------- */ 1585 1586 @SuppressWarnings("serial") 1587 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1588 implements Runnable, AsynchronousCompletionTask { 1589 CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn)1590 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { 1591 this.dep = dep; this.fn = fn; 1592 } 1593 getRawResult()1594 public final Void getRawResult() { return null; } setRawResult(Void v)1595 public final void setRawResult(Void v) {} exec()1596 public final boolean exec() { run(); return true; } 1597 run()1598 public void run() { 1599 CompletableFuture<T> d; Supplier<T> f; 1600 if ((d = dep) != null && (f = fn) != null) { 1601 dep = null; fn = null; 1602 if (d.result == null) { 1603 try { 1604 d.completeValue(f.get()); 1605 } catch (Throwable ex) { 1606 d.completeThrowable(ex); 1607 } 1608 } 1609 d.postComplete(); 1610 } 1611 } 1612 } 1613 asyncSupplyStage(Executor e, Supplier<U> f)1614 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1615 Supplier<U> f) { 1616 if (f == null) throw new NullPointerException(); 1617 CompletableFuture<U> d = new CompletableFuture<U>(); 1618 e.execute(new AsyncSupply<U>(d, f)); 1619 return d; 1620 } 1621 1622 @SuppressWarnings("serial") 1623 static final class AsyncRun extends ForkJoinTask<Void> 1624 implements Runnable, AsynchronousCompletionTask { 1625 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1626 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1627 this.dep = dep; this.fn = fn; 1628 } 1629 getRawResult()1630 public final Void getRawResult() { return null; } setRawResult(Void v)1631 public final void setRawResult(Void v) {} exec()1632 public final boolean exec() { run(); return true; } 1633 run()1634 public void run() { 1635 CompletableFuture<Void> d; Runnable f; 1636 if ((d = dep) != null && (f = fn) != null) { 1637 dep = null; fn = null; 1638 if (d.result == null) { 1639 try { 1640 f.run(); 1641 d.completeNull(); 1642 } catch (Throwable ex) { 1643 d.completeThrowable(ex); 1644 } 1645 } 1646 d.postComplete(); 1647 } 1648 } 1649 } 1650 asyncRunStage(Executor e, Runnable f)1651 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1652 if (f == null) throw new NullPointerException(); 1653 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1654 e.execute(new AsyncRun(d, f)); 1655 return d; 1656 } 1657 1658 /* ------------- Signallers -------------- */ 1659 1660 /** 1661 * Completion for recording and releasing a waiting thread. This 1662 * class implements ManagedBlocker to avoid starvation when 1663 * blocking actions pile up in ForkJoinPools. 1664 */ 1665 @SuppressWarnings("serial") 1666 static final class Signaller extends Completion 1667 implements ForkJoinPool.ManagedBlocker { 1668 long nanos; // wait time if timed 1669 final long deadline; // non-zero if timed 1670 volatile int interruptControl; // > 0: interruptible, < 0: interrupted 1671 volatile Thread thread; 1672 Signaller(boolean interruptible, long nanos, long deadline)1673 Signaller(boolean interruptible, long nanos, long deadline) { 1674 this.thread = Thread.currentThread(); 1675 this.interruptControl = interruptible ? 1 : 0; 1676 this.nanos = nanos; 1677 this.deadline = deadline; 1678 } tryFire(int ignore)1679 final CompletableFuture<?> tryFire(int ignore) { 1680 Thread w; // no need to atomically claim 1681 if ((w = thread) != null) { 1682 thread = null; 1683 LockSupport.unpark(w); 1684 } 1685 return null; 1686 } isReleasable()1687 public boolean isReleasable() { 1688 if (thread == null) 1689 return true; 1690 if (Thread.interrupted()) { 1691 int i = interruptControl; 1692 interruptControl = -1; 1693 if (i > 0) 1694 return true; 1695 } 1696 if (deadline != 0L && 1697 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { 1698 thread = null; 1699 return true; 1700 } 1701 return false; 1702 } block()1703 public boolean block() { 1704 if (isReleasable()) 1705 return true; 1706 else if (deadline == 0L) 1707 LockSupport.park(this); 1708 else if (nanos > 0L) 1709 LockSupport.parkNanos(this, nanos); 1710 return isReleasable(); 1711 } isLive()1712 final boolean isLive() { return thread != null; } 1713 } 1714 1715 /** 1716 * Returns raw result after waiting, or null if interruptible and 1717 * interrupted. 1718 */ waitingGet(boolean interruptible)1719 private Object waitingGet(boolean interruptible) { 1720 Signaller q = null; 1721 boolean queued = false; 1722 int spins = -1; 1723 Object r; 1724 while ((r = result) == null) { 1725 if (spins < 0) 1726 spins = SPINS; 1727 else if (spins > 0) { 1728 if (ThreadLocalRandom.nextSecondarySeed() >= 0) 1729 --spins; 1730 } 1731 else if (q == null) 1732 q = new Signaller(interruptible, 0L, 0L); 1733 else if (!queued) 1734 queued = tryPushStack(q); 1735 else if (interruptible && q.interruptControl < 0) { 1736 q.thread = null; 1737 cleanStack(); 1738 return null; 1739 } 1740 else if (q.thread != null && result == null) { 1741 try { 1742 ForkJoinPool.managedBlock(q); 1743 } catch (InterruptedException ie) { 1744 q.interruptControl = -1; 1745 } 1746 } 1747 } 1748 if (q != null) { 1749 q.thread = null; 1750 if (q.interruptControl < 0) { 1751 if (interruptible) 1752 r = null; // report interruption 1753 else 1754 Thread.currentThread().interrupt(); 1755 } 1756 } 1757 postComplete(); 1758 return r; 1759 } 1760 1761 /** 1762 * Returns raw result after waiting, or null if interrupted, or 1763 * throws TimeoutException on timeout. 1764 */ timedGet(long nanos)1765 private Object timedGet(long nanos) throws TimeoutException { 1766 if (Thread.interrupted()) 1767 return null; 1768 if (nanos <= 0L) 1769 throw new TimeoutException(); 1770 long d = System.nanoTime() + nanos; 1771 Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0 1772 boolean queued = false; 1773 Object r; 1774 // We intentionally don't spin here (as waitingGet does) because 1775 // the call to nanoTime() above acts much like a spin. 1776 while ((r = result) == null) { 1777 if (!queued) 1778 queued = tryPushStack(q); 1779 else if (q.interruptControl < 0 || q.nanos <= 0L) { 1780 q.thread = null; 1781 cleanStack(); 1782 if (q.interruptControl < 0) 1783 return null; 1784 throw new TimeoutException(); 1785 } 1786 else if (q.thread != null && result == null) { 1787 try { 1788 ForkJoinPool.managedBlock(q); 1789 } catch (InterruptedException ie) { 1790 q.interruptControl = -1; 1791 } 1792 } 1793 } 1794 if (q.interruptControl < 0) 1795 r = null; 1796 q.thread = null; 1797 postComplete(); 1798 return r; 1799 } 1800 1801 /* ------------- public methods -------------- */ 1802 1803 /** 1804 * Creates a new incomplete CompletableFuture. 1805 */ CompletableFuture()1806 public CompletableFuture() { 1807 } 1808 1809 /** 1810 * Creates a new complete CompletableFuture with given encoded result. 1811 */ CompletableFuture(Object r)1812 private CompletableFuture(Object r) { 1813 this.result = r; 1814 } 1815 1816 /** 1817 * Returns a new CompletableFuture that is asynchronously completed 1818 * by a task running in the {@link ForkJoinPool#commonPool()} with 1819 * the value obtained by calling the given Supplier. 1820 * 1821 * @param supplier a function returning the value to be used 1822 * to complete the returned CompletableFuture 1823 * @param <U> the function's return type 1824 * @return the new CompletableFuture 1825 */ supplyAsync(Supplier<U> supplier)1826 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1827 return asyncSupplyStage(asyncPool, supplier); 1828 } 1829 1830 /** 1831 * Returns a new CompletableFuture that is asynchronously completed 1832 * by a task running in the given executor with the value obtained 1833 * by calling the given Supplier. 1834 * 1835 * @param supplier a function returning the value to be used 1836 * to complete the returned CompletableFuture 1837 * @param executor the executor to use for asynchronous execution 1838 * @param <U> the function's return type 1839 * @return the new CompletableFuture 1840 */ supplyAsync(Supplier<U> supplier, Executor executor)1841 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1842 Executor executor) { 1843 return asyncSupplyStage(screenExecutor(executor), supplier); 1844 } 1845 1846 /** 1847 * Returns a new CompletableFuture that is asynchronously completed 1848 * by a task running in the {@link ForkJoinPool#commonPool()} after 1849 * it runs the given action. 1850 * 1851 * @param runnable the action to run before completing the 1852 * returned CompletableFuture 1853 * @return the new CompletableFuture 1854 */ runAsync(Runnable runnable)1855 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1856 return asyncRunStage(asyncPool, runnable); 1857 } 1858 1859 /** 1860 * Returns a new CompletableFuture that is asynchronously completed 1861 * by a task running in the given executor after it runs the given 1862 * action. 1863 * 1864 * @param runnable the action to run before completing the 1865 * returned CompletableFuture 1866 * @param executor the executor to use for asynchronous execution 1867 * @return the new CompletableFuture 1868 */ runAsync(Runnable runnable, Executor executor)1869 public static CompletableFuture<Void> runAsync(Runnable runnable, 1870 Executor executor) { 1871 return asyncRunStage(screenExecutor(executor), runnable); 1872 } 1873 1874 /** 1875 * Returns a new CompletableFuture that is already completed with 1876 * the given value. 1877 * 1878 * @param value the value 1879 * @param <U> the type of the value 1880 * @return the completed CompletableFuture 1881 */ completedFuture(U value)1882 public static <U> CompletableFuture<U> completedFuture(U value) { 1883 return new CompletableFuture<U>((value == null) ? NIL : value); 1884 } 1885 1886 /** 1887 * Returns {@code true} if completed in any fashion: normally, 1888 * exceptionally, or via cancellation. 1889 * 1890 * @return {@code true} if completed 1891 */ isDone()1892 public boolean isDone() { 1893 return result != null; 1894 } 1895 1896 /** 1897 * Waits if necessary for this future to complete, and then 1898 * returns its result. 1899 * 1900 * @return the result value 1901 * @throws CancellationException if this future was cancelled 1902 * @throws ExecutionException if this future completed exceptionally 1903 * @throws InterruptedException if the current thread was interrupted 1904 * while waiting 1905 */ get()1906 public T get() throws InterruptedException, ExecutionException { 1907 Object r; 1908 return reportGet((r = result) == null ? waitingGet(true) : r); 1909 } 1910 1911 /** 1912 * Waits if necessary for at most the given time for this future 1913 * to complete, and then returns its result, if available. 1914 * 1915 * @param timeout the maximum time to wait 1916 * @param unit the time unit of the timeout argument 1917 * @return the result value 1918 * @throws CancellationException if this future was cancelled 1919 * @throws ExecutionException if this future completed exceptionally 1920 * @throws InterruptedException if the current thread was interrupted 1921 * while waiting 1922 * @throws TimeoutException if the wait timed out 1923 */ get(long timeout, TimeUnit unit)1924 public T get(long timeout, TimeUnit unit) 1925 throws InterruptedException, ExecutionException, TimeoutException { 1926 Object r; 1927 long nanos = unit.toNanos(timeout); 1928 return reportGet((r = result) == null ? timedGet(nanos) : r); 1929 } 1930 1931 /** 1932 * Returns the result value when complete, or throws an 1933 * (unchecked) exception if completed exceptionally. To better 1934 * conform with the use of common functional forms, if a 1935 * computation involved in the completion of this 1936 * CompletableFuture threw an exception, this method throws an 1937 * (unchecked) {@link CompletionException} with the underlying 1938 * exception as its cause. 1939 * 1940 * @return the result value 1941 * @throws CancellationException if the computation was cancelled 1942 * @throws CompletionException if this future completed 1943 * exceptionally or a completion computation threw an exception 1944 */ join()1945 public T join() { 1946 Object r; 1947 return reportJoin((r = result) == null ? waitingGet(false) : r); 1948 } 1949 1950 /** 1951 * Returns the result value (or throws any encountered exception) 1952 * if completed, else returns the given valueIfAbsent. 1953 * 1954 * @param valueIfAbsent the value to return if not completed 1955 * @return the result value, if completed, else the given valueIfAbsent 1956 * @throws CancellationException if the computation was cancelled 1957 * @throws CompletionException if this future completed 1958 * exceptionally or a completion computation threw an exception 1959 */ getNow(T valueIfAbsent)1960 public T getNow(T valueIfAbsent) { 1961 Object r; 1962 return ((r = result) == null) ? valueIfAbsent : reportJoin(r); 1963 } 1964 1965 /** 1966 * If not already completed, sets the value returned by {@link 1967 * #get()} and related methods to the given value. 1968 * 1969 * @param value the result value 1970 * @return {@code true} if this invocation caused this CompletableFuture 1971 * to transition to a completed state, else {@code false} 1972 */ complete(T value)1973 public boolean complete(T value) { 1974 boolean triggered = completeValue(value); 1975 postComplete(); 1976 return triggered; 1977 } 1978 1979 /** 1980 * If not already completed, causes invocations of {@link #get()} 1981 * and related methods to throw the given exception. 1982 * 1983 * @param ex the exception 1984 * @return {@code true} if this invocation caused this CompletableFuture 1985 * to transition to a completed state, else {@code false} 1986 */ completeExceptionally(Throwable ex)1987 public boolean completeExceptionally(Throwable ex) { 1988 if (ex == null) throw new NullPointerException(); 1989 boolean triggered = internalComplete(new AltResult(ex)); 1990 postComplete(); 1991 return triggered; 1992 } 1993 thenApply( Function<? super T,? extends U> fn)1994 public <U> CompletableFuture<U> thenApply( 1995 Function<? super T,? extends U> fn) { 1996 return uniApplyStage(null, fn); 1997 } 1998 thenApplyAsync( Function<? super T,? extends U> fn)1999 public <U> CompletableFuture<U> thenApplyAsync( 2000 Function<? super T,? extends U> fn) { 2001 return uniApplyStage(asyncPool, fn); 2002 } 2003 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2004 public <U> CompletableFuture<U> thenApplyAsync( 2005 Function<? super T,? extends U> fn, Executor executor) { 2006 return uniApplyStage(screenExecutor(executor), fn); 2007 } 2008 thenAccept(Consumer<? super T> action)2009 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2010 return uniAcceptStage(null, action); 2011 } 2012 thenAcceptAsync(Consumer<? super T> action)2013 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2014 return uniAcceptStage(asyncPool, action); 2015 } 2016 thenAcceptAsync(Consumer<? super T> action, Executor executor)2017 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2018 Executor executor) { 2019 return uniAcceptStage(screenExecutor(executor), action); 2020 } 2021 thenRun(Runnable action)2022 public CompletableFuture<Void> thenRun(Runnable action) { 2023 return uniRunStage(null, action); 2024 } 2025 thenRunAsync(Runnable action)2026 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2027 return uniRunStage(asyncPool, action); 2028 } 2029 thenRunAsync(Runnable action, Executor executor)2030 public CompletableFuture<Void> thenRunAsync(Runnable action, 2031 Executor executor) { 2032 return uniRunStage(screenExecutor(executor), action); 2033 } 2034 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2035 public <U,V> CompletableFuture<V> thenCombine( 2036 CompletionStage<? extends U> other, 2037 BiFunction<? super T,? super U,? extends V> fn) { 2038 return biApplyStage(null, other, fn); 2039 } 2040 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2041 public <U,V> CompletableFuture<V> thenCombineAsync( 2042 CompletionStage<? extends U> other, 2043 BiFunction<? super T,? super U,? extends V> fn) { 2044 return biApplyStage(asyncPool, other, fn); 2045 } 2046 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2047 public <U,V> CompletableFuture<V> thenCombineAsync( 2048 CompletionStage<? extends U> other, 2049 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2050 return biApplyStage(screenExecutor(executor), other, fn); 2051 } 2052 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2053 public <U> CompletableFuture<Void> thenAcceptBoth( 2054 CompletionStage<? extends U> other, 2055 BiConsumer<? super T, ? super U> action) { 2056 return biAcceptStage(null, other, action); 2057 } 2058 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2059 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2060 CompletionStage<? extends U> other, 2061 BiConsumer<? super T, ? super U> action) { 2062 return biAcceptStage(asyncPool, other, action); 2063 } 2064 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2065 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2066 CompletionStage<? extends U> other, 2067 BiConsumer<? super T, ? super U> action, Executor executor) { 2068 return biAcceptStage(screenExecutor(executor), other, action); 2069 } 2070 runAfterBoth(CompletionStage<?> other, Runnable action)2071 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2072 Runnable action) { 2073 return biRunStage(null, other, action); 2074 } 2075 runAfterBothAsync(CompletionStage<?> other, Runnable action)2076 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2077 Runnable action) { 2078 return biRunStage(asyncPool, other, action); 2079 } 2080 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2081 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2082 Runnable action, 2083 Executor executor) { 2084 return biRunStage(screenExecutor(executor), other, action); 2085 } 2086 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2087 public <U> CompletableFuture<U> applyToEither( 2088 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2089 return orApplyStage(null, other, fn); 2090 } 2091 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2092 public <U> CompletableFuture<U> applyToEitherAsync( 2093 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2094 return orApplyStage(asyncPool, other, fn); 2095 } 2096 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2097 public <U> CompletableFuture<U> applyToEitherAsync( 2098 CompletionStage<? extends T> other, Function<? super T, U> fn, 2099 Executor executor) { 2100 return orApplyStage(screenExecutor(executor), other, fn); 2101 } 2102 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2103 public CompletableFuture<Void> acceptEither( 2104 CompletionStage<? extends T> other, Consumer<? super T> action) { 2105 return orAcceptStage(null, other, action); 2106 } 2107 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2108 public CompletableFuture<Void> acceptEitherAsync( 2109 CompletionStage<? extends T> other, Consumer<? super T> action) { 2110 return orAcceptStage(asyncPool, other, action); 2111 } 2112 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2113 public CompletableFuture<Void> acceptEitherAsync( 2114 CompletionStage<? extends T> other, Consumer<? super T> action, 2115 Executor executor) { 2116 return orAcceptStage(screenExecutor(executor), other, action); 2117 } 2118 runAfterEither(CompletionStage<?> other, Runnable action)2119 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2120 Runnable action) { 2121 return orRunStage(null, other, action); 2122 } 2123 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2124 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2125 Runnable action) { 2126 return orRunStage(asyncPool, other, action); 2127 } 2128 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2129 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2130 Runnable action, 2131 Executor executor) { 2132 return orRunStage(screenExecutor(executor), other, action); 2133 } 2134 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2135 public <U> CompletableFuture<U> thenCompose( 2136 Function<? super T, ? extends CompletionStage<U>> fn) { 2137 return uniComposeStage(null, fn); 2138 } 2139 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2140 public <U> CompletableFuture<U> thenComposeAsync( 2141 Function<? super T, ? extends CompletionStage<U>> fn) { 2142 return uniComposeStage(asyncPool, fn); 2143 } 2144 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2145 public <U> CompletableFuture<U> thenComposeAsync( 2146 Function<? super T, ? extends CompletionStage<U>> fn, 2147 Executor executor) { 2148 return uniComposeStage(screenExecutor(executor), fn); 2149 } 2150 whenComplete( BiConsumer<? super T, ? super Throwable> action)2151 public CompletableFuture<T> whenComplete( 2152 BiConsumer<? super T, ? super Throwable> action) { 2153 return uniWhenCompleteStage(null, action); 2154 } 2155 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2156 public CompletableFuture<T> whenCompleteAsync( 2157 BiConsumer<? super T, ? super Throwable> action) { 2158 return uniWhenCompleteStage(asyncPool, action); 2159 } 2160 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2161 public CompletableFuture<T> whenCompleteAsync( 2162 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2163 return uniWhenCompleteStage(screenExecutor(executor), action); 2164 } 2165 handle( BiFunction<? super T, Throwable, ? extends U> fn)2166 public <U> CompletableFuture<U> handle( 2167 BiFunction<? super T, Throwable, ? extends U> fn) { 2168 return uniHandleStage(null, fn); 2169 } 2170 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2171 public <U> CompletableFuture<U> handleAsync( 2172 BiFunction<? super T, Throwable, ? extends U> fn) { 2173 return uniHandleStage(asyncPool, fn); 2174 } 2175 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2176 public <U> CompletableFuture<U> handleAsync( 2177 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2178 return uniHandleStage(screenExecutor(executor), fn); 2179 } 2180 2181 /** 2182 * Returns this CompletableFuture. 2183 * 2184 * @return this CompletableFuture 2185 */ toCompletableFuture()2186 public CompletableFuture<T> toCompletableFuture() { 2187 return this; 2188 } 2189 2190 // not in interface CompletionStage 2191 2192 /** 2193 * Returns a new CompletableFuture that is completed when this 2194 * CompletableFuture completes, with the result of the given 2195 * function of the exception triggering this CompletableFuture's 2196 * completion when it completes exceptionally; otherwise, if this 2197 * CompletableFuture completes normally, then the returned 2198 * CompletableFuture also completes normally with the same value. 2199 * Note: More flexible versions of this functionality are 2200 * available using methods {@code whenComplete} and {@code handle}. 2201 * 2202 * @param fn the function to use to compute the value of the 2203 * returned CompletableFuture if this CompletableFuture completed 2204 * exceptionally 2205 * @return the new CompletableFuture 2206 */ exceptionally( Function<Throwable, ? extends T> fn)2207 public CompletableFuture<T> exceptionally( 2208 Function<Throwable, ? extends T> fn) { 2209 return uniExceptionallyStage(fn); 2210 } 2211 2212 /* ------------- Arbitrary-arity constructions -------------- */ 2213 2214 /** 2215 * Returns a new CompletableFuture that is completed when all of 2216 * the given CompletableFutures complete. If any of the given 2217 * CompletableFutures complete exceptionally, then the returned 2218 * CompletableFuture also does so, with a CompletionException 2219 * holding this exception as its cause. Otherwise, the results, 2220 * if any, of the given CompletableFutures are not reflected in 2221 * the returned CompletableFuture, but may be obtained by 2222 * inspecting them individually. If no CompletableFutures are 2223 * provided, returns a CompletableFuture completed with the value 2224 * {@code null}. 2225 * 2226 * <p>Among the applications of this method is to await completion 2227 * of a set of independent CompletableFutures before continuing a 2228 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2229 * c3).join();}. 2230 * 2231 * @param cfs the CompletableFutures 2232 * @return a new CompletableFuture that is completed when all of the 2233 * given CompletableFutures complete 2234 * @throws NullPointerException if the array or any of its elements are 2235 * {@code null} 2236 */ allOf(CompletableFuture<?>.... cfs)2237 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2238 return andTree(cfs, 0, cfs.length - 1); 2239 } 2240 2241 /** 2242 * Returns a new CompletableFuture that is completed when any of 2243 * the given CompletableFutures complete, with the same result. 2244 * Otherwise, if it completed exceptionally, the returned 2245 * CompletableFuture also does so, with a CompletionException 2246 * holding this exception as its cause. If no CompletableFutures 2247 * are provided, returns an incomplete CompletableFuture. 2248 * 2249 * @param cfs the CompletableFutures 2250 * @return a new CompletableFuture that is completed with the 2251 * result or exception of any of the given CompletableFutures when 2252 * one completes 2253 * @throws NullPointerException if the array or any of its elements are 2254 * {@code null} 2255 */ anyOf(CompletableFuture<?>.... cfs)2256 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2257 return orTree(cfs, 0, cfs.length - 1); 2258 } 2259 2260 /* ------------- Control and status methods -------------- */ 2261 2262 /** 2263 * If not already completed, completes this CompletableFuture with 2264 * a {@link CancellationException}. Dependent CompletableFutures 2265 * that have not already completed will also complete 2266 * exceptionally, with a {@link CompletionException} caused by 2267 * this {@code CancellationException}. 2268 * 2269 * @param mayInterruptIfRunning this value has no effect in this 2270 * implementation because interrupts are not used to control 2271 * processing. 2272 * 2273 * @return {@code true} if this task is now cancelled 2274 */ cancel(boolean mayInterruptIfRunning)2275 public boolean cancel(boolean mayInterruptIfRunning) { 2276 boolean cancelled = (result == null) && 2277 internalComplete(new AltResult(new CancellationException())); 2278 postComplete(); 2279 return cancelled || isCancelled(); 2280 } 2281 2282 /** 2283 * Returns {@code true} if this CompletableFuture was cancelled 2284 * before it completed normally. 2285 * 2286 * @return {@code true} if this CompletableFuture was cancelled 2287 * before it completed normally 2288 */ isCancelled()2289 public boolean isCancelled() { 2290 Object r; 2291 return ((r = result) instanceof AltResult) && 2292 (((AltResult)r).ex instanceof CancellationException); 2293 } 2294 2295 /** 2296 * Returns {@code true} if this CompletableFuture completed 2297 * exceptionally, in any way. Possible causes include 2298 * cancellation, explicit invocation of {@code 2299 * completeExceptionally}, and abrupt termination of a 2300 * CompletionStage action. 2301 * 2302 * @return {@code true} if this CompletableFuture completed 2303 * exceptionally 2304 */ isCompletedExceptionally()2305 public boolean isCompletedExceptionally() { 2306 Object r; 2307 return ((r = result) instanceof AltResult) && r != NIL; 2308 } 2309 2310 /** 2311 * Forcibly sets or resets the value subsequently returned by 2312 * method {@link #get()} and related methods, whether or not 2313 * already completed. This method is designed for use only in 2314 * error recovery actions, and even in such situations may result 2315 * in ongoing dependent completions using established versus 2316 * overwritten outcomes. 2317 * 2318 * @param value the completion value 2319 */ obtrudeValue(T value)2320 public void obtrudeValue(T value) { 2321 result = (value == null) ? NIL : value; 2322 postComplete(); 2323 } 2324 2325 /** 2326 * Forcibly causes subsequent invocations of method {@link #get()} 2327 * and related methods to throw the given exception, whether or 2328 * not already completed. This method is designed for use only in 2329 * error recovery actions, and even in such situations may result 2330 * in ongoing dependent completions using established versus 2331 * overwritten outcomes. 2332 * 2333 * @param ex the exception 2334 * @throws NullPointerException if the exception is null 2335 */ obtrudeException(Throwable ex)2336 public void obtrudeException(Throwable ex) { 2337 if (ex == null) throw new NullPointerException(); 2338 result = new AltResult(ex); 2339 postComplete(); 2340 } 2341 2342 /** 2343 * Returns the estimated number of CompletableFutures whose 2344 * completions are awaiting completion of this CompletableFuture. 2345 * This method is designed for use in monitoring system state, not 2346 * for synchronization control. 2347 * 2348 * @return the number of dependent CompletableFutures 2349 */ getNumberOfDependents()2350 public int getNumberOfDependents() { 2351 int count = 0; 2352 for (Completion p = stack; p != null; p = p.next) 2353 ++count; 2354 return count; 2355 } 2356 2357 /** 2358 * Returns a string identifying this CompletableFuture, as well as 2359 * its completion state. The state, in brackets, contains the 2360 * String {@code "Completed Normally"} or the String {@code 2361 * "Completed Exceptionally"}, or the String {@code "Not 2362 * completed"} followed by the number of CompletableFutures 2363 * dependent upon its completion, if any. 2364 * 2365 * @return a string identifying this CompletableFuture, as well as its state 2366 */ toString()2367 public String toString() { 2368 Object r = result; 2369 int count; 2370 return super.toString() + 2371 ((r == null) ? 2372 (((count = getNumberOfDependents()) == 0) ? 2373 "[Not completed]" : 2374 "[Not completed, " + count + " dependents]") : 2375 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? 2376 "[Completed exceptionally]" : 2377 "[Completed normally]")); 2378 } 2379 2380 // Unsafe mechanics 2381 private static final sun.misc.Unsafe UNSAFE; 2382 private static final long RESULT; 2383 private static final long STACK; 2384 private static final long NEXT; 2385 static { 2386 try { 2387 final sun.misc.Unsafe u; 2388 UNSAFE = u = sun.misc.Unsafe.getUnsafe(); 2389 Class<?> k = CompletableFuture.class; 2390 RESULT = u.objectFieldOffset(k.getDeclaredField("result")); 2391 STACK = u.objectFieldOffset(k.getDeclaredField("stack")); 2392 NEXT = u.objectFieldOffset 2393 (Completion.class.getDeclaredField("next")); 2394 } catch (Exception x) { 2395 throw new Error(x); 2396 } 2397 } 2398 } 2399