1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.concurrent.locks.LockSupport;
41 import java.util.function.BiConsumer;
42 import java.util.function.BiFunction;
43 import java.util.function.Consumer;
44 import java.util.function.Function;
45 import java.util.function.Supplier;
46 
47 /**
48  * A {@link Future} that may be explicitly completed (setting its
49  * value and status), and may be used as a {@link CompletionStage},
50  * supporting dependent functions and actions that trigger upon its
51  * completion.
52  *
53  * <p>When two or more threads attempt to
54  * {@link #complete complete},
55  * {@link #completeExceptionally completeExceptionally}, or
56  * {@link #cancel cancel}
57  * a CompletableFuture, only one of them succeeds.
58  *
59  * <p>In addition to these and related methods for directly
60  * manipulating status and results, CompletableFuture implements
61  * interface {@link CompletionStage} with the following policies: <ul>
62  *
63  * <li>Actions supplied for dependent completions of
64  * <em>non-async</em> methods may be performed by the thread that
65  * completes the current CompletableFuture, or by any other caller of
66  * a completion method.
67  *
68  * <li>All <em>async</em> methods without an explicit Executor
69  * argument are performed using the {@link ForkJoinPool#commonPool()}
70  * (unless it does not support a parallelism level of at least two, in
71  * which case, a new Thread is created to run each task).  This may be
72  * overridden for non-static methods in subclasses by defining method
73  * {@link #defaultExecutor()}. To simplify monitoring, debugging,
74  * and tracking, all generated asynchronous tasks are instances of the
75  * marker interface {@link AsynchronousCompletionTask}.  Operations
76  * with time-delays can use adapter methods defined in this class, for
77  * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
78  * timeUnit))}.  To support methods with delays and timeouts, this
79  * class maintains at most one daemon thread for triggering and
80  * cancelling actions, not for running them.
81  *
82  * <li>All CompletionStage methods are implemented independently of
83  * other public methods, so the behavior of one method is not impacted
84  * by overrides of others in subclasses.
85  *
86  * <li>All CompletionStage methods return CompletableFutures.  To
87  * restrict usages to only those methods defined in interface
88  * CompletionStage, use method {@link #minimalCompletionStage}. Or to
89  * ensure only that clients do not themselves modify a future, use
90  * method {@link #copy}.
91  * </ul>
92  *
93  * <p>CompletableFuture also implements {@link Future} with the following
94  * policies: <ul>
95  *
96  * <li>Since (unlike {@link FutureTask}) this class has no direct
97  * control over the computation that causes it to be completed,
98  * cancellation is treated as just another form of exceptional
99  * completion.  Method {@link #cancel cancel} has the same effect as
100  * {@code completeExceptionally(new CancellationException())}. Method
101  * {@link #isCompletedExceptionally} can be used to determine if a
102  * CompletableFuture completed in any exceptional fashion.
103  *
104  * <li>In case of exceptional completion with a CompletionException,
105  * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
106  * {@link ExecutionException} with the same cause as held in the
107  * corresponding CompletionException.  To simplify usage in most
108  * contexts, this class also defines methods {@link #join()} and
109  * {@link #getNow} that instead throw the CompletionException directly
110  * in these cases.
111  * </ul>
112  *
113  * <p>Arguments used to pass a completion result (that is, for
114  * parameters of type {@code T}) for methods accepting them may be
115  * null, but passing a null value for any other parameter will result
116  * in a {@link NullPointerException} being thrown.
117  *
118  * <p>Subclasses of this class should normally override the "virtual
119  * constructor" method {@link #newIncompleteFuture}, which establishes
120  * the concrete type returned by CompletionStage methods. For example,
121  * here is a class that substitutes a different default Executor and
122  * disables the {@code obtrude} methods:
123  *
124  * <pre> {@code
125  * class MyCompletableFuture<T> extends CompletableFuture<T> {
126  *   static final Executor myExecutor = ...;
127  *   public MyCompletableFuture() { }
128  *   public <U> CompletableFuture<U> newIncompleteFuture() {
129  *     return new MyCompletableFuture<U>(); }
130  *   public Executor defaultExecutor() {
131  *     return myExecutor; }
132  *   public void obtrudeValue(T value) {
133  *     throw new UnsupportedOperationException(); }
134  *   public void obtrudeException(Throwable ex) {
135  *     throw new UnsupportedOperationException(); }
136  * }}</pre>
137  *
138  * @author Doug Lea
139  * @param <T> The result type returned by this future's {@code join}
140  * and {@code get} methods
141  * @since 1.8
142  */
143 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
144 
145     /*
146      * Overview:
147      *
148      * A CompletableFuture may have dependent completion actions,
149      * collected in a linked stack. It atomically completes by CASing
150      * a result field, and then pops off and runs those actions. This
151      * applies across normal vs exceptional outcomes, sync vs async
152      * actions, binary triggers, and various forms of completions.
153      *
154      * Non-nullness of volatile field "result" indicates done.  It may
155      * be set directly if known to be thread-confined, else via CAS.
156      * An AltResult is used to box null as a result, as well as to
157      * hold exceptions.  Using a single field makes completion simple
158      * to detect and trigger.  Result encoding and decoding is
159      * straightforward but tedious and adds to the sprawl of trapping
160      * and associating exceptions with targets.  Minor simplifications
161      * rely on (static) NIL (to box null results) being the only
162      * AltResult with a null exception field, so we don't usually need
163      * explicit comparisons.  Even though some of the generics casts
164      * are unchecked (see SuppressWarnings annotations), they are
165      * placed to be appropriate even if checked.
166      *
167      * Dependent actions are represented by Completion objects linked
168      * as Treiber stacks headed by field "stack". There are Completion
169      * classes for each kind of action, grouped into:
170      * - single-input (UniCompletion),
171      * - two-input (BiCompletion),
172      * - projected (BiCompletions using exactly one of two inputs),
173      * - shared (CoCompletion, used by the second of two sources),
174      * - zero-input source actions,
175      * - Signallers that unblock waiters.
176      * Class Completion extends ForkJoinTask to enable async execution
177      * (adding no space overhead because we exploit its "tag" methods
178      * to maintain claims). It is also declared as Runnable to allow
179      * usage with arbitrary executors.
180      *
181      * Support for each kind of CompletionStage relies on a separate
182      * class, along with two CompletableFuture methods:
183      *
184      * * A Completion class with name X corresponding to function,
185      *   prefaced with "Uni", "Bi", or "Or". Each class contains
186      *   fields for source(s), actions, and dependent. They are
187      *   boringly similar, differing from others only with respect to
188      *   underlying functional forms. We do this so that users don't
189      *   encounter layers of adapters in common usages.
190      *
191      * * Boolean CompletableFuture method x(...) (for example
192      *   biApply) takes all of the arguments needed to check that an
193      *   action is triggerable, and then either runs the action or
194      *   arranges its async execution by executing its Completion
195      *   argument, if present. The method returns true if known to be
196      *   complete.
197      *
198      * * Completion method tryFire(int mode) invokes the associated x
199      *   method with its held arguments, and on success cleans up.
200      *   The mode argument allows tryFire to be called twice (SYNC,
201      *   then ASYNC); the first to screen and trap exceptions while
202      *   arranging to execute, and the second when called from a task.
203      *   (A few classes are not used async so take slightly different
204      *   forms.)  The claim() callback suppresses function invocation
205      *   if already claimed by another thread.
206      *
207      * * Some classes (for example UniApply) have separate handling
208      *   code for when known to be thread-confined ("now" methods) and
209      *   for when shared (in tryFire), for efficiency.
210      *
211      * * CompletableFuture method xStage(...) is called from a public
212      *   stage method of CompletableFuture f. It screens user
213      *   arguments and invokes and/or creates the stage object.  If
214      *   not async and already triggerable, the action is run
215      *   immediately.  Otherwise a Completion c is created, and
216      *   submitted to the executor if triggerable, or pushed onto f's
217      *   stack if not.  Completion actions are started via c.tryFire.
218      *   We recheck after pushing to a source future's stack to cover
219      *   possible races if the source completes while pushing.
220      *   Classes with two inputs (for example BiApply) deal with races
221      *   across both while pushing actions.  The second completion is
222      *   a CoCompletion pointing to the first, shared so that at most
223      *   one performs the action.  The multiple-arity methods allOf
224      *   does this pairwise to form trees of completions.  Method
225      *   anyOf is handled differently from allOf because completion of
226      *   any source should trigger a cleanStack of other sources.
227      *   Each AnyOf completion can reach others via a shared array.
228      *
229      * Note that the generic type parameters of methods vary according
230      * to whether "this" is a source, dependent, or completion.
231      *
232      * Method postComplete is called upon completion unless the target
233      * is guaranteed not to be observable (i.e., not yet returned or
234      * linked). Multiple threads can call postComplete, which
235      * atomically pops each dependent action, and tries to trigger it
236      * via method tryFire, in NESTED mode.  Triggering can propagate
237      * recursively, so NESTED mode returns its completed dependent (if
238      * one exists) for further processing by its caller (see method
239      * postFire).
240      *
241      * Blocking methods get() and join() rely on Signaller Completions
242      * that wake up waiting threads.  The mechanics are similar to
243      * Treiber stack wait-nodes used in FutureTask, Phaser, and
244      * SynchronousQueue. See their internal documentation for
245      * algorithmic details.
246      *
247      * Without precautions, CompletableFutures would be prone to
248      * garbage accumulation as chains of Completions build up, each
249      * pointing back to its sources. So we null out fields as soon as
250      * possible.  The screening checks needed anyway harmlessly ignore
251      * null arguments that may have been obtained during races with
252      * threads nulling out fields.  We also try to unlink non-isLive
253      * (fired or cancelled) Completions from stacks that might
254      * otherwise never be popped: Method cleanStack always unlinks non
255      * isLive completions from the head of stack; others may
256      * occasionally remain if racing with other cancellations or
257      * removals.
258      *
259      * Completion fields need not be declared as final or volatile
260      * because they are only visible to other threads upon safe
261      * publication.
262      */
263 
264     volatile Object result;       // Either the result or boxed AltResult
265     volatile Completion stack;    // Top of Treiber stack of dependent actions
266 
internalComplete(Object r)267     final boolean internalComplete(Object r) { // CAS from null to r
268         return RESULT.compareAndSet(this, null, r);
269     }
270 
271     /** Returns true if successfully pushed c onto stack. */
tryPushStack(Completion c)272     final boolean tryPushStack(Completion c) {
273         Completion h = stack;
274         NEXT.set(c, h);         // CAS piggyback
275         return STACK.compareAndSet(this, h, c);
276     }
277 
278     /** Unconditionally pushes c onto stack, retrying if necessary. */
pushStack(Completion c)279     final void pushStack(Completion c) {
280         do {} while (!tryPushStack(c));
281     }
282 
283     /* ------------- Encoding and decoding outcomes -------------- */
284 
285     static final class AltResult { // See above
286         final Throwable ex;        // null only for NIL
AltResult(Throwable x)287         AltResult(Throwable x) { this.ex = x; }
288     }
289 
290     /** The encoding of the null value. */
291     static final AltResult NIL = new AltResult(null);
292 
293     /** Completes with the null value, unless already completed. */
completeNull()294     final boolean completeNull() {
295         return RESULT.compareAndSet(this, null, NIL);
296     }
297 
298     /** Returns the encoding of the given non-exceptional value. */
encodeValue(T t)299     final Object encodeValue(T t) {
300         return (t == null) ? NIL : t;
301     }
302 
303     /** Completes with a non-exceptional result, unless already completed. */
completeValue(T t)304     final boolean completeValue(T t) {
305         return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
306     }
307 
308     /**
309      * Returns the encoding of the given (non-null) exception as a
310      * wrapped CompletionException unless it is one already.
311      */
encodeThrowable(Throwable x)312     static AltResult encodeThrowable(Throwable x) {
313         return new AltResult((x instanceof CompletionException) ? x :
314                              new CompletionException(x));
315     }
316 
317     /** Completes with an exceptional result, unless already completed. */
completeThrowable(Throwable x)318     final boolean completeThrowable(Throwable x) {
319         return RESULT.compareAndSet(this, null, encodeThrowable(x));
320     }
321 
322     /**
323      * Returns the encoding of the given (non-null) exception as a
324      * wrapped CompletionException unless it is one already.  May
325      * return the given Object r (which must have been the result of a
326      * source future) if it is equivalent, i.e. if this is a simple
327      * relay of an existing CompletionException.
328      */
encodeThrowable(Throwable x, Object r)329     static Object encodeThrowable(Throwable x, Object r) {
330         if (!(x instanceof CompletionException))
331             x = new CompletionException(x);
332         else if (r instanceof AltResult && x == ((AltResult)r).ex)
333             return r;
334         return new AltResult(x);
335     }
336 
337     /**
338      * Completes with the given (non-null) exceptional result as a
339      * wrapped CompletionException unless it is one already, unless
340      * already completed.  May complete with the given Object r
341      * (which must have been the result of a source future) if it is
342      * equivalent, i.e. if this is a simple propagation of an
343      * existing CompletionException.
344      */
completeThrowable(Throwable x, Object r)345     final boolean completeThrowable(Throwable x, Object r) {
346         return RESULT.compareAndSet(this, null, encodeThrowable(x, r));
347     }
348 
349     /**
350      * Returns the encoding of the given arguments: if the exception
351      * is non-null, encodes as AltResult.  Otherwise uses the given
352      * value, boxed as NIL if null.
353      */
encodeOutcome(T t, Throwable x)354     Object encodeOutcome(T t, Throwable x) {
355         return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
356     }
357 
358     /**
359      * Returns the encoding of a copied outcome; if exceptional,
360      * rewraps as a CompletionException, else returns argument.
361      */
encodeRelay(Object r)362     static Object encodeRelay(Object r) {
363         Throwable x;
364         if (r instanceof AltResult
365             && (x = ((AltResult)r).ex) != null
366             && !(x instanceof CompletionException))
367             r = new AltResult(new CompletionException(x));
368         return r;
369     }
370 
371     /**
372      * Completes with r or a copy of r, unless already completed.
373      * If exceptional, r is first coerced to a CompletionException.
374      */
completeRelay(Object r)375     final boolean completeRelay(Object r) {
376         return RESULT.compareAndSet(this, null, encodeRelay(r));
377     }
378 
379     /**
380      * Reports result using Future.get conventions.
381      */
reportGet(Object r)382     private static Object reportGet(Object r)
383         throws InterruptedException, ExecutionException {
384         if (r == null) // by convention below, null means interrupted
385             throw new InterruptedException();
386         if (r instanceof AltResult) {
387             Throwable x, cause;
388             if ((x = ((AltResult)r).ex) == null)
389                 return null;
390             if (x instanceof CancellationException)
391                 throw (CancellationException)x;
392             if ((x instanceof CompletionException) &&
393                 (cause = x.getCause()) != null)
394                 x = cause;
395             throw new ExecutionException(x);
396         }
397         return r;
398     }
399 
400     /**
401      * Decodes outcome to return result or throw unchecked exception.
402      */
reportJoin(Object r)403     private static Object reportJoin(Object r) {
404         if (r instanceof AltResult) {
405             Throwable x;
406             if ((x = ((AltResult)r).ex) == null)
407                 return null;
408             if (x instanceof CancellationException)
409                 throw (CancellationException)x;
410             if (x instanceof CompletionException)
411                 throw (CompletionException)x;
412             throw new CompletionException(x);
413         }
414         return r;
415     }
416 
417     /* ------------- Async task preliminaries -------------- */
418 
419     /**
420      * A marker interface identifying asynchronous tasks produced by
421      * {@code async} methods. This may be useful for monitoring,
422      * debugging, and tracking asynchronous activities.
423      *
424      * @since 1.8
425      */
426     public static interface AsynchronousCompletionTask {
427     }
428 
429     private static final boolean USE_COMMON_POOL =
430         (ForkJoinPool.getCommonPoolParallelism() > 1);
431 
432     /**
433      * Default executor -- ForkJoinPool.commonPool() unless it cannot
434      * support parallelism.
435      */
436     private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
437         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
438 
439     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
440     static final class ThreadPerTaskExecutor implements Executor {
execute(Runnable r)441         public void execute(Runnable r) { new Thread(r).start(); }
442     }
443 
444     /**
445      * Null-checks user executor argument, and translates uses of
446      * commonPool to ASYNC_POOL in case parallelism disabled.
447      */
screenExecutor(Executor e)448     static Executor screenExecutor(Executor e) {
449         if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
450             return ASYNC_POOL;
451         if (e == null) throw new NullPointerException();
452         return e;
453     }
454 
455     // Modes for Completion.tryFire. Signedness matters.
456     static final int SYNC   =  0;
457     static final int ASYNC  =  1;
458     static final int NESTED = -1;
459 
460     /* ------------- Base Completion classes and operations -------------- */
461 
462     @SuppressWarnings("serial")
463     abstract static class Completion extends ForkJoinTask<Void>
464         implements Runnable, AsynchronousCompletionTask {
465         volatile Completion next;      // Treiber stack link
466 
467         /**
468          * Performs completion action if triggered, returning a
469          * dependent that may need propagation, if one exists.
470          *
471          * @param mode SYNC, ASYNC, or NESTED
472          */
tryFire(int mode)473         abstract CompletableFuture<?> tryFire(int mode);
474 
475         /** Returns true if possibly still triggerable. Used by cleanStack. */
isLive()476         abstract boolean isLive();
477 
run()478         public final void run()                { tryFire(ASYNC); }
exec()479         public final boolean exec()            { tryFire(ASYNC); return false; }
getRawResult()480         public final Void getRawResult()       { return null; }
setRawResult(Void v)481         public final void setRawResult(Void v) {}
482     }
483 
484     /**
485      * Pops and tries to trigger all reachable dependents.  Call only
486      * when known to be done.
487      */
postComplete()488     final void postComplete() {
489         /*
490          * On each step, variable f holds current dependents to pop
491          * and run.  It is extended along only one path at a time,
492          * pushing others to avoid unbounded recursion.
493          */
494         CompletableFuture<?> f = this; Completion h;
495         while ((h = f.stack) != null ||
496                (f != this && (h = (f = this).stack) != null)) {
497             CompletableFuture<?> d; Completion t;
498             if (STACK.compareAndSet(f, h, t = h.next)) {
499                 if (t != null) {
500                     if (f != this) {
501                         pushStack(h);
502                         continue;
503                     }
504                     NEXT.compareAndSet(h, t, null); // try to detach
505                 }
506                 f = (d = h.tryFire(NESTED)) == null ? this : d;
507             }
508         }
509     }
510 
511     /** Traverses stack and unlinks one or more dead Completions, if found. */
cleanStack()512     final void cleanStack() {
513         Completion p = stack;
514         // ensure head of stack live
515         for (boolean unlinked = false;;) {
516             if (p == null)
517                 return;
518             else if (p.isLive()) {
519                 if (unlinked)
520                     return;
521                 else
522                     break;
523             }
524             else if (STACK.weakCompareAndSet(this, p, (p = p.next)))
525                 unlinked = true;
526             else
527                 p = stack;
528         }
529         // try to unlink first non-live
530         for (Completion q = p.next; q != null;) {
531             Completion s = q.next;
532             if (q.isLive()) {
533                 p = q;
534                 q = s;
535             } else if (NEXT.weakCompareAndSet(p, q, s))
536                 break;
537             else
538                 q = p.next;
539         }
540     }
541 
542     /* ------------- One-input Completions -------------- */
543 
544     /** A Completion with a source, dependent, and executor. */
545     @SuppressWarnings("serial")
546     abstract static class UniCompletion<T,V> extends Completion {
547         Executor executor;                 // executor to use (null if none)
548         CompletableFuture<V> dep;          // the dependent to complete
549         CompletableFuture<T> src;          // source for action
550 
UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)551         UniCompletion(Executor executor, CompletableFuture<V> dep,
552                       CompletableFuture<T> src) {
553             this.executor = executor; this.dep = dep; this.src = src;
554         }
555 
556         /**
557          * Returns true if action can be run. Call only when known to
558          * be triggerable. Uses FJ tag bit to ensure that only one
559          * thread claims ownership.  If async, starts as task -- a
560          * later call to tryFire will run action.
561          */
claim()562         final boolean claim() {
563             Executor e = executor;
564             if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
565                 if (e == null)
566                     return true;
567                 executor = null; // disable
568                 e.execute(this);
569             }
570             return false;
571         }
572 
isLive()573         final boolean isLive() { return dep != null; }
574     }
575 
576     /**
577      * Pushes the given completion unless it completes while trying.
578      * Caller should first check that result is null.
579      */
unipush(Completion c)580     final void unipush(Completion c) {
581         if (c != null) {
582             while (!tryPushStack(c)) {
583                 if (result != null) {
584                     NEXT.set(c, null);
585                     break;
586                 }
587             }
588             if (result != null)
589                 c.tryFire(SYNC);
590         }
591     }
592 
593     /**
594      * Post-processing by dependent after successful UniCompletion tryFire.
595      * Tries to clean stack of source a, and then either runs postComplete
596      * or returns this to caller, depending on mode.
597      */
postFire(CompletableFuture<?> a, int mode)598     final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
599         if (a != null && a.stack != null) {
600             Object r;
601             if ((r = a.result) == null)
602                 a.cleanStack();
603             if (mode >= 0 && (r != null || a.result != null))
604                 a.postComplete();
605         }
606         if (result != null && stack != null) {
607             if (mode < 0)
608                 return this;
609             else
610                 postComplete();
611         }
612         return null;
613     }
614 
615     @SuppressWarnings("serial")
616     static final class UniApply<T,V> extends UniCompletion<T,V> {
617         Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)618         UniApply(Executor executor, CompletableFuture<V> dep,
619                  CompletableFuture<T> src,
620                  Function<? super T,? extends V> fn) {
621             super(executor, dep, src); this.fn = fn;
622         }
tryFire(int mode)623         final CompletableFuture<V> tryFire(int mode) {
624             CompletableFuture<V> d; CompletableFuture<T> a;
625             Object r; Throwable x; Function<? super T,? extends V> f;
626             if ((a = src) == null || (r = a.result) == null
627                 || (d = dep) == null || (f = fn) == null)
628                 return null;
629             tryComplete: if (d.result == null) {
630                 if (r instanceof AltResult) {
631                     if ((x = ((AltResult)r).ex) != null) {
632                         d.completeThrowable(x, r);
633                         break tryComplete;
634                     }
635                     r = null;
636                 }
637                 try {
638                     if (mode <= 0 && !claim())
639                         return null;
640                     else {
641                         @SuppressWarnings("unchecked") T t = (T) r;
642                         d.completeValue(f.apply(t));
643                     }
644                 } catch (Throwable ex) {
645                     d.completeThrowable(ex);
646                 }
647             }
648             src = null; dep = null; fn = null;
649             return d.postFire(a, mode);
650         }
651     }
652 
uniApplyStage( Executor e, Function<? super T,? extends V> f)653     private <V> CompletableFuture<V> uniApplyStage(
654         Executor e, Function<? super T,? extends V> f) {
655         if (f == null) throw new NullPointerException();
656         Object r;
657         if ((r = result) != null)
658             return uniApplyNow(r, e, f);
659         CompletableFuture<V> d = newIncompleteFuture();
660         unipush(new UniApply<T,V>(e, d, this, f));
661         return d;
662     }
663 
uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)664     private <V> CompletableFuture<V> uniApplyNow(
665         Object r, Executor e, Function<? super T,? extends V> f) {
666         Throwable x;
667         CompletableFuture<V> d = newIncompleteFuture();
668         if (r instanceof AltResult) {
669             if ((x = ((AltResult)r).ex) != null) {
670                 d.result = encodeThrowable(x, r);
671                 return d;
672             }
673             r = null;
674         }
675         try {
676             if (e != null) {
677                 e.execute(new UniApply<T,V>(null, d, this, f));
678             } else {
679                 @SuppressWarnings("unchecked") T t = (T) r;
680                 d.result = d.encodeValue(f.apply(t));
681             }
682         } catch (Throwable ex) {
683             d.result = encodeThrowable(ex);
684         }
685         return d;
686     }
687 
688     @SuppressWarnings("serial")
689     static final class UniAccept<T> extends UniCompletion<T,Void> {
690         Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)691         UniAccept(Executor executor, CompletableFuture<Void> dep,
692                   CompletableFuture<T> src, Consumer<? super T> fn) {
693             super(executor, dep, src); this.fn = fn;
694         }
tryFire(int mode)695         final CompletableFuture<Void> tryFire(int mode) {
696             CompletableFuture<Void> d; CompletableFuture<T> a;
697             Object r; Throwable x; Consumer<? super T> f;
698             if ((a = src) == null || (r = a.result) == null
699                 || (d = dep) == null || (f = fn) == null)
700                 return null;
701             tryComplete: if (d.result == null) {
702                 if (r instanceof AltResult) {
703                     if ((x = ((AltResult)r).ex) != null) {
704                         d.completeThrowable(x, r);
705                         break tryComplete;
706                     }
707                     r = null;
708                 }
709                 try {
710                     if (mode <= 0 && !claim())
711                         return null;
712                     else {
713                         @SuppressWarnings("unchecked") T t = (T) r;
714                         f.accept(t);
715                         d.completeNull();
716                     }
717                 } catch (Throwable ex) {
718                     d.completeThrowable(ex);
719                 }
720             }
721             src = null; dep = null; fn = null;
722             return d.postFire(a, mode);
723         }
724     }
725 
uniAcceptStage(Executor e, Consumer<? super T> f)726     private CompletableFuture<Void> uniAcceptStage(Executor e,
727                                                    Consumer<? super T> f) {
728         if (f == null) throw new NullPointerException();
729         Object r;
730         if ((r = result) != null)
731             return uniAcceptNow(r, e, f);
732         CompletableFuture<Void> d = newIncompleteFuture();
733         unipush(new UniAccept<T>(e, d, this, f));
734         return d;
735     }
736 
uniAcceptNow( Object r, Executor e, Consumer<? super T> f)737     private CompletableFuture<Void> uniAcceptNow(
738         Object r, Executor e, Consumer<? super T> f) {
739         Throwable x;
740         CompletableFuture<Void> d = newIncompleteFuture();
741         if (r instanceof AltResult) {
742             if ((x = ((AltResult)r).ex) != null) {
743                 d.result = encodeThrowable(x, r);
744                 return d;
745             }
746             r = null;
747         }
748         try {
749             if (e != null) {
750                 e.execute(new UniAccept<T>(null, d, this, f));
751             } else {
752                 @SuppressWarnings("unchecked") T t = (T) r;
753                 f.accept(t);
754                 d.result = NIL;
755             }
756         } catch (Throwable ex) {
757             d.result = encodeThrowable(ex);
758         }
759         return d;
760     }
761 
762     @SuppressWarnings("serial")
763     static final class UniRun<T> extends UniCompletion<T,Void> {
764         Runnable fn;
UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)765         UniRun(Executor executor, CompletableFuture<Void> dep,
766                CompletableFuture<T> src, Runnable fn) {
767             super(executor, dep, src); this.fn = fn;
768         }
tryFire(int mode)769         final CompletableFuture<Void> tryFire(int mode) {
770             CompletableFuture<Void> d; CompletableFuture<T> a;
771             Object r; Throwable x; Runnable f;
772             if ((a = src) == null || (r = a.result) == null
773                 || (d = dep) == null || (f = fn) == null)
774                 return null;
775             if (d.result == null) {
776                 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
777                     d.completeThrowable(x, r);
778                 else
779                     try {
780                         if (mode <= 0 && !claim())
781                             return null;
782                         else {
783                             f.run();
784                             d.completeNull();
785                         }
786                     } catch (Throwable ex) {
787                         d.completeThrowable(ex);
788                     }
789             }
790             src = null; dep = null; fn = null;
791             return d.postFire(a, mode);
792         }
793     }
794 
uniRunStage(Executor e, Runnable f)795     private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
796         if (f == null) throw new NullPointerException();
797         Object r;
798         if ((r = result) != null)
799             return uniRunNow(r, e, f);
800         CompletableFuture<Void> d = newIncompleteFuture();
801         unipush(new UniRun<T>(e, d, this, f));
802         return d;
803     }
804 
uniRunNow(Object r, Executor e, Runnable f)805     private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) {
806         Throwable x;
807         CompletableFuture<Void> d = newIncompleteFuture();
808         if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
809             d.result = encodeThrowable(x, r);
810         else
811             try {
812                 if (e != null) {
813                     e.execute(new UniRun<T>(null, d, this, f));
814                 } else {
815                     f.run();
816                     d.result = NIL;
817                 }
818             } catch (Throwable ex) {
819                 d.result = encodeThrowable(ex);
820             }
821         return d;
822     }
823 
824     @SuppressWarnings("serial")
825     static final class UniWhenComplete<T> extends UniCompletion<T,T> {
826         BiConsumer<? super T, ? super Throwable> fn;
UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)827         UniWhenComplete(Executor executor, CompletableFuture<T> dep,
828                         CompletableFuture<T> src,
829                         BiConsumer<? super T, ? super Throwable> fn) {
830             super(executor, dep, src); this.fn = fn;
831         }
tryFire(int mode)832         final CompletableFuture<T> tryFire(int mode) {
833             CompletableFuture<T> d; CompletableFuture<T> a;
834             Object r; BiConsumer<? super T, ? super Throwable> f;
835             if ((a = src) == null || (r = a.result) == null
836                 || (d = dep) == null || (f = fn) == null
837                 || !d.uniWhenComplete(r, f, mode > 0 ? null : this))
838                 return null;
839             src = null; dep = null; fn = null;
840             return d.postFire(a, mode);
841         }
842     }
843 
uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)844     final boolean uniWhenComplete(Object r,
845                                   BiConsumer<? super T,? super Throwable> f,
846                                   UniWhenComplete<T> c) {
847         T t; Throwable x = null;
848         if (result == null) {
849             try {
850                 if (c != null && !c.claim())
851                     return false;
852                 if (r instanceof AltResult) {
853                     x = ((AltResult)r).ex;
854                     t = null;
855                 } else {
856                     @SuppressWarnings("unchecked") T tr = (T) r;
857                     t = tr;
858                 }
859                 f.accept(t, x);
860                 if (x == null) {
861                     internalComplete(r);
862                     return true;
863                 }
864             } catch (Throwable ex) {
865                 if (x == null)
866                     x = ex;
867                 else if (x != ex)
868                     x.addSuppressed(ex);
869             }
870             completeThrowable(x, r);
871         }
872         return true;
873     }
874 
uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)875     private CompletableFuture<T> uniWhenCompleteStage(
876         Executor e, BiConsumer<? super T, ? super Throwable> f) {
877         if (f == null) throw new NullPointerException();
878         CompletableFuture<T> d = newIncompleteFuture();
879         Object r;
880         if ((r = result) == null)
881             unipush(new UniWhenComplete<T>(e, d, this, f));
882         else if (e == null)
883             d.uniWhenComplete(r, f, null);
884         else {
885             try {
886                 e.execute(new UniWhenComplete<T>(null, d, this, f));
887             } catch (Throwable ex) {
888                 d.result = encodeThrowable(ex);
889             }
890         }
891         return d;
892     }
893 
894     @SuppressWarnings("serial")
895     static final class UniHandle<T,V> extends UniCompletion<T,V> {
896         BiFunction<? super T, Throwable, ? extends V> fn;
UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)897         UniHandle(Executor executor, CompletableFuture<V> dep,
898                   CompletableFuture<T> src,
899                   BiFunction<? super T, Throwable, ? extends V> fn) {
900             super(executor, dep, src); this.fn = fn;
901         }
tryFire(int mode)902         final CompletableFuture<V> tryFire(int mode) {
903             CompletableFuture<V> d; CompletableFuture<T> a;
904             Object r; BiFunction<? super T, Throwable, ? extends V> f;
905             if ((a = src) == null || (r = a.result) == null
906                 || (d = dep) == null || (f = fn) == null
907                 || !d.uniHandle(r, f, mode > 0 ? null : this))
908                 return null;
909             src = null; dep = null; fn = null;
910             return d.postFire(a, mode);
911         }
912     }
913 
uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)914     final <S> boolean uniHandle(Object r,
915                                 BiFunction<? super S, Throwable, ? extends T> f,
916                                 UniHandle<S,T> c) {
917         S s; Throwable x;
918         if (result == null) {
919             try {
920                 if (c != null && !c.claim())
921                     return false;
922                 if (r instanceof AltResult) {
923                     x = ((AltResult)r).ex;
924                     s = null;
925                 } else {
926                     x = null;
927                     @SuppressWarnings("unchecked") S ss = (S) r;
928                     s = ss;
929                 }
930                 completeValue(f.apply(s, x));
931             } catch (Throwable ex) {
932                 completeThrowable(ex);
933             }
934         }
935         return true;
936     }
937 
uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)938     private <V> CompletableFuture<V> uniHandleStage(
939         Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
940         if (f == null) throw new NullPointerException();
941         CompletableFuture<V> d = newIncompleteFuture();
942         Object r;
943         if ((r = result) == null)
944             unipush(new UniHandle<T,V>(e, d, this, f));
945         else if (e == null)
946             d.uniHandle(r, f, null);
947         else {
948             try {
949                 e.execute(new UniHandle<T,V>(null, d, this, f));
950             } catch (Throwable ex) {
951                 d.result = encodeThrowable(ex);
952             }
953         }
954         return d;
955     }
956 
957     @SuppressWarnings("serial")
958     static final class UniExceptionally<T> extends UniCompletion<T,T> {
959         Function<? super Throwable, ? extends T> fn;
UniExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)960         UniExceptionally(Executor executor,
961                          CompletableFuture<T> dep, CompletableFuture<T> src,
962                          Function<? super Throwable, ? extends T> fn) {
963             super(executor, dep, src); this.fn = fn;
964         }
tryFire(int mode)965         final CompletableFuture<T> tryFire(int mode) {
966             CompletableFuture<T> d; CompletableFuture<T> a;
967             Object r; Function<? super Throwable, ? extends T> f;
968             if ((a = src) == null || (r = a.result) == null
969                 || (d = dep) == null || (f = fn) == null
970                 || !d.uniExceptionally(r, f, mode > 0 ? null : this))
971                 return null;
972             src = null; dep = null; fn = null;
973             return d.postFire(a, mode);
974         }
975     }
976 
uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)977     final boolean uniExceptionally(Object r,
978                                    Function<? super Throwable, ? extends T> f,
979                                    UniExceptionally<T> c) {
980         Throwable x;
981         if (result == null) {
982             try {
983                 if (c != null && !c.claim())
984                     return false;
985                 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
986                     completeValue(f.apply(x));
987                 else
988                     internalComplete(r);
989             } catch (Throwable ex) {
990                 completeThrowable(ex);
991             }
992         }
993         return true;
994     }
995 
uniExceptionallyStage( Executor e, Function<Throwable, ? extends T> f)996     private CompletableFuture<T> uniExceptionallyStage(
997         Executor e, Function<Throwable, ? extends T> f) {
998         if (f == null) throw new NullPointerException();
999         CompletableFuture<T> d = newIncompleteFuture();
1000         Object r;
1001         if ((r = result) == null)
1002             unipush(new UniExceptionally<T>(e, d, this, f));
1003         else if (e == null)
1004             d.uniExceptionally(r, f, null);
1005         else {
1006             try {
1007                 e.execute(new UniExceptionally<T>(null, d, this, f));
1008             } catch (Throwable ex) {
1009                 d.result = encodeThrowable(ex);
1010             }
1011         }
1012         return d;
1013     }
1014 
1015     @SuppressWarnings("serial")
1016     static final class UniComposeExceptionally<T> extends UniCompletion<T,T> {
1017         Function<Throwable, ? extends CompletionStage<T>> fn;
UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<Throwable, ? extends CompletionStage<T>> fn)1018         UniComposeExceptionally(Executor executor, CompletableFuture<T> dep,
1019                                 CompletableFuture<T> src,
1020                                 Function<Throwable, ? extends CompletionStage<T>> fn) {
1021             super(executor, dep, src); this.fn = fn;
1022         }
tryFire(int mode)1023         final CompletableFuture<T> tryFire(int mode) {
1024             CompletableFuture<T> d; CompletableFuture<T> a;
1025             Function<Throwable, ? extends CompletionStage<T>> f;
1026             Object r; Throwable x;
1027             if ((a = src) == null || (r = a.result) == null
1028                 || (d = dep) == null || (f = fn) == null)
1029                 return null;
1030             if (d.result == null) {
1031                 if ((r instanceof AltResult) &&
1032                     (x = ((AltResult)r).ex) != null) {
1033                     try {
1034                         if (mode <= 0 && !claim())
1035                             return null;
1036                         CompletableFuture<T> g = f.apply(x).toCompletableFuture();
1037                         if ((r = g.result) != null)
1038                             d.completeRelay(r);
1039                         else {
1040                             g.unipush(new UniRelay<T,T>(d, g));
1041                             if (d.result == null)
1042                                 return null;
1043                         }
1044                     } catch (Throwable ex) {
1045                         d.completeThrowable(ex);
1046                     }
1047                 }
1048                 else
1049                     d.internalComplete(r);
1050             }
1051             src = null; dep = null; fn = null;
1052             return d.postFire(a, mode);
1053         }
1054     }
1055 
uniComposeExceptionallyStage( Executor e, Function<Throwable, ? extends CompletionStage<T>> f)1056     private CompletableFuture<T> uniComposeExceptionallyStage(
1057         Executor e, Function<Throwable, ? extends CompletionStage<T>> f) {
1058         if (f == null) throw new NullPointerException();
1059         CompletableFuture<T> d = newIncompleteFuture();
1060         Object r, s; Throwable x;
1061         if ((r = result) == null)
1062             unipush(new UniComposeExceptionally<T>(e, d, this, f));
1063         else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null)
1064             d.internalComplete(r);
1065         else
1066             try {
1067                 if (e != null)
1068                     e.execute(new UniComposeExceptionally<T>(null, d, this, f));
1069                 else {
1070                     CompletableFuture<T> g = f.apply(x).toCompletableFuture();
1071                     if ((s = g.result) != null)
1072                         d.result = encodeRelay(s);
1073                     else
1074                         g.unipush(new UniRelay<T,T>(d, g));
1075                 }
1076             } catch (Throwable ex) {
1077                 d.result = encodeThrowable(ex);
1078             }
1079         return d;
1080     }
1081 
1082     @SuppressWarnings("serial")
1083     static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)1084         UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) {
1085             super(null, dep, src);
1086         }
tryFire(int mode)1087         final CompletableFuture<U> tryFire(int mode) {
1088             CompletableFuture<U> d; CompletableFuture<T> a; Object r;
1089             if ((a = src) == null || (r = a.result) == null
1090                 || (d = dep) == null)
1091                 return null;
1092             if (d.result == null)
1093                 d.completeRelay(r);
1094             src = null; dep = null;
1095             return d.postFire(a, mode);
1096         }
1097     }
1098 
uniCopyStage( CompletableFuture<T> src)1099     private static <U, T extends U> CompletableFuture<U> uniCopyStage(
1100         CompletableFuture<T> src) {
1101         Object r;
1102         CompletableFuture<U> d = src.newIncompleteFuture();
1103         if ((r = src.result) != null)
1104             d.result = encodeRelay(r);
1105         else
1106             src.unipush(new UniRelay<U,T>(d, src));
1107         return d;
1108     }
1109 
uniAsMinimalStage()1110     private MinimalStage<T> uniAsMinimalStage() {
1111         Object r;
1112         if ((r = result) != null)
1113             return new MinimalStage<T>(encodeRelay(r));
1114         MinimalStage<T> d = new MinimalStage<T>();
1115         unipush(new UniRelay<T,T>(d, this));
1116         return d;
1117     }
1118 
1119     @SuppressWarnings("serial")
1120     static final class UniCompose<T,V> extends UniCompletion<T,V> {
1121         Function<? super T, ? extends CompletionStage<V>> fn;
UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1122         UniCompose(Executor executor, CompletableFuture<V> dep,
1123                    CompletableFuture<T> src,
1124                    Function<? super T, ? extends CompletionStage<V>> fn) {
1125             super(executor, dep, src); this.fn = fn;
1126         }
tryFire(int mode)1127         final CompletableFuture<V> tryFire(int mode) {
1128             CompletableFuture<V> d; CompletableFuture<T> a;
1129             Function<? super T, ? extends CompletionStage<V>> f;
1130             Object r; Throwable x;
1131             if ((a = src) == null || (r = a.result) == null
1132                 || (d = dep) == null || (f = fn) == null)
1133                 return null;
1134             tryComplete: if (d.result == null) {
1135                 if (r instanceof AltResult) {
1136                     if ((x = ((AltResult)r).ex) != null) {
1137                         d.completeThrowable(x, r);
1138                         break tryComplete;
1139                     }
1140                     r = null;
1141                 }
1142                 try {
1143                     if (mode <= 0 && !claim())
1144                         return null;
1145                     @SuppressWarnings("unchecked") T t = (T) r;
1146                     CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1147                     if ((r = g.result) != null)
1148                         d.completeRelay(r);
1149                     else {
1150                         g.unipush(new UniRelay<V,V>(d, g));
1151                         if (d.result == null)
1152                             return null;
1153                     }
1154                 } catch (Throwable ex) {
1155                     d.completeThrowable(ex);
1156                 }
1157             }
1158             src = null; dep = null; fn = null;
1159             return d.postFire(a, mode);
1160         }
1161     }
1162 
uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1163     private <V> CompletableFuture<V> uniComposeStage(
1164         Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
1165         if (f == null) throw new NullPointerException();
1166         CompletableFuture<V> d = newIncompleteFuture();
1167         Object r, s; Throwable x;
1168         if ((r = result) == null)
1169             unipush(new UniCompose<T,V>(e, d, this, f));
1170         else {
1171             if (r instanceof AltResult) {
1172                 if ((x = ((AltResult)r).ex) != null) {
1173                     d.result = encodeThrowable(x, r);
1174                     return d;
1175                 }
1176                 r = null;
1177             }
1178             try {
1179                 if (e != null)
1180                     e.execute(new UniCompose<T,V>(null, d, this, f));
1181                 else {
1182                     @SuppressWarnings("unchecked") T t = (T) r;
1183                     CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1184                     if ((s = g.result) != null)
1185                         d.result = encodeRelay(s);
1186                     else
1187                         g.unipush(new UniRelay<V,V>(d, g));
1188                 }
1189             } catch (Throwable ex) {
1190                 d.result = encodeThrowable(ex);
1191             }
1192         }
1193         return d;
1194     }
1195 
1196     /* ------------- Two-input Completions -------------- */
1197 
1198     /** A Completion for an action with two sources */
1199     @SuppressWarnings("serial")
1200     abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
1201         CompletableFuture<U> snd; // second source for action
BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1202         BiCompletion(Executor executor, CompletableFuture<V> dep,
1203                      CompletableFuture<T> src, CompletableFuture<U> snd) {
1204             super(executor, dep, src); this.snd = snd;
1205         }
1206     }
1207 
1208     /** A Completion delegating to a BiCompletion */
1209     @SuppressWarnings("serial")
1210     static final class CoCompletion extends Completion {
1211         BiCompletion<?,?,?> base;
CoCompletion(BiCompletion<?,?,?> base)1212         CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
tryFire(int mode)1213         final CompletableFuture<?> tryFire(int mode) {
1214             BiCompletion<?,?,?> c; CompletableFuture<?> d;
1215             if ((c = base) == null || (d = c.tryFire(mode)) == null)
1216                 return null;
1217             base = null; // detach
1218             return d;
1219         }
isLive()1220         final boolean isLive() {
1221             BiCompletion<?,?,?> c;
1222             return (c = base) != null
1223                 // && c.isLive()
1224                 && c.dep != null;
1225         }
1226     }
1227 
1228     /**
1229      * Pushes completion to this and b unless both done.
1230      * Caller should first check that either result or b.result is null.
1231      */
bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1232     final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1233         if (c != null) {
1234             while (result == null) {
1235                 if (tryPushStack(c)) {
1236                     if (b.result == null)
1237                         b.unipush(new CoCompletion(c));
1238                     else if (result != null)
1239                         c.tryFire(SYNC);
1240                     return;
1241                 }
1242             }
1243             b.unipush(c);
1244         }
1245     }
1246 
1247     /** Post-processing after successful BiCompletion tryFire. */
postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1248     final CompletableFuture<T> postFire(CompletableFuture<?> a,
1249                                         CompletableFuture<?> b, int mode) {
1250         if (b != null && b.stack != null) { // clean second source
1251             Object r;
1252             if ((r = b.result) == null)
1253                 b.cleanStack();
1254             if (mode >= 0 && (r != null || b.result != null))
1255                 b.postComplete();
1256         }
1257         return postFire(a, mode);
1258     }
1259 
1260     @SuppressWarnings("serial")
1261     static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1262         BiFunction<? super T,? super U,? extends V> fn;
BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1263         BiApply(Executor executor, CompletableFuture<V> dep,
1264                 CompletableFuture<T> src, CompletableFuture<U> snd,
1265                 BiFunction<? super T,? super U,? extends V> fn) {
1266             super(executor, dep, src, snd); this.fn = fn;
1267         }
tryFire(int mode)1268         final CompletableFuture<V> tryFire(int mode) {
1269             CompletableFuture<V> d;
1270             CompletableFuture<T> a;
1271             CompletableFuture<U> b;
1272             Object r, s; BiFunction<? super T,? super U,? extends V> f;
1273             if (   (a = src) == null || (r = a.result) == null
1274                 || (b = snd) == null || (s = b.result) == null
1275                 || (d = dep) == null || (f = fn) == null
1276                 || !d.biApply(r, s, f, mode > 0 ? null : this))
1277                 return null;
1278             src = null; snd = null; dep = null; fn = null;
1279             return d.postFire(a, b, mode);
1280         }
1281     }
1282 
biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1283     final <R,S> boolean biApply(Object r, Object s,
1284                                 BiFunction<? super R,? super S,? extends T> f,
1285                                 BiApply<R,S,T> c) {
1286         Throwable x;
1287         tryComplete: if (result == null) {
1288             if (r instanceof AltResult) {
1289                 if ((x = ((AltResult)r).ex) != null) {
1290                     completeThrowable(x, r);
1291                     break tryComplete;
1292                 }
1293                 r = null;
1294             }
1295             if (s instanceof AltResult) {
1296                 if ((x = ((AltResult)s).ex) != null) {
1297                     completeThrowable(x, s);
1298                     break tryComplete;
1299                 }
1300                 s = null;
1301             }
1302             try {
1303                 if (c != null && !c.claim())
1304                     return false;
1305                 @SuppressWarnings("unchecked") R rr = (R) r;
1306                 @SuppressWarnings("unchecked") S ss = (S) s;
1307                 completeValue(f.apply(rr, ss));
1308             } catch (Throwable ex) {
1309                 completeThrowable(ex);
1310             }
1311         }
1312         return true;
1313     }
1314 
biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1315     private <U,V> CompletableFuture<V> biApplyStage(
1316         Executor e, CompletionStage<U> o,
1317         BiFunction<? super T,? super U,? extends V> f) {
1318         CompletableFuture<U> b; Object r, s;
1319         if (f == null || (b = o.toCompletableFuture()) == null)
1320             throw new NullPointerException();
1321         CompletableFuture<V> d = newIncompleteFuture();
1322         if ((r = result) == null || (s = b.result) == null)
1323             bipush(b, new BiApply<T,U,V>(e, d, this, b, f));
1324         else if (e == null)
1325             d.biApply(r, s, f, null);
1326         else
1327             try {
1328                 e.execute(new BiApply<T,U,V>(null, d, this, b, f));
1329             } catch (Throwable ex) {
1330                 d.result = encodeThrowable(ex);
1331             }
1332         return d;
1333     }
1334 
1335     @SuppressWarnings("serial")
1336     static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1337         BiConsumer<? super T,? super U> fn;
BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1338         BiAccept(Executor executor, CompletableFuture<Void> dep,
1339                  CompletableFuture<T> src, CompletableFuture<U> snd,
1340                  BiConsumer<? super T,? super U> fn) {
1341             super(executor, dep, src, snd); this.fn = fn;
1342         }
tryFire(int mode)1343         final CompletableFuture<Void> tryFire(int mode) {
1344             CompletableFuture<Void> d;
1345             CompletableFuture<T> a;
1346             CompletableFuture<U> b;
1347             Object r, s; BiConsumer<? super T,? super U> f;
1348             if (   (a = src) == null || (r = a.result) == null
1349                 || (b = snd) == null || (s = b.result) == null
1350                 || (d = dep) == null || (f = fn) == null
1351                 || !d.biAccept(r, s, f, mode > 0 ? null : this))
1352                 return null;
1353             src = null; snd = null; dep = null; fn = null;
1354             return d.postFire(a, b, mode);
1355         }
1356     }
1357 
biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1358     final <R,S> boolean biAccept(Object r, Object s,
1359                                  BiConsumer<? super R,? super S> f,
1360                                  BiAccept<R,S> c) {
1361         Throwable x;
1362         tryComplete: if (result == null) {
1363             if (r instanceof AltResult) {
1364                 if ((x = ((AltResult)r).ex) != null) {
1365                     completeThrowable(x, r);
1366                     break tryComplete;
1367                 }
1368                 r = null;
1369             }
1370             if (s instanceof AltResult) {
1371                 if ((x = ((AltResult)s).ex) != null) {
1372                     completeThrowable(x, s);
1373                     break tryComplete;
1374                 }
1375                 s = null;
1376             }
1377             try {
1378                 if (c != null && !c.claim())
1379                     return false;
1380                 @SuppressWarnings("unchecked") R rr = (R) r;
1381                 @SuppressWarnings("unchecked") S ss = (S) s;
1382                 f.accept(rr, ss);
1383                 completeNull();
1384             } catch (Throwable ex) {
1385                 completeThrowable(ex);
1386             }
1387         }
1388         return true;
1389     }
1390 
biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1391     private <U> CompletableFuture<Void> biAcceptStage(
1392         Executor e, CompletionStage<U> o,
1393         BiConsumer<? super T,? super U> f) {
1394         CompletableFuture<U> b; Object r, s;
1395         if (f == null || (b = o.toCompletableFuture()) == null)
1396             throw new NullPointerException();
1397         CompletableFuture<Void> d = newIncompleteFuture();
1398         if ((r = result) == null || (s = b.result) == null)
1399             bipush(b, new BiAccept<T,U>(e, d, this, b, f));
1400         else if (e == null)
1401             d.biAccept(r, s, f, null);
1402         else
1403             try {
1404                 e.execute(new BiAccept<T,U>(null, d, this, b, f));
1405             } catch (Throwable ex) {
1406                 d.result = encodeThrowable(ex);
1407             }
1408         return d;
1409     }
1410 
1411     @SuppressWarnings("serial")
1412     static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1413         Runnable fn;
BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1414         BiRun(Executor executor, CompletableFuture<Void> dep,
1415               CompletableFuture<T> src, CompletableFuture<U> snd,
1416               Runnable fn) {
1417             super(executor, dep, src, snd); this.fn = fn;
1418         }
tryFire(int mode)1419         final CompletableFuture<Void> tryFire(int mode) {
1420             CompletableFuture<Void> d;
1421             CompletableFuture<T> a;
1422             CompletableFuture<U> b;
1423             Object r, s; Runnable f;
1424             if (   (a = src) == null || (r = a.result) == null
1425                 || (b = snd) == null || (s = b.result) == null
1426                 || (d = dep) == null || (f = fn) == null
1427                 || !d.biRun(r, s, f, mode > 0 ? null : this))
1428                 return null;
1429             src = null; snd = null; dep = null; fn = null;
1430             return d.postFire(a, b, mode);
1431         }
1432     }
1433 
biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1434     final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) {
1435         Throwable x; Object z;
1436         if (result == null) {
1437             if ((r instanceof AltResult
1438                  && (x = ((AltResult)(z = r)).ex) != null) ||
1439                 (s instanceof AltResult
1440                  && (x = ((AltResult)(z = s)).ex) != null))
1441                 completeThrowable(x, z);
1442             else
1443                 try {
1444                     if (c != null && !c.claim())
1445                         return false;
1446                     f.run();
1447                     completeNull();
1448                 } catch (Throwable ex) {
1449                     completeThrowable(ex);
1450                 }
1451         }
1452         return true;
1453     }
1454 
biRunStage(Executor e, CompletionStage<?> o, Runnable f)1455     private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1456                                                Runnable f) {
1457         CompletableFuture<?> b; Object r, s;
1458         if (f == null || (b = o.toCompletableFuture()) == null)
1459             throw new NullPointerException();
1460         CompletableFuture<Void> d = newIncompleteFuture();
1461         if ((r = result) == null || (s = b.result) == null)
1462             bipush(b, new BiRun<>(e, d, this, b, f));
1463         else if (e == null)
1464             d.biRun(r, s, f, null);
1465         else
1466             try {
1467                 e.execute(new BiRun<>(null, d, this, b, f));
1468             } catch (Throwable ex) {
1469                 d.result = encodeThrowable(ex);
1470             }
1471         return d;
1472     }
1473 
1474     @SuppressWarnings("serial")
1475     static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1476         BiRelay(CompletableFuture<Void> dep,
1477                 CompletableFuture<T> src, CompletableFuture<U> snd) {
1478             super(null, dep, src, snd);
1479         }
tryFire(int mode)1480         final CompletableFuture<Void> tryFire(int mode) {
1481             CompletableFuture<Void> d;
1482             CompletableFuture<T> a;
1483             CompletableFuture<U> b;
1484             Object r, s, z; Throwable x;
1485             if (   (a = src) == null || (r = a.result) == null
1486                 || (b = snd) == null || (s = b.result) == null
1487                 || (d = dep) == null)
1488                 return null;
1489             if (d.result == null) {
1490                 if ((r instanceof AltResult
1491                      && (x = ((AltResult)(z = r)).ex) != null) ||
1492                     (s instanceof AltResult
1493                      && (x = ((AltResult)(z = s)).ex) != null))
1494                     d.completeThrowable(x, z);
1495                 else
1496                     d.completeNull();
1497             }
1498             src = null; snd = null; dep = null;
1499             return d.postFire(a, b, mode);
1500         }
1501     }
1502 
1503     /** Recursively constructs a tree of completions. */
andTree(CompletableFuture<?>[] cfs, int lo, int hi)1504     static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1505                                            int lo, int hi) {
1506         CompletableFuture<Void> d = new CompletableFuture<Void>();
1507         if (lo > hi) // empty
1508             d.result = NIL;
1509         else {
1510             CompletableFuture<?> a, b; Object r, s, z; Throwable x;
1511             int mid = (lo + hi) >>> 1;
1512             if ((a = (lo == mid ? cfs[lo] :
1513                       andTree(cfs, lo, mid))) == null ||
1514                 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1515                       andTree(cfs, mid+1, hi))) == null)
1516                 throw new NullPointerException();
1517             if ((r = a.result) == null || (s = b.result) == null)
1518                 a.bipush(b, new BiRelay<>(d, a, b));
1519             else if ((r instanceof AltResult
1520                       && (x = ((AltResult)(z = r)).ex) != null) ||
1521                      (s instanceof AltResult
1522                       && (x = ((AltResult)(z = s)).ex) != null))
1523                 d.result = encodeThrowable(x, z);
1524             else
1525                 d.result = NIL;
1526         }
1527         return d;
1528     }
1529 
1530     /* ------------- Projected (Ored) BiCompletions -------------- */
1531 
1532     /**
1533      * Pushes completion to this and b unless either done.
1534      * Caller should first check that result and b.result are both null.
1535      */
orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1536     final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1537         if (c != null) {
1538             while (!tryPushStack(c)) {
1539                 if (result != null) {
1540                     NEXT.set(c, null);
1541                     break;
1542                 }
1543             }
1544             if (result != null)
1545                 c.tryFire(SYNC);
1546             else
1547                 b.unipush(new CoCompletion(c));
1548         }
1549     }
1550 
1551     @SuppressWarnings("serial")
1552     static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1553         Function<? super T,? extends V> fn;
OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1554         OrApply(Executor executor, CompletableFuture<V> dep,
1555                 CompletableFuture<T> src, CompletableFuture<U> snd,
1556                 Function<? super T,? extends V> fn) {
1557             super(executor, dep, src, snd); this.fn = fn;
1558         }
tryFire(int mode)1559         final CompletableFuture<V> tryFire(int mode) {
1560             CompletableFuture<V> d; CompletableFuture<? extends T> a, b;
1561             Object r; Throwable x; Function<? super T,? extends V> f;
1562             if ((a = src) == null || (b = snd) == null
1563                 || ((r = a.result) == null && (r = b.result) == null)
1564                 || (d = dep) == null || (f = fn) == null)
1565                 return null;
1566             tryComplete: if (d.result == null) {
1567                 try {
1568                     if (mode <= 0 && !claim())
1569                         return null;
1570                     if (r instanceof AltResult) {
1571                         if ((x = ((AltResult)r).ex) != null) {
1572                             d.completeThrowable(x, r);
1573                             break tryComplete;
1574                         }
1575                         r = null;
1576                     }
1577                     @SuppressWarnings("unchecked") T t = (T) r;
1578                     d.completeValue(f.apply(t));
1579                 } catch (Throwable ex) {
1580                     d.completeThrowable(ex);
1581                 }
1582             }
1583             src = null; snd = null; dep = null; fn = null;
1584             return d.postFire(a, b, mode);
1585         }
1586     }
1587 
orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1588     private <U extends T,V> CompletableFuture<V> orApplyStage(
1589         Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
1590         CompletableFuture<U> b;
1591         if (f == null || (b = o.toCompletableFuture()) == null)
1592             throw new NullPointerException();
1593 
1594         Object r; CompletableFuture<? extends T> z;
1595         if ((r = (z = this).result) != null ||
1596             (r = (z = b).result) != null)
1597             return z.uniApplyNow(r, e, f);
1598 
1599         CompletableFuture<V> d = newIncompleteFuture();
1600         orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
1601         return d;
1602     }
1603 
1604     @SuppressWarnings("serial")
1605     static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1606         Consumer<? super T> fn;
OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1607         OrAccept(Executor executor, CompletableFuture<Void> dep,
1608                  CompletableFuture<T> src, CompletableFuture<U> snd,
1609                  Consumer<? super T> fn) {
1610             super(executor, dep, src, snd); this.fn = fn;
1611         }
tryFire(int mode)1612         final CompletableFuture<Void> tryFire(int mode) {
1613             CompletableFuture<Void> d; CompletableFuture<? extends T> a, b;
1614             Object r; Throwable x; Consumer<? super T> f;
1615             if ((a = src) == null || (b = snd) == null
1616                 || ((r = a.result) == null && (r = b.result) == null)
1617                 || (d = dep) == null || (f = fn) == null)
1618                 return null;
1619             tryComplete: if (d.result == null) {
1620                 try {
1621                     if (mode <= 0 && !claim())
1622                         return null;
1623                     if (r instanceof AltResult) {
1624                         if ((x = ((AltResult)r).ex) != null) {
1625                             d.completeThrowable(x, r);
1626                             break tryComplete;
1627                         }
1628                         r = null;
1629                     }
1630                     @SuppressWarnings("unchecked") T t = (T) r;
1631                     f.accept(t);
1632                     d.completeNull();
1633                 } catch (Throwable ex) {
1634                     d.completeThrowable(ex);
1635                 }
1636             }
1637             src = null; snd = null; dep = null; fn = null;
1638             return d.postFire(a, b, mode);
1639         }
1640     }
1641 
orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1642     private <U extends T> CompletableFuture<Void> orAcceptStage(
1643         Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1644         CompletableFuture<U> b;
1645         if (f == null || (b = o.toCompletableFuture()) == null)
1646             throw new NullPointerException();
1647 
1648         Object r; CompletableFuture<? extends T> z;
1649         if ((r = (z = this).result) != null ||
1650             (r = (z = b).result) != null)
1651             return z.uniAcceptNow(r, e, f);
1652 
1653         CompletableFuture<Void> d = newIncompleteFuture();
1654         orpush(b, new OrAccept<T,U>(e, d, this, b, f));
1655         return d;
1656     }
1657 
1658     @SuppressWarnings("serial")
1659     static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1660         Runnable fn;
OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1661         OrRun(Executor executor, CompletableFuture<Void> dep,
1662               CompletableFuture<T> src, CompletableFuture<U> snd,
1663               Runnable fn) {
1664             super(executor, dep, src, snd); this.fn = fn;
1665         }
tryFire(int mode)1666         final CompletableFuture<Void> tryFire(int mode) {
1667             CompletableFuture<Void> d; CompletableFuture<?> a, b;
1668             Object r; Throwable x; Runnable f;
1669             if ((a = src) == null || (b = snd) == null
1670                 || ((r = a.result) == null && (r = b.result) == null)
1671                 || (d = dep) == null || (f = fn) == null)
1672                 return null;
1673             if (d.result == null) {
1674                 try {
1675                     if (mode <= 0 && !claim())
1676                         return null;
1677                     else if (r instanceof AltResult
1678                         && (x = ((AltResult)r).ex) != null)
1679                         d.completeThrowable(x, r);
1680                     else {
1681                         f.run();
1682                         d.completeNull();
1683                     }
1684                 } catch (Throwable ex) {
1685                     d.completeThrowable(ex);
1686                 }
1687             }
1688             src = null; snd = null; dep = null; fn = null;
1689             return d.postFire(a, b, mode);
1690         }
1691     }
1692 
orRunStage(Executor e, CompletionStage<?> o, Runnable f)1693     private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1694                                                Runnable f) {
1695         CompletableFuture<?> b;
1696         if (f == null || (b = o.toCompletableFuture()) == null)
1697             throw new NullPointerException();
1698 
1699         Object r; CompletableFuture<?> z;
1700         if ((r = (z = this).result) != null ||
1701             (r = (z = b).result) != null)
1702             return z.uniRunNow(r, e, f);
1703 
1704         CompletableFuture<Void> d = newIncompleteFuture();
1705         orpush(b, new OrRun<>(e, d, this, b, f));
1706         return d;
1707     }
1708 
1709     /** Completion for an anyOf input future. */
1710     @SuppressWarnings("serial")
1711     static class AnyOf extends Completion {
1712         CompletableFuture<Object> dep; CompletableFuture<?> src;
1713         CompletableFuture<?>[] srcs;
AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1714         AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,
1715               CompletableFuture<?>[] srcs) {
1716             this.dep = dep; this.src = src; this.srcs = srcs;
1717         }
tryFire(int mode)1718         final CompletableFuture<Object> tryFire(int mode) {
1719             // assert mode != ASYNC;
1720             CompletableFuture<Object> d; CompletableFuture<?> a;
1721             CompletableFuture<?>[] as;
1722             Object r;
1723             if ((a = src) == null || (r = a.result) == null
1724                 || (d = dep) == null || (as = srcs) == null)
1725                 return null;
1726             src = null; dep = null; srcs = null;
1727             if (d.completeRelay(r)) {
1728                 for (CompletableFuture<?> b : as)
1729                     if (b != a)
1730                         b.cleanStack();
1731                 if (mode < 0)
1732                     return d;
1733                 else
1734                     d.postComplete();
1735             }
1736             return null;
1737         }
isLive()1738         final boolean isLive() {
1739             CompletableFuture<Object> d;
1740             return (d = dep) != null && d.result == null;
1741         }
1742     }
1743 
1744     /* ------------- Zero-input Async forms -------------- */
1745 
1746     @SuppressWarnings("serial")
1747     static final class AsyncSupply<T> extends ForkJoinTask<Void>
1748         implements Runnable, AsynchronousCompletionTask {
1749         CompletableFuture<T> dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1750         AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
1751             this.dep = dep; this.fn = fn;
1752         }
1753 
getRawResult()1754         public final Void getRawResult() { return null; }
setRawResult(Void v)1755         public final void setRawResult(Void v) {}
exec()1756         public final boolean exec() { run(); return false; }
1757 
run()1758         public void run() {
1759             CompletableFuture<T> d; Supplier<? extends T> f;
1760             if ((d = dep) != null && (f = fn) != null) {
1761                 dep = null; fn = null;
1762                 if (d.result == null) {
1763                     try {
1764                         d.completeValue(f.get());
1765                     } catch (Throwable ex) {
1766                         d.completeThrowable(ex);
1767                     }
1768                 }
1769                 d.postComplete();
1770             }
1771         }
1772     }
1773 
asyncSupplyStage(Executor e, Supplier<U> f)1774     static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1775                                                      Supplier<U> f) {
1776         if (f == null) throw new NullPointerException();
1777         CompletableFuture<U> d = new CompletableFuture<U>();
1778         e.execute(new AsyncSupply<U>(d, f));
1779         return d;
1780     }
1781 
1782     @SuppressWarnings("serial")
1783     static final class AsyncRun extends ForkJoinTask<Void>
1784         implements Runnable, AsynchronousCompletionTask {
1785         CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn)1786         AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1787             this.dep = dep; this.fn = fn;
1788         }
1789 
getRawResult()1790         public final Void getRawResult() { return null; }
setRawResult(Void v)1791         public final void setRawResult(Void v) {}
exec()1792         public final boolean exec() { run(); return false; }
1793 
run()1794         public void run() {
1795             CompletableFuture<Void> d; Runnable f;
1796             if ((d = dep) != null && (f = fn) != null) {
1797                 dep = null; fn = null;
1798                 if (d.result == null) {
1799                     try {
1800                         f.run();
1801                         d.completeNull();
1802                     } catch (Throwable ex) {
1803                         d.completeThrowable(ex);
1804                     }
1805                 }
1806                 d.postComplete();
1807             }
1808         }
1809     }
1810 
asyncRunStage(Executor e, Runnable f)1811     static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1812         if (f == null) throw new NullPointerException();
1813         CompletableFuture<Void> d = new CompletableFuture<Void>();
1814         e.execute(new AsyncRun(d, f));
1815         return d;
1816     }
1817 
1818     /* ------------- Signallers -------------- */
1819 
1820     /**
1821      * Completion for recording and releasing a waiting thread.  This
1822      * class implements ManagedBlocker to avoid starvation when
1823      * blocking actions pile up in ForkJoinPools.
1824      */
1825     @SuppressWarnings("serial")
1826     static final class Signaller extends Completion
1827         implements ForkJoinPool.ManagedBlocker {
1828         long nanos;                    // remaining wait time if timed
1829         final long deadline;           // non-zero if timed
1830         final boolean interruptible;
1831         boolean interrupted;
1832         volatile Thread thread;
1833 
Signaller(boolean interruptible, long nanos, long deadline)1834         Signaller(boolean interruptible, long nanos, long deadline) {
1835             this.thread = Thread.currentThread();
1836             this.interruptible = interruptible;
1837             this.nanos = nanos;
1838             this.deadline = deadline;
1839         }
tryFire(int ignore)1840         final CompletableFuture<?> tryFire(int ignore) {
1841             Thread w; // no need to atomically claim
1842             if ((w = thread) != null) {
1843                 thread = null;
1844                 LockSupport.unpark(w);
1845             }
1846             return null;
1847         }
isReleasable()1848         public boolean isReleasable() {
1849             if (Thread.interrupted())
1850                 interrupted = true;
1851             return ((interrupted && interruptible) ||
1852                     (deadline != 0L &&
1853                      (nanos <= 0L ||
1854                       (nanos = deadline - System.nanoTime()) <= 0L)) ||
1855                     thread == null);
1856         }
block()1857         public boolean block() {
1858             while (!isReleasable()) {
1859                 if (deadline == 0L)
1860                     LockSupport.park(this);
1861                 else
1862                     LockSupport.parkNanos(this, nanos);
1863             }
1864             return true;
1865         }
isLive()1866         final boolean isLive() { return thread != null; }
1867     }
1868 
1869     /**
1870      * Returns raw result after waiting, or null if interruptible and
1871      * interrupted.
1872      */
waitingGet(boolean interruptible)1873     private Object waitingGet(boolean interruptible) {
1874         Signaller q = null;
1875         boolean queued = false;
1876         Object r;
1877         while ((r = result) == null) {
1878             if (q == null) {
1879                 q = new Signaller(interruptible, 0L, 0L);
1880                 if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1881                     ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1882             }
1883             else if (!queued)
1884                 queued = tryPushStack(q);
1885             else {
1886                 try {
1887                     ForkJoinPool.managedBlock(q);
1888                 } catch (InterruptedException ie) { // currently cannot happen
1889                     q.interrupted = true;
1890                 }
1891                 if (q.interrupted && interruptible)
1892                     break;
1893             }
1894         }
1895         if (q != null && queued) {
1896             q.thread = null;
1897             if (!interruptible && q.interrupted)
1898                 Thread.currentThread().interrupt();
1899             if (r == null)
1900                 cleanStack();
1901         }
1902         if (r != null || (r = result) != null)
1903             postComplete();
1904         return r;
1905     }
1906 
1907     /**
1908      * Returns raw result after waiting, or null if interrupted, or
1909      * throws TimeoutException on timeout.
1910      */
timedGet(long nanos)1911     private Object timedGet(long nanos) throws TimeoutException {
1912         if (Thread.interrupted())
1913             return null;
1914         if (nanos > 0L) {
1915             long d = System.nanoTime() + nanos;
1916             long deadline = (d == 0L) ? 1L : d; // avoid 0
1917             Signaller q = null;
1918             boolean queued = false;
1919             Object r;
1920             while ((r = result) == null) { // similar to untimed
1921                 if (q == null) {
1922                     q = new Signaller(true, nanos, deadline);
1923                     if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1924                         ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1925                 }
1926                 else if (!queued)
1927                     queued = tryPushStack(q);
1928                 else if (q.nanos <= 0L)
1929                     break;
1930                 else {
1931                     try {
1932                         ForkJoinPool.managedBlock(q);
1933                     } catch (InterruptedException ie) {
1934                         q.interrupted = true;
1935                     }
1936                     if (q.interrupted)
1937                         break;
1938                 }
1939             }
1940             if (q != null && queued) {
1941                 q.thread = null;
1942                 if (r == null)
1943                     cleanStack();
1944             }
1945             if (r != null || (r = result) != null)
1946                 postComplete();
1947             if (r != null || (q != null && q.interrupted))
1948                 return r;
1949         }
1950         throw new TimeoutException();
1951     }
1952 
1953     /* ------------- public methods -------------- */
1954 
1955     /**
1956      * Creates a new incomplete CompletableFuture.
1957      */
CompletableFuture()1958     public CompletableFuture() {
1959     }
1960 
1961     /**
1962      * Creates a new complete CompletableFuture with given encoded result.
1963      */
CompletableFuture(Object r)1964     CompletableFuture(Object r) {
1965         RESULT.setRelease(this, r);
1966     }
1967 
1968     /**
1969      * Returns a new CompletableFuture that is asynchronously completed
1970      * by a task running in the {@link ForkJoinPool#commonPool()} with
1971      * the value obtained by calling the given Supplier.
1972      *
1973      * @param supplier a function returning the value to be used
1974      * to complete the returned CompletableFuture
1975      * @param <U> the function's return type
1976      * @return the new CompletableFuture
1977      */
supplyAsync(Supplier<U> supplier)1978     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1979         return asyncSupplyStage(ASYNC_POOL, supplier);
1980     }
1981 
1982     /**
1983      * Returns a new CompletableFuture that is asynchronously completed
1984      * by a task running in the given executor with the value obtained
1985      * by calling the given Supplier.
1986      *
1987      * @param supplier a function returning the value to be used
1988      * to complete the returned CompletableFuture
1989      * @param executor the executor to use for asynchronous execution
1990      * @param <U> the function's return type
1991      * @return the new CompletableFuture
1992      */
supplyAsync(Supplier<U> supplier, Executor executor)1993     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1994                                                        Executor executor) {
1995         return asyncSupplyStage(screenExecutor(executor), supplier);
1996     }
1997 
1998     /**
1999      * Returns a new CompletableFuture that is asynchronously completed
2000      * by a task running in the {@link ForkJoinPool#commonPool()} after
2001      * it runs the given action.
2002      *
2003      * @param runnable the action to run before completing the
2004      * returned CompletableFuture
2005      * @return the new CompletableFuture
2006      */
runAsync(Runnable runnable)2007     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2008         return asyncRunStage(ASYNC_POOL, runnable);
2009     }
2010 
2011     /**
2012      * Returns a new CompletableFuture that is asynchronously completed
2013      * by a task running in the given executor after it runs the given
2014      * action.
2015      *
2016      * @param runnable the action to run before completing the
2017      * returned CompletableFuture
2018      * @param executor the executor to use for asynchronous execution
2019      * @return the new CompletableFuture
2020      */
runAsync(Runnable runnable, Executor executor)2021     public static CompletableFuture<Void> runAsync(Runnable runnable,
2022                                                    Executor executor) {
2023         return asyncRunStage(screenExecutor(executor), runnable);
2024     }
2025 
2026     /**
2027      * Returns a new CompletableFuture that is already completed with
2028      * the given value.
2029      *
2030      * @param value the value
2031      * @param <U> the type of the value
2032      * @return the completed CompletableFuture
2033      */
completedFuture(U value)2034     public static <U> CompletableFuture<U> completedFuture(U value) {
2035         return new CompletableFuture<U>((value == null) ? NIL : value);
2036     }
2037 
2038     /**
2039      * Returns {@code true} if completed in any fashion: normally,
2040      * exceptionally, or via cancellation.
2041      *
2042      * @return {@code true} if completed
2043      */
isDone()2044     public boolean isDone() {
2045         return result != null;
2046     }
2047 
2048     /**
2049      * Waits if necessary for this future to complete, and then
2050      * returns its result.
2051      *
2052      * @return the result value
2053      * @throws CancellationException if this future was cancelled
2054      * @throws ExecutionException if this future completed exceptionally
2055      * @throws InterruptedException if the current thread was interrupted
2056      * while waiting
2057      */
2058     @SuppressWarnings("unchecked")
get()2059     public T get() throws InterruptedException, ExecutionException {
2060         Object r;
2061         if ((r = result) == null)
2062             r = waitingGet(true);
2063         return (T) reportGet(r);
2064     }
2065 
2066     /**
2067      * Waits if necessary for at most the given time for this future
2068      * to complete, and then returns its result, if available.
2069      *
2070      * @param timeout the maximum time to wait
2071      * @param unit the time unit of the timeout argument
2072      * @return the result value
2073      * @throws CancellationException if this future was cancelled
2074      * @throws ExecutionException if this future completed exceptionally
2075      * @throws InterruptedException if the current thread was interrupted
2076      * while waiting
2077      * @throws TimeoutException if the wait timed out
2078      */
2079     @SuppressWarnings("unchecked")
get(long timeout, TimeUnit unit)2080     public T get(long timeout, TimeUnit unit)
2081         throws InterruptedException, ExecutionException, TimeoutException {
2082         long nanos = unit.toNanos(timeout);
2083         Object r;
2084         if ((r = result) == null)
2085             r = timedGet(nanos);
2086         return (T) reportGet(r);
2087     }
2088 
2089     /**
2090      * Returns the result value when complete, or throws an
2091      * (unchecked) exception if completed exceptionally. To better
2092      * conform with the use of common functional forms, if a
2093      * computation involved in the completion of this
2094      * CompletableFuture threw an exception, this method throws an
2095      * (unchecked) {@link CompletionException} with the underlying
2096      * exception as its cause.
2097      *
2098      * @return the result value
2099      * @throws CancellationException if the computation was cancelled
2100      * @throws CompletionException if this future completed
2101      * exceptionally or a completion computation threw an exception
2102      */
2103     @SuppressWarnings("unchecked")
join()2104     public T join() {
2105         Object r;
2106         if ((r = result) == null)
2107             r = waitingGet(false);
2108         return (T) reportJoin(r);
2109     }
2110 
2111     /**
2112      * Returns the result value (or throws any encountered exception)
2113      * if completed, else returns the given valueIfAbsent.
2114      *
2115      * @param valueIfAbsent the value to return if not completed
2116      * @return the result value, if completed, else the given valueIfAbsent
2117      * @throws CancellationException if the computation was cancelled
2118      * @throws CompletionException if this future completed
2119      * exceptionally or a completion computation threw an exception
2120      */
2121     @SuppressWarnings("unchecked")
getNow(T valueIfAbsent)2122     public T getNow(T valueIfAbsent) {
2123         Object r;
2124         return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
2125     }
2126 
2127     /**
2128      * If not already completed, sets the value returned by {@link
2129      * #get()} and related methods to the given value.
2130      *
2131      * @param value the result value
2132      * @return {@code true} if this invocation caused this CompletableFuture
2133      * to transition to a completed state, else {@code false}
2134      */
complete(T value)2135     public boolean complete(T value) {
2136         boolean triggered = completeValue(value);
2137         postComplete();
2138         return triggered;
2139     }
2140 
2141     /**
2142      * If not already completed, causes invocations of {@link #get()}
2143      * and related methods to throw the given exception.
2144      *
2145      * @param ex the exception
2146      * @return {@code true} if this invocation caused this CompletableFuture
2147      * to transition to a completed state, else {@code false}
2148      */
completeExceptionally(Throwable ex)2149     public boolean completeExceptionally(Throwable ex) {
2150         if (ex == null) throw new NullPointerException();
2151         boolean triggered = internalComplete(new AltResult(ex));
2152         postComplete();
2153         return triggered;
2154     }
2155 
thenApply( Function<? super T,? extends U> fn)2156     public <U> CompletableFuture<U> thenApply(
2157         Function<? super T,? extends U> fn) {
2158         return uniApplyStage(null, fn);
2159     }
2160 
thenApplyAsync( Function<? super T,? extends U> fn)2161     public <U> CompletableFuture<U> thenApplyAsync(
2162         Function<? super T,? extends U> fn) {
2163         return uniApplyStage(defaultExecutor(), fn);
2164     }
2165 
thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2166     public <U> CompletableFuture<U> thenApplyAsync(
2167         Function<? super T,? extends U> fn, Executor executor) {
2168         return uniApplyStage(screenExecutor(executor), fn);
2169     }
2170 
thenAccept(Consumer<? super T> action)2171     public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
2172         return uniAcceptStage(null, action);
2173     }
2174 
thenAcceptAsync(Consumer<? super T> action)2175     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
2176         return uniAcceptStage(defaultExecutor(), action);
2177     }
2178 
thenAcceptAsync(Consumer<? super T> action, Executor executor)2179     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
2180                                                    Executor executor) {
2181         return uniAcceptStage(screenExecutor(executor), action);
2182     }
2183 
thenRun(Runnable action)2184     public CompletableFuture<Void> thenRun(Runnable action) {
2185         return uniRunStage(null, action);
2186     }
2187 
thenRunAsync(Runnable action)2188     public CompletableFuture<Void> thenRunAsync(Runnable action) {
2189         return uniRunStage(defaultExecutor(), action);
2190     }
2191 
thenRunAsync(Runnable action, Executor executor)2192     public CompletableFuture<Void> thenRunAsync(Runnable action,
2193                                                 Executor executor) {
2194         return uniRunStage(screenExecutor(executor), action);
2195     }
2196 
thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2197     public <U,V> CompletableFuture<V> thenCombine(
2198         CompletionStage<? extends U> other,
2199         BiFunction<? super T,? super U,? extends V> fn) {
2200         return biApplyStage(null, other, fn);
2201     }
2202 
thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2203     public <U,V> CompletableFuture<V> thenCombineAsync(
2204         CompletionStage<? extends U> other,
2205         BiFunction<? super T,? super U,? extends V> fn) {
2206         return biApplyStage(defaultExecutor(), other, fn);
2207     }
2208 
thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2209     public <U,V> CompletableFuture<V> thenCombineAsync(
2210         CompletionStage<? extends U> other,
2211         BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
2212         return biApplyStage(screenExecutor(executor), other, fn);
2213     }
2214 
thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2215     public <U> CompletableFuture<Void> thenAcceptBoth(
2216         CompletionStage<? extends U> other,
2217         BiConsumer<? super T, ? super U> action) {
2218         return biAcceptStage(null, other, action);
2219     }
2220 
thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2221     public <U> CompletableFuture<Void> thenAcceptBothAsync(
2222         CompletionStage<? extends U> other,
2223         BiConsumer<? super T, ? super U> action) {
2224         return biAcceptStage(defaultExecutor(), other, action);
2225     }
2226 
thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2227     public <U> CompletableFuture<Void> thenAcceptBothAsync(
2228         CompletionStage<? extends U> other,
2229         BiConsumer<? super T, ? super U> action, Executor executor) {
2230         return biAcceptStage(screenExecutor(executor), other, action);
2231     }
2232 
runAfterBoth(CompletionStage<?> other, Runnable action)2233     public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
2234                                                 Runnable action) {
2235         return biRunStage(null, other, action);
2236     }
2237 
runAfterBothAsync(CompletionStage<?> other, Runnable action)2238     public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2239                                                      Runnable action) {
2240         return biRunStage(defaultExecutor(), other, action);
2241     }
2242 
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2243     public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2244                                                      Runnable action,
2245                                                      Executor executor) {
2246         return biRunStage(screenExecutor(executor), other, action);
2247     }
2248 
applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2249     public <U> CompletableFuture<U> applyToEither(
2250         CompletionStage<? extends T> other, Function<? super T, U> fn) {
2251         return orApplyStage(null, other, fn);
2252     }
2253 
applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2254     public <U> CompletableFuture<U> applyToEitherAsync(
2255         CompletionStage<? extends T> other, Function<? super T, U> fn) {
2256         return orApplyStage(defaultExecutor(), other, fn);
2257     }
2258 
applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2259     public <U> CompletableFuture<U> applyToEitherAsync(
2260         CompletionStage<? extends T> other, Function<? super T, U> fn,
2261         Executor executor) {
2262         return orApplyStage(screenExecutor(executor), other, fn);
2263     }
2264 
acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2265     public CompletableFuture<Void> acceptEither(
2266         CompletionStage<? extends T> other, Consumer<? super T> action) {
2267         return orAcceptStage(null, other, action);
2268     }
2269 
acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2270     public CompletableFuture<Void> acceptEitherAsync(
2271         CompletionStage<? extends T> other, Consumer<? super T> action) {
2272         return orAcceptStage(defaultExecutor(), other, action);
2273     }
2274 
acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2275     public CompletableFuture<Void> acceptEitherAsync(
2276         CompletionStage<? extends T> other, Consumer<? super T> action,
2277         Executor executor) {
2278         return orAcceptStage(screenExecutor(executor), other, action);
2279     }
2280 
runAfterEither(CompletionStage<?> other, Runnable action)2281     public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2282                                                   Runnable action) {
2283         return orRunStage(null, other, action);
2284     }
2285 
runAfterEitherAsync(CompletionStage<?> other, Runnable action)2286     public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2287                                                        Runnable action) {
2288         return orRunStage(defaultExecutor(), other, action);
2289     }
2290 
runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2291     public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2292                                                        Runnable action,
2293                                                        Executor executor) {
2294         return orRunStage(screenExecutor(executor), other, action);
2295     }
2296 
thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2297     public <U> CompletableFuture<U> thenCompose(
2298         Function<? super T, ? extends CompletionStage<U>> fn) {
2299         return uniComposeStage(null, fn);
2300     }
2301 
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2302     public <U> CompletableFuture<U> thenComposeAsync(
2303         Function<? super T, ? extends CompletionStage<U>> fn) {
2304         return uniComposeStage(defaultExecutor(), fn);
2305     }
2306 
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2307     public <U> CompletableFuture<U> thenComposeAsync(
2308         Function<? super T, ? extends CompletionStage<U>> fn,
2309         Executor executor) {
2310         return uniComposeStage(screenExecutor(executor), fn);
2311     }
2312 
whenComplete( BiConsumer<? super T, ? super Throwable> action)2313     public CompletableFuture<T> whenComplete(
2314         BiConsumer<? super T, ? super Throwable> action) {
2315         return uniWhenCompleteStage(null, action);
2316     }
2317 
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2318     public CompletableFuture<T> whenCompleteAsync(
2319         BiConsumer<? super T, ? super Throwable> action) {
2320         return uniWhenCompleteStage(defaultExecutor(), action);
2321     }
2322 
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2323     public CompletableFuture<T> whenCompleteAsync(
2324         BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2325         return uniWhenCompleteStage(screenExecutor(executor), action);
2326     }
2327 
handle( BiFunction<? super T, Throwable, ? extends U> fn)2328     public <U> CompletableFuture<U> handle(
2329         BiFunction<? super T, Throwable, ? extends U> fn) {
2330         return uniHandleStage(null, fn);
2331     }
2332 
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2333     public <U> CompletableFuture<U> handleAsync(
2334         BiFunction<? super T, Throwable, ? extends U> fn) {
2335         return uniHandleStage(defaultExecutor(), fn);
2336     }
2337 
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2338     public <U> CompletableFuture<U> handleAsync(
2339         BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2340         return uniHandleStage(screenExecutor(executor), fn);
2341     }
2342 
2343     /**
2344      * Returns this CompletableFuture.
2345      *
2346      * @return this CompletableFuture
2347      */
toCompletableFuture()2348     public CompletableFuture<T> toCompletableFuture() {
2349         return this;
2350     }
2351 
exceptionally( Function<Throwable, ? extends T> fn)2352     public CompletableFuture<T> exceptionally(
2353         Function<Throwable, ? extends T> fn) {
2354         return uniExceptionallyStage(null, fn);
2355     }
2356 
exceptionallyAsync( Function<Throwable, ? extends T> fn)2357     public CompletableFuture<T> exceptionallyAsync(
2358         Function<Throwable, ? extends T> fn) {
2359         return uniExceptionallyStage(defaultExecutor(), fn);
2360     }
2361 
exceptionallyAsync( Function<Throwable, ? extends T> fn, Executor executor)2362     public CompletableFuture<T> exceptionallyAsync(
2363         Function<Throwable, ? extends T> fn, Executor executor) {
2364         return uniExceptionallyStage(screenExecutor(executor), fn);
2365     }
2366 
exceptionallyCompose( Function<Throwable, ? extends CompletionStage<T>> fn)2367     public CompletableFuture<T> exceptionallyCompose(
2368         Function<Throwable, ? extends CompletionStage<T>> fn) {
2369         return uniComposeExceptionallyStage(null, fn);
2370     }
2371 
exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn)2372     public CompletableFuture<T> exceptionallyComposeAsync(
2373         Function<Throwable, ? extends CompletionStage<T>> fn) {
2374         return uniComposeExceptionallyStage(defaultExecutor(), fn);
2375     }
2376 
exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)2377     public CompletableFuture<T> exceptionallyComposeAsync(
2378         Function<Throwable, ? extends CompletionStage<T>> fn,
2379         Executor executor) {
2380         return uniComposeExceptionallyStage(screenExecutor(executor), fn);
2381     }
2382 
2383     /* ------------- Arbitrary-arity constructions -------------- */
2384 
2385     /**
2386      * Returns a new CompletableFuture that is completed when all of
2387      * the given CompletableFutures complete.  If any of the given
2388      * CompletableFutures complete exceptionally, then the returned
2389      * CompletableFuture also does so, with a CompletionException
2390      * holding this exception as its cause.  Otherwise, the results,
2391      * if any, of the given CompletableFutures are not reflected in
2392      * the returned CompletableFuture, but may be obtained by
2393      * inspecting them individually. If no CompletableFutures are
2394      * provided, returns a CompletableFuture completed with the value
2395      * {@code null}.
2396      *
2397      * <p>Among the applications of this method is to await completion
2398      * of a set of independent CompletableFutures before continuing a
2399      * program, as in: {@code CompletableFuture.allOf(c1, c2,
2400      * c3).join();}.
2401      *
2402      * @param cfs the CompletableFutures
2403      * @return a new CompletableFuture that is completed when all of the
2404      * given CompletableFutures complete
2405      * @throws NullPointerException if the array or any of its elements are
2406      * {@code null}
2407      */
allOf(CompletableFuture<?>.... cfs)2408     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2409         return andTree(cfs, 0, cfs.length - 1);
2410     }
2411 
2412     /**
2413      * Returns a new CompletableFuture that is completed when any of
2414      * the given CompletableFutures complete, with the same result.
2415      * Otherwise, if it completed exceptionally, the returned
2416      * CompletableFuture also does so, with a CompletionException
2417      * holding this exception as its cause.  If no CompletableFutures
2418      * are provided, returns an incomplete CompletableFuture.
2419      *
2420      * @param cfs the CompletableFutures
2421      * @return a new CompletableFuture that is completed with the
2422      * result or exception of any of the given CompletableFutures when
2423      * one completes
2424      * @throws NullPointerException if the array or any of its elements are
2425      * {@code null}
2426      */
anyOf(CompletableFuture<?>.... cfs)2427     public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2428         int n; Object r;
2429         if ((n = cfs.length) <= 1)
2430             return (n == 0)
2431                 ? new CompletableFuture<Object>()
2432                 : uniCopyStage(cfs[0]);
2433         for (CompletableFuture<?> cf : cfs)
2434             if ((r = cf.result) != null)
2435                 return new CompletableFuture<Object>(encodeRelay(r));
2436         cfs = cfs.clone();
2437         CompletableFuture<Object> d = new CompletableFuture<>();
2438         for (CompletableFuture<?> cf : cfs)
2439             cf.unipush(new AnyOf(d, cf, cfs));
2440         // If d was completed while we were adding completions, we should
2441         // clean the stack of any sources that may have had completions
2442         // pushed on their stack after d was completed.
2443         if (d.result != null)
2444             for (int i = 0, len = cfs.length; i < len; i++)
2445                 if (cfs[i].result != null)
2446                     for (i++; i < len; i++)
2447                         if (cfs[i].result == null)
2448                             cfs[i].cleanStack();
2449         return d;
2450     }
2451 
2452     /* ------------- Control and status methods -------------- */
2453 
2454     /**
2455      * If not already completed, completes this CompletableFuture with
2456      * a {@link CancellationException}. Dependent CompletableFutures
2457      * that have not already completed will also complete
2458      * exceptionally, with a {@link CompletionException} caused by
2459      * this {@code CancellationException}.
2460      *
2461      * @param mayInterruptIfRunning this value has no effect in this
2462      * implementation because interrupts are not used to control
2463      * processing.
2464      *
2465      * @return {@code true} if this task is now cancelled
2466      */
cancel(boolean mayInterruptIfRunning)2467     public boolean cancel(boolean mayInterruptIfRunning) {
2468         boolean cancelled = (result == null) &&
2469             internalComplete(new AltResult(new CancellationException()));
2470         postComplete();
2471         return cancelled || isCancelled();
2472     }
2473 
2474     /**
2475      * Returns {@code true} if this CompletableFuture was cancelled
2476      * before it completed normally.
2477      *
2478      * @return {@code true} if this CompletableFuture was cancelled
2479      * before it completed normally
2480      */
isCancelled()2481     public boolean isCancelled() {
2482         Object r;
2483         return ((r = result) instanceof AltResult) &&
2484             (((AltResult)r).ex instanceof CancellationException);
2485     }
2486 
2487     /**
2488      * Returns {@code true} if this CompletableFuture completed
2489      * exceptionally, in any way. Possible causes include
2490      * cancellation, explicit invocation of {@code
2491      * completeExceptionally}, and abrupt termination of a
2492      * CompletionStage action.
2493      *
2494      * @return {@code true} if this CompletableFuture completed
2495      * exceptionally
2496      */
isCompletedExceptionally()2497     public boolean isCompletedExceptionally() {
2498         Object r;
2499         return ((r = result) instanceof AltResult) && r != NIL;
2500     }
2501 
2502     /**
2503      * Forcibly sets or resets the value subsequently returned by
2504      * method {@link #get()} and related methods, whether or not
2505      * already completed. This method is designed for use only in
2506      * error recovery actions, and even in such situations may result
2507      * in ongoing dependent completions using established versus
2508      * overwritten outcomes.
2509      *
2510      * @param value the completion value
2511      */
obtrudeValue(T value)2512     public void obtrudeValue(T value) {
2513         result = (value == null) ? NIL : value;
2514         postComplete();
2515     }
2516 
2517     /**
2518      * Forcibly causes subsequent invocations of method {@link #get()}
2519      * and related methods to throw the given exception, whether or
2520      * not already completed. This method is designed for use only in
2521      * error recovery actions, and even in such situations may result
2522      * in ongoing dependent completions using established versus
2523      * overwritten outcomes.
2524      *
2525      * @param ex the exception
2526      * @throws NullPointerException if the exception is null
2527      */
obtrudeException(Throwable ex)2528     public void obtrudeException(Throwable ex) {
2529         if (ex == null) throw new NullPointerException();
2530         result = new AltResult(ex);
2531         postComplete();
2532     }
2533 
2534     /**
2535      * Returns the estimated number of CompletableFutures whose
2536      * completions are awaiting completion of this CompletableFuture.
2537      * This method is designed for use in monitoring system state, not
2538      * for synchronization control.
2539      *
2540      * @return the number of dependent CompletableFutures
2541      */
getNumberOfDependents()2542     public int getNumberOfDependents() {
2543         int count = 0;
2544         for (Completion p = stack; p != null; p = p.next)
2545             ++count;
2546         return count;
2547     }
2548 
2549     /**
2550      * Returns a string identifying this CompletableFuture, as well as
2551      * its completion state.  The state, in brackets, contains the
2552      * String {@code "Completed Normally"} or the String {@code
2553      * "Completed Exceptionally"}, or the String {@code "Not
2554      * completed"} followed by the number of CompletableFutures
2555      * dependent upon its completion, if any.
2556      *
2557      * @return a string identifying this CompletableFuture, as well as its state
2558      */
toString()2559     public String toString() {
2560         Object r = result;
2561         int count = 0; // avoid call to getNumberOfDependents in case disabled
2562         for (Completion p = stack; p != null; p = p.next)
2563             ++count;
2564         return super.toString() +
2565             ((r == null)
2566              ? ((count == 0)
2567                 ? "[Not completed]"
2568                 : "[Not completed, " + count + " dependents]")
2569              : (((r instanceof AltResult) && ((AltResult)r).ex != null)
2570                 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]"
2571                 : "[Completed normally]"));
2572     }
2573 
2574     // jdk9 additions
2575 
2576     /**
2577      * Returns a new incomplete CompletableFuture of the type to be
2578      * returned by a CompletionStage method. Subclasses should
2579      * normally override this method to return an instance of the same
2580      * class as this CompletableFuture. The default implementation
2581      * returns an instance of class CompletableFuture.
2582      *
2583      * @param <U> the type of the value
2584      * @return a new CompletableFuture
2585      * @since 9
2586      */
newIncompleteFuture()2587     public <U> CompletableFuture<U> newIncompleteFuture() {
2588         return new CompletableFuture<U>();
2589     }
2590 
2591     /**
2592      * Returns the default Executor used for async methods that do not
2593      * specify an Executor. This class uses the {@link
2594      * ForkJoinPool#commonPool()} if it supports more than one
2595      * parallel thread, or else an Executor using one thread per async
2596      * task.  This method may be overridden in subclasses to return
2597      * an Executor that provides at least one independent thread.
2598      *
2599      * @return the executor
2600      * @since 9
2601      */
defaultExecutor()2602     public Executor defaultExecutor() {
2603         return ASYNC_POOL;
2604     }
2605 
2606     /**
2607      * Returns a new CompletableFuture that is completed normally with
2608      * the same value as this CompletableFuture when it completes
2609      * normally. If this CompletableFuture completes exceptionally,
2610      * then the returned CompletableFuture completes exceptionally
2611      * with a CompletionException with this exception as cause. The
2612      * behavior is equivalent to {@code thenApply(x -> x)}. This
2613      * method may be useful as a form of "defensive copying", to
2614      * prevent clients from completing, while still being able to
2615      * arrange dependent actions.
2616      *
2617      * @return the new CompletableFuture
2618      * @since 9
2619      */
copy()2620     public CompletableFuture<T> copy() {
2621         return uniCopyStage(this);
2622     }
2623 
2624     /**
2625      * Returns a new CompletionStage that is completed normally with
2626      * the same value as this CompletableFuture when it completes
2627      * normally, and cannot be independently completed or otherwise
2628      * used in ways not defined by the methods of interface {@link
2629      * CompletionStage}.  If this CompletableFuture completes
2630      * exceptionally, then the returned CompletionStage completes
2631      * exceptionally with a CompletionException with this exception as
2632      * cause.
2633      *
2634      * <p>Unless overridden by a subclass, a new non-minimal
2635      * CompletableFuture with all methods available can be obtained from
2636      * a minimal CompletionStage via {@link #toCompletableFuture()}.
2637      * For example, completion of a minimal stage can be awaited by
2638      *
2639      * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
2640      *
2641      * @return the new CompletionStage
2642      * @since 9
2643      */
minimalCompletionStage()2644     public CompletionStage<T> minimalCompletionStage() {
2645         return uniAsMinimalStage();
2646     }
2647 
2648     /**
2649      * Completes this CompletableFuture with the result of
2650      * the given Supplier function invoked from an asynchronous
2651      * task using the given executor.
2652      *
2653      * @param supplier a function returning the value to be used
2654      * to complete this CompletableFuture
2655      * @param executor the executor to use for asynchronous execution
2656      * @return this CompletableFuture
2657      * @since 9
2658      */
completeAsync(Supplier<? extends T> supplier, Executor executor)2659     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
2660                                               Executor executor) {
2661         if (supplier == null || executor == null)
2662             throw new NullPointerException();
2663         executor.execute(new AsyncSupply<T>(this, supplier));
2664         return this;
2665     }
2666 
2667     /**
2668      * Completes this CompletableFuture with the result of the given
2669      * Supplier function invoked from an asynchronous task using the
2670      * default executor.
2671      *
2672      * @param supplier a function returning the value to be used
2673      * to complete this CompletableFuture
2674      * @return this CompletableFuture
2675      * @since 9
2676      */
completeAsync(Supplier<? extends T> supplier)2677     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
2678         return completeAsync(supplier, defaultExecutor());
2679     }
2680 
2681     /**
2682      * Exceptionally completes this CompletableFuture with
2683      * a {@link TimeoutException} if not otherwise completed
2684      * before the given timeout.
2685      *
2686      * @param timeout how long to wait before completing exceptionally
2687      *        with a TimeoutException, in units of {@code unit}
2688      * @param unit a {@code TimeUnit} determining how to interpret the
2689      *        {@code timeout} parameter
2690      * @return this CompletableFuture
2691      * @since 9
2692      */
orTimeout(long timeout, TimeUnit unit)2693     public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
2694         if (unit == null)
2695             throw new NullPointerException();
2696         if (result == null)
2697             whenComplete(new Canceller(Delayer.delay(new Timeout(this),
2698                                                      timeout, unit)));
2699         return this;
2700     }
2701 
2702     /**
2703      * Completes this CompletableFuture with the given value if not
2704      * otherwise completed before the given timeout.
2705      *
2706      * @param value the value to use upon timeout
2707      * @param timeout how long to wait before completing normally
2708      *        with the given value, in units of {@code unit}
2709      * @param unit a {@code TimeUnit} determining how to interpret the
2710      *        {@code timeout} parameter
2711      * @return this CompletableFuture
2712      * @since 9
2713      */
completeOnTimeout(T value, long timeout, TimeUnit unit)2714     public CompletableFuture<T> completeOnTimeout(T value, long timeout,
2715                                                   TimeUnit unit) {
2716         if (unit == null)
2717             throw new NullPointerException();
2718         if (result == null)
2719             whenComplete(new Canceller(Delayer.delay(
2720                                            new DelayedCompleter<T>(this, value),
2721                                            timeout, unit)));
2722         return this;
2723     }
2724 
2725     /**
2726      * Returns a new Executor that submits a task to the given base
2727      * executor after the given delay (or no delay if non-positive).
2728      * Each delay commences upon invocation of the returned executor's
2729      * {@code execute} method.
2730      *
2731      * @param delay how long to delay, in units of {@code unit}
2732      * @param unit a {@code TimeUnit} determining how to interpret the
2733      *        {@code delay} parameter
2734      * @param executor the base executor
2735      * @return the new delayed executor
2736      * @since 9
2737      */
delayedExecutor(long delay, TimeUnit unit, Executor executor)2738     public static Executor delayedExecutor(long delay, TimeUnit unit,
2739                                            Executor executor) {
2740         if (unit == null || executor == null)
2741             throw new NullPointerException();
2742         return new DelayedExecutor(delay, unit, executor);
2743     }
2744 
2745     /**
2746      * Returns a new Executor that submits a task to the default
2747      * executor after the given delay (or no delay if non-positive).
2748      * Each delay commences upon invocation of the returned executor's
2749      * {@code execute} method.
2750      *
2751      * @param delay how long to delay, in units of {@code unit}
2752      * @param unit a {@code TimeUnit} determining how to interpret the
2753      *        {@code delay} parameter
2754      * @return the new delayed executor
2755      * @since 9
2756      */
delayedExecutor(long delay, TimeUnit unit)2757     public static Executor delayedExecutor(long delay, TimeUnit unit) {
2758         if (unit == null)
2759             throw new NullPointerException();
2760         return new DelayedExecutor(delay, unit, ASYNC_POOL);
2761     }
2762 
2763     /**
2764      * Returns a new CompletionStage that is already completed with
2765      * the given value and supports only those methods in
2766      * interface {@link CompletionStage}.
2767      *
2768      * @param value the value
2769      * @param <U> the type of the value
2770      * @return the completed CompletionStage
2771      * @since 9
2772      */
completedStage(U value)2773     public static <U> CompletionStage<U> completedStage(U value) {
2774         return new MinimalStage<U>((value == null) ? NIL : value);
2775     }
2776 
2777     /**
2778      * Returns a new CompletableFuture that is already completed
2779      * exceptionally with the given exception.
2780      *
2781      * @param ex the exception
2782      * @param <U> the type of the value
2783      * @return the exceptionally completed CompletableFuture
2784      * @since 9
2785      */
failedFuture(Throwable ex)2786     public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
2787         if (ex == null) throw new NullPointerException();
2788         return new CompletableFuture<U>(new AltResult(ex));
2789     }
2790 
2791     /**
2792      * Returns a new CompletionStage that is already completed
2793      * exceptionally with the given exception and supports only those
2794      * methods in interface {@link CompletionStage}.
2795      *
2796      * @param ex the exception
2797      * @param <U> the type of the value
2798      * @return the exceptionally completed CompletionStage
2799      * @since 9
2800      */
failedStage(Throwable ex)2801     public static <U> CompletionStage<U> failedStage(Throwable ex) {
2802         if (ex == null) throw new NullPointerException();
2803         return new MinimalStage<U>(new AltResult(ex));
2804     }
2805 
2806     /**
2807      * Singleton delay scheduler, used only for starting and
2808      * cancelling tasks.
2809      */
2810     static final class Delayer {
delay(Runnable command, long delay, TimeUnit unit)2811         static ScheduledFuture<?> delay(Runnable command, long delay,
2812                                         TimeUnit unit) {
2813             return delayer.schedule(command, delay, unit);
2814         }
2815 
2816         static final class DaemonThreadFactory implements ThreadFactory {
newThread(Runnable r)2817             public Thread newThread(Runnable r) {
2818                 Thread t = new Thread(r);
2819                 t.setDaemon(true);
2820                 t.setName("CompletableFutureDelayScheduler");
2821                 return t;
2822             }
2823         }
2824 
2825         static final ScheduledThreadPoolExecutor delayer;
2826         static {
2827             (delayer = new ScheduledThreadPoolExecutor(
2828                 1, new DaemonThreadFactory())).
2829                 setRemoveOnCancelPolicy(true);
2830         }
2831     }
2832 
2833     // Little class-ified lambdas to better support monitoring
2834 
2835     static final class DelayedExecutor implements Executor {
2836         final long delay;
2837         final TimeUnit unit;
2838         final Executor executor;
DelayedExecutor(long delay, TimeUnit unit, Executor executor)2839         DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
2840             this.delay = delay; this.unit = unit; this.executor = executor;
2841         }
execute(Runnable r)2842         public void execute(Runnable r) {
2843             Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
2844         }
2845     }
2846 
2847     /** Action to submit user task */
2848     static final class TaskSubmitter implements Runnable {
2849         final Executor executor;
2850         final Runnable action;
TaskSubmitter(Executor executor, Runnable action)2851         TaskSubmitter(Executor executor, Runnable action) {
2852             this.executor = executor;
2853             this.action = action;
2854         }
run()2855         public void run() { executor.execute(action); }
2856     }
2857 
2858     /** Action to completeExceptionally on timeout */
2859     static final class Timeout implements Runnable {
2860         final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f)2861         Timeout(CompletableFuture<?> f) { this.f = f; }
run()2862         public void run() {
2863             if (f != null && !f.isDone())
2864                 f.completeExceptionally(new TimeoutException());
2865         }
2866     }
2867 
2868     /** Action to complete on timeout */
2869     static final class DelayedCompleter<U> implements Runnable {
2870         final CompletableFuture<U> f;
2871         final U u;
DelayedCompleter(CompletableFuture<U> f, U u)2872         DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
run()2873         public void run() {
2874             if (f != null)
2875                 f.complete(u);
2876         }
2877     }
2878 
2879     /** Action to cancel unneeded timeouts */
2880     static final class Canceller implements BiConsumer<Object, Throwable> {
2881         final Future<?> f;
Canceller(Future<?> f)2882         Canceller(Future<?> f) { this.f = f; }
accept(Object ignore, Throwable ex)2883         public void accept(Object ignore, Throwable ex) {
2884             if (ex == null && f != null && !f.isDone())
2885                 f.cancel(false);
2886         }
2887     }
2888 
2889     /**
2890      * A subclass that just throws UOE for most non-CompletionStage methods.
2891      */
2892     static final class MinimalStage<T> extends CompletableFuture<T> {
MinimalStage()2893         MinimalStage() { }
MinimalStage(Object r)2894         MinimalStage(Object r) { super(r); }
newIncompleteFuture()2895         @Override public <U> CompletableFuture<U> newIncompleteFuture() {
2896             return new MinimalStage<U>(); }
get()2897         @Override public T get() {
2898             throw new UnsupportedOperationException(); }
get(long timeout, TimeUnit unit)2899         @Override public T get(long timeout, TimeUnit unit) {
2900             throw new UnsupportedOperationException(); }
getNow(T valueIfAbsent)2901         @Override public T getNow(T valueIfAbsent) {
2902             throw new UnsupportedOperationException(); }
join()2903         @Override public T join() {
2904             throw new UnsupportedOperationException(); }
complete(T value)2905         @Override public boolean complete(T value) {
2906             throw new UnsupportedOperationException(); }
completeExceptionally(Throwable ex)2907         @Override public boolean completeExceptionally(Throwable ex) {
2908             throw new UnsupportedOperationException(); }
cancel(boolean mayInterruptIfRunning)2909         @Override public boolean cancel(boolean mayInterruptIfRunning) {
2910             throw new UnsupportedOperationException(); }
obtrudeValue(T value)2911         @Override public void obtrudeValue(T value) {
2912             throw new UnsupportedOperationException(); }
obtrudeException(Throwable ex)2913         @Override public void obtrudeException(Throwable ex) {
2914             throw new UnsupportedOperationException(); }
isDone()2915         @Override public boolean isDone() {
2916             throw new UnsupportedOperationException(); }
isCancelled()2917         @Override public boolean isCancelled() {
2918             throw new UnsupportedOperationException(); }
isCompletedExceptionally()2919         @Override public boolean isCompletedExceptionally() {
2920             throw new UnsupportedOperationException(); }
getNumberOfDependents()2921         @Override public int getNumberOfDependents() {
2922             throw new UnsupportedOperationException(); }
completeAsync(Supplier<? extends T> supplier, Executor executor)2923         @Override public CompletableFuture<T> completeAsync
2924             (Supplier<? extends T> supplier, Executor executor) {
2925             throw new UnsupportedOperationException(); }
completeAsync(Supplier<? extends T> supplier)2926         @Override public CompletableFuture<T> completeAsync
2927             (Supplier<? extends T> supplier) {
2928             throw new UnsupportedOperationException(); }
orTimeout(long timeout, TimeUnit unit)2929         @Override public CompletableFuture<T> orTimeout
2930             (long timeout, TimeUnit unit) {
2931             throw new UnsupportedOperationException(); }
completeOnTimeout(T value, long timeout, TimeUnit unit)2932         @Override public CompletableFuture<T> completeOnTimeout
2933             (T value, long timeout, TimeUnit unit) {
2934             throw new UnsupportedOperationException(); }
toCompletableFuture()2935         @Override public CompletableFuture<T> toCompletableFuture() {
2936             Object r;
2937             if ((r = result) != null)
2938                 return new CompletableFuture<T>(encodeRelay(r));
2939             else {
2940                 CompletableFuture<T> d = new CompletableFuture<>();
2941                 unipush(new UniRelay<T,T>(d, this));
2942                 return d;
2943             }
2944         }
2945     }
2946 
2947     // VarHandle mechanics
2948     private static final VarHandle RESULT;
2949     private static final VarHandle STACK;
2950     private static final VarHandle NEXT;
2951     static {
2952         try {
2953             MethodHandles.Lookup l = MethodHandles.lookup();
2954             RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class);
2955             STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class);
2956             NEXT = l.findVarHandle(Completion.class, "next", Completion.class);
2957         } catch (ReflectiveOperationException e) {
2958             throw new ExceptionInInitializerError(e);
2959         }
2960 
2961         // Reduce the risk of rare disastrous classloading in first call to
2962         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
2963         Class<?> ensureLoaded = LockSupport.class;
2964     }
2965 }
2966