1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.concurrent.locks.LockSupport;
41 import java.util.function.BiConsumer;
42 import java.util.function.BiFunction;
43 import java.util.function.Consumer;
44 import java.util.function.Function;
45 import java.util.function.Supplier;
46 
47 /**
48  * A {@link Future} that may be explicitly completed (setting its
49  * value and status), and may be used as a {@link CompletionStage},
50  * supporting dependent functions and actions that trigger upon its
51  * completion.
52  *
53  * <p>When two or more threads attempt to
54  * {@link #complete complete},
55  * {@link #completeExceptionally completeExceptionally}, or
56  * {@link #cancel cancel}
57  * a CompletableFuture, only one of them succeeds.
58  *
59  * <p>In addition to these and related methods for directly
60  * manipulating status and results, CompletableFuture implements
61  * interface {@link CompletionStage} with the following policies: <ul>
62  *
63  * <li>Actions supplied for dependent completions of
64  * <em>non-async</em> methods may be performed by the thread that
65  * completes the current CompletableFuture, or by any other caller of
66  * a completion method.
67  *
68  * <li>All <em>async</em> methods without an explicit Executor
69  * argument are performed using the {@link ForkJoinPool#commonPool()}
70  * (unless it does not support a parallelism level of at least two, in
71  * which case, a new Thread is created to run each task).  This may be
72  * overridden for non-static methods in subclasses by defining method
73  * {@link #defaultExecutor()}. To simplify monitoring, debugging,
74  * and tracking, all generated asynchronous tasks are instances of the
75  * marker interface {@link AsynchronousCompletionTask}.  Operations
76  * with time-delays can use adapter methods defined in this class, for
77  * example: {@code supplyAsync(supplier, delayedExecutor(timeout,
78  * timeUnit))}.  To support methods with delays and timeouts, this
79  * class maintains at most one daemon thread for triggering and
80  * cancelling actions, not for running them.
81  *
82  * <li>All CompletionStage methods are implemented independently of
83  * other public methods, so the behavior of one method is not impacted
84  * by overrides of others in subclasses.
85  *
86  * <li>All CompletionStage methods return CompletableFutures.  To
87  * restrict usages to only those methods defined in interface
88  * CompletionStage, use method {@link #minimalCompletionStage}. Or to
89  * ensure only that clients do not themselves modify a future, use
90  * method {@link #copy}.
91  * </ul>
92  *
93  * <p>CompletableFuture also implements {@link Future} with the following
94  * policies: <ul>
95  *
96  * <li>Since (unlike {@link FutureTask}) this class has no direct
97  * control over the computation that causes it to be completed,
98  * cancellation is treated as just another form of exceptional
99  * completion.  Method {@link #cancel cancel} has the same effect as
100  * {@code completeExceptionally(new CancellationException())}. Method
101  * {@link #isCompletedExceptionally} can be used to determine if a
102  * CompletableFuture completed in any exceptional fashion.
103  *
104  * <li>In case of exceptional completion with a CompletionException,
105  * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
106  * {@link ExecutionException} with the same cause as held in the
107  * corresponding CompletionException.  To simplify usage in most
108  * contexts, this class also defines methods {@link #join()} and
109  * {@link #getNow} that instead throw the CompletionException directly
110  * in these cases.
111  * </ul>
112  *
113  * <p>Arguments used to pass a completion result (that is, for
114  * parameters of type {@code T}) for methods accepting them may be
115  * null, but passing a null value for any other parameter will result
116  * in a {@link NullPointerException} being thrown.
117  *
118  * <p>Subclasses of this class should normally override the "virtual
119  * constructor" method {@link #newIncompleteFuture}, which establishes
120  * the concrete type returned by CompletionStage methods. For example,
121  * here is a class that substitutes a different default Executor and
122  * disables the {@code obtrude} methods:
123  *
124  * <pre> {@code
125  * class MyCompletableFuture<T> extends CompletableFuture<T> {
126  *   static final Executor myExecutor = ...;
127  *   public MyCompletableFuture() { }
128  *   public <U> CompletableFuture<U> newIncompleteFuture() {
129  *     return new MyCompletableFuture<U>(); }
130  *   public Executor defaultExecutor() {
131  *     return myExecutor; }
132  *   public void obtrudeValue(T value) {
133  *     throw new UnsupportedOperationException(); }
134  *   public void obtrudeException(Throwable ex) {
135  *     throw new UnsupportedOperationException(); }
136  * }}</pre>
137  *
138  * @author Doug Lea
139  * @param <T> The result type returned by this future's {@code join}
140  * and {@code get} methods
141  * @since 1.8
142  */
143 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
144 
145     /*
146      * Overview:
147      *
148      * A CompletableFuture may have dependent completion actions,
149      * collected in a linked stack. It atomically completes by CASing
150      * a result field, and then pops off and runs those actions. This
151      * applies across normal vs exceptional outcomes, sync vs async
152      * actions, binary triggers, and various forms of completions.
153      *
154      * Non-nullness of volatile field "result" indicates done.  It may
155      * be set directly if known to be thread-confined, else via CAS.
156      * An AltResult is used to box null as a result, as well as to
157      * hold exceptions.  Using a single field makes completion simple
158      * to detect and trigger.  Result encoding and decoding is
159      * straightforward but tedious and adds to the sprawl of trapping
160      * and associating exceptions with targets.  Minor simplifications
161      * rely on (static) NIL (to box null results) being the only
162      * AltResult with a null exception field, so we don't usually need
163      * explicit comparisons.  Even though some of the generics casts
164      * are unchecked (see SuppressWarnings annotations), they are
165      * placed to be appropriate even if checked.
166      *
167      * Dependent actions are represented by Completion objects linked
168      * as Treiber stacks headed by field "stack". There are Completion
169      * classes for each kind of action, grouped into:
170      * - single-input (UniCompletion),
171      * - two-input (BiCompletion),
172      * - projected (BiCompletions using exactly one of two inputs),
173      * - shared (CoCompletion, used by the second of two sources),
174      * - zero-input source actions,
175      * - Signallers that unblock waiters.
176      * Class Completion extends ForkJoinTask to enable async execution
177      * (adding no space overhead because we exploit its "tag" methods
178      * to maintain claims). It is also declared as Runnable to allow
179      * usage with arbitrary executors.
180      *
181      * Support for each kind of CompletionStage relies on a separate
182      * class, along with two CompletableFuture methods:
183      *
184      * * A Completion class with name X corresponding to function,
185      *   prefaced with "Uni", "Bi", or "Or". Each class contains
186      *   fields for source(s), actions, and dependent. They are
187      *   boringly similar, differing from others only with respect to
188      *   underlying functional forms. We do this so that users don't
189      *   encounter layers of adapters in common usages.
190      *
191      * * Boolean CompletableFuture method x(...) (for example
192      *   biApply) takes all of the arguments needed to check that an
193      *   action is triggerable, and then either runs the action or
194      *   arranges its async execution by executing its Completion
195      *   argument, if present. The method returns true if known to be
196      *   complete.
197      *
198      * * Completion method tryFire(int mode) invokes the associated x
199      *   method with its held arguments, and on success cleans up.
200      *   The mode argument allows tryFire to be called twice (SYNC,
201      *   then ASYNC); the first to screen and trap exceptions while
202      *   arranging to execute, and the second when called from a task.
203      *   (A few classes are not used async so take slightly different
204      *   forms.)  The claim() callback suppresses function invocation
205      *   if already claimed by another thread.
206      *
207      * * Some classes (for example UniApply) have separate handling
208      *   code for when known to be thread-confined ("now" methods) and
209      *   for when shared (in tryFire), for efficiency.
210      *
211      * * CompletableFuture method xStage(...) is called from a public
212      *   stage method of CompletableFuture f. It screens user
213      *   arguments and invokes and/or creates the stage object.  If
214      *   not async and already triggerable, the action is run
215      *   immediately.  Otherwise a Completion c is created, and
216      *   submitted to the executor if triggerable, or pushed onto f's
217      *   stack if not.  Completion actions are started via c.tryFire.
218      *   We recheck after pushing to a source future's stack to cover
219      *   possible races if the source completes while pushing.
220      *   Classes with two inputs (for example BiApply) deal with races
221      *   across both while pushing actions.  The second completion is
222      *   a CoCompletion pointing to the first, shared so that at most
223      *   one performs the action.  The multiple-arity methods allOf
224      *   does this pairwise to form trees of completions.  Method
225      *   anyOf is handled differently from allOf because completion of
226      *   any source should trigger a cleanStack of other sources.
227      *   Each AnyOf completion can reach others via a shared array.
228      *
229      * Note that the generic type parameters of methods vary according
230      * to whether "this" is a source, dependent, or completion.
231      *
232      * Method postComplete is called upon completion unless the target
233      * is guaranteed not to be observable (i.e., not yet returned or
234      * linked). Multiple threads can call postComplete, which
235      * atomically pops each dependent action, and tries to trigger it
236      * via method tryFire, in NESTED mode.  Triggering can propagate
237      * recursively, so NESTED mode returns its completed dependent (if
238      * one exists) for further processing by its caller (see method
239      * postFire).
240      *
241      * Blocking methods get() and join() rely on Signaller Completions
242      * that wake up waiting threads.  The mechanics are similar to
243      * Treiber stack wait-nodes used in FutureTask, Phaser, and
244      * SynchronousQueue. See their internal documentation for
245      * algorithmic details.
246      *
247      * Without precautions, CompletableFutures would be prone to
248      * garbage accumulation as chains of Completions build up, each
249      * pointing back to its sources. So we null out fields as soon as
250      * possible.  The screening checks needed anyway harmlessly ignore
251      * null arguments that may have been obtained during races with
252      * threads nulling out fields.  We also try to unlink non-isLive
253      * (fired or cancelled) Completions from stacks that might
254      * otherwise never be popped: Method cleanStack always unlinks non
255      * isLive completions from the head of stack; others may
256      * occasionally remain if racing with other cancellations or
257      * removals.
258      *
259      * Completion fields need not be declared as final or volatile
260      * because they are only visible to other threads upon safe
261      * publication.
262      */
263 
264     volatile Object result;       // Either the result or boxed AltResult
265     volatile Completion stack;    // Top of Treiber stack of dependent actions
266 
internalComplete(Object r)267     final boolean internalComplete(Object r) { // CAS from null to r
268         return RESULT.compareAndSet(this, null, r);
269     }
270 
271     /** Returns true if successfully pushed c onto stack. */
tryPushStack(Completion c)272     final boolean tryPushStack(Completion c) {
273         Completion h = stack;
274         NEXT.set(c, h);         // CAS piggyback
275         return STACK.compareAndSet(this, h, c);
276     }
277 
278     /** Unconditionally pushes c onto stack, retrying if necessary. */
pushStack(Completion c)279     final void pushStack(Completion c) {
280         do {} while (!tryPushStack(c));
281     }
282 
283     /* ------------- Encoding and decoding outcomes -------------- */
284 
285     static final class AltResult { // See above
286         final Throwable ex;        // null only for NIL
AltResult(Throwable x)287         AltResult(Throwable x) { this.ex = x; }
288     }
289 
290     /** The encoding of the null value. */
291     static final AltResult NIL = new AltResult(null);
292 
293     /** Completes with the null value, unless already completed. */
completeNull()294     final boolean completeNull() {
295         return RESULT.compareAndSet(this, null, NIL);
296     }
297 
298     /** Returns the encoding of the given non-exceptional value. */
encodeValue(T t)299     final Object encodeValue(T t) {
300         return (t == null) ? NIL : t;
301     }
302 
303     /** Completes with a non-exceptional result, unless already completed. */
completeValue(T t)304     final boolean completeValue(T t) {
305         return RESULT.compareAndSet(this, null, (t == null) ? NIL : t);
306     }
307 
308     /**
309      * Returns the encoding of the given (non-null) exception as a
310      * wrapped CompletionException unless it is one already.
311      */
encodeThrowable(Throwable x)312     static AltResult encodeThrowable(Throwable x) {
313         return new AltResult((x instanceof CompletionException) ? x :
314                              new CompletionException(x));
315     }
316 
317     /** Completes with an exceptional result, unless already completed. */
completeThrowable(Throwable x)318     final boolean completeThrowable(Throwable x) {
319         return RESULT.compareAndSet(this, null, encodeThrowable(x));
320     }
321 
322     /**
323      * Returns the encoding of the given (non-null) exception as a
324      * wrapped CompletionException unless it is one already.  May
325      * return the given Object r (which must have been the result of a
326      * source future) if it is equivalent, i.e. if this is a simple
327      * relay of an existing CompletionException.
328      */
encodeThrowable(Throwable x, Object r)329     static Object encodeThrowable(Throwable x, Object r) {
330         if (!(x instanceof CompletionException))
331             x = new CompletionException(x);
332         else if (r instanceof AltResult && x == ((AltResult)r).ex)
333             return r;
334         return new AltResult(x);
335     }
336 
337     /**
338      * Completes with the given (non-null) exceptional result as a
339      * wrapped CompletionException unless it is one already, unless
340      * already completed.  May complete with the given Object r
341      * (which must have been the result of a source future) if it is
342      * equivalent, i.e. if this is a simple propagation of an
343      * existing CompletionException.
344      */
completeThrowable(Throwable x, Object r)345     final boolean completeThrowable(Throwable x, Object r) {
346         return RESULT.compareAndSet(this, null, encodeThrowable(x, r));
347     }
348 
349     /**
350      * Returns the encoding of the given arguments: if the exception
351      * is non-null, encodes as AltResult.  Otherwise uses the given
352      * value, boxed as NIL if null.
353      */
encodeOutcome(T t, Throwable x)354     Object encodeOutcome(T t, Throwable x) {
355         return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x);
356     }
357 
358     /**
359      * Returns the encoding of a copied outcome; if exceptional,
360      * rewraps as a CompletionException, else returns argument.
361      */
encodeRelay(Object r)362     static Object encodeRelay(Object r) {
363         Throwable x;
364         if (r instanceof AltResult
365             && (x = ((AltResult)r).ex) != null
366             && !(x instanceof CompletionException))
367             r = new AltResult(new CompletionException(x));
368         return r;
369     }
370 
371     /**
372      * Completes with r or a copy of r, unless already completed.
373      * If exceptional, r is first coerced to a CompletionException.
374      */
completeRelay(Object r)375     final boolean completeRelay(Object r) {
376         return RESULT.compareAndSet(this, null, encodeRelay(r));
377     }
378 
379     /**
380      * Reports result using Future.get conventions.
381      */
reportGet(Object r)382     private static Object reportGet(Object r)
383         throws InterruptedException, ExecutionException {
384         if (r == null) // by convention below, null means interrupted
385             throw new InterruptedException();
386         if (r instanceof AltResult) {
387             Throwable x, cause;
388             if ((x = ((AltResult)r).ex) == null)
389                 return null;
390             if (x instanceof CancellationException)
391                 throw (CancellationException)x;
392             if ((x instanceof CompletionException) &&
393                 (cause = x.getCause()) != null)
394                 x = cause;
395             throw new ExecutionException(x);
396         }
397         return r;
398     }
399 
400     /**
401      * Decodes outcome to return result or throw unchecked exception.
402      */
reportJoin(Object r)403     private static Object reportJoin(Object r) {
404         if (r instanceof AltResult) {
405             Throwable x;
406             if ((x = ((AltResult)r).ex) == null)
407                 return null;
408             if (x instanceof CancellationException)
409                 throw (CancellationException)x;
410             if (x instanceof CompletionException)
411                 throw (CompletionException)x;
412             throw new CompletionException(x);
413         }
414         return r;
415     }
416 
417     /* ------------- Async task preliminaries -------------- */
418 
419     /**
420      * A marker interface identifying asynchronous tasks produced by
421      * {@code async} methods. This may be useful for monitoring,
422      * debugging, and tracking asynchronous activities.
423      *
424      * @since 1.8
425      */
426     public static interface AsynchronousCompletionTask {
427     }
428 
429     private static final boolean USE_COMMON_POOL =
430         (ForkJoinPool.getCommonPoolParallelism() > 1);
431 
432     /**
433      * Default executor -- ForkJoinPool.commonPool() unless it cannot
434      * support parallelism.
435      */
436     private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
437         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
438 
439     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
440     static final class ThreadPerTaskExecutor implements Executor {
execute(Runnable r)441         public void execute(Runnable r) { new Thread(r).start(); }
442     }
443 
444     /**
445      * Null-checks user executor argument, and translates uses of
446      * commonPool to ASYNC_POOL in case parallelism disabled.
447      */
screenExecutor(Executor e)448     static Executor screenExecutor(Executor e) {
449         if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool())
450             return ASYNC_POOL;
451         if (e == null) throw new NullPointerException();
452         return e;
453     }
454 
455     // Modes for Completion.tryFire. Signedness matters.
456     static final int SYNC   =  0;
457     static final int ASYNC  =  1;
458     static final int NESTED = -1;
459 
460     /* ------------- Base Completion classes and operations -------------- */
461 
462     @SuppressWarnings("serial")
463     abstract static class Completion extends ForkJoinTask<Void>
464         implements Runnable, AsynchronousCompletionTask {
465         volatile Completion next;      // Treiber stack link
466 
467         /**
468          * Performs completion action if triggered, returning a
469          * dependent that may need propagation, if one exists.
470          *
471          * @param mode SYNC, ASYNC, or NESTED
472          */
tryFire(int mode)473         abstract CompletableFuture<?> tryFire(int mode);
474 
475         /** Returns true if possibly still triggerable. Used by cleanStack. */
isLive()476         abstract boolean isLive();
477 
run()478         public final void run()                { tryFire(ASYNC); }
exec()479         public final boolean exec()            { tryFire(ASYNC); return false; }
getRawResult()480         public final Void getRawResult()       { return null; }
setRawResult(Void v)481         public final void setRawResult(Void v) {}
482     }
483 
484     /**
485      * Pops and tries to trigger all reachable dependents.  Call only
486      * when known to be done.
487      */
postComplete()488     final void postComplete() {
489         /*
490          * On each step, variable f holds current dependents to pop
491          * and run.  It is extended along only one path at a time,
492          * pushing others to avoid unbounded recursion.
493          */
494         CompletableFuture<?> f = this; Completion h;
495         while ((h = f.stack) != null ||
496                (f != this && (h = (f = this).stack) != null)) {
497             CompletableFuture<?> d; Completion t;
498             if (STACK.compareAndSet(f, h, t = h.next)) {
499                 if (t != null) {
500                     if (f != this) {
501                         pushStack(h);
502                         continue;
503                     }
504                     NEXT.compareAndSet(h, t, null); // try to detach
505                 }
506                 f = (d = h.tryFire(NESTED)) == null ? this : d;
507             }
508         }
509     }
510 
511     /** Traverses stack and unlinks one or more dead Completions, if found. */
cleanStack()512     final void cleanStack() {
513         Completion p = stack;
514         // ensure head of stack live
515         for (boolean unlinked = false;;) {
516             if (p == null)
517                 return;
518             else if (p.isLive()) {
519                 if (unlinked)
520                     return;
521                 else
522                     break;
523             }
524             else if (STACK.weakCompareAndSet(this, p, (p = p.next)))
525                 unlinked = true;
526             else
527                 p = stack;
528         }
529         // try to unlink first non-live
530         for (Completion q = p.next; q != null;) {
531             Completion s = q.next;
532             if (q.isLive()) {
533                 p = q;
534                 q = s;
535             } else if (NEXT.weakCompareAndSet(p, q, s))
536                 break;
537             else
538                 q = p.next;
539         }
540     }
541 
542     /* ------------- One-input Completions -------------- */
543 
544     /** A Completion with a source, dependent, and executor. */
545     @SuppressWarnings("serial")
546     abstract static class UniCompletion<T,V> extends Completion {
547         Executor executor;                 // executor to use (null if none)
548         CompletableFuture<V> dep;          // the dependent to complete
549         CompletableFuture<T> src;          // source for action
550 
UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)551         UniCompletion(Executor executor, CompletableFuture<V> dep,
552                       CompletableFuture<T> src) {
553             this.executor = executor; this.dep = dep; this.src = src;
554         }
555 
556         /**
557          * Returns true if action can be run. Call only when known to
558          * be triggerable. Uses FJ tag bit to ensure that only one
559          * thread claims ownership.  If async, starts as task -- a
560          * later call to tryFire will run action.
561          */
claim()562         final boolean claim() {
563             Executor e = executor;
564             if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
565                 if (e == null)
566                     return true;
567                 executor = null; // disable
568                 e.execute(this);
569             }
570             return false;
571         }
572 
isLive()573         final boolean isLive() { return dep != null; }
574     }
575 
576     /**
577      * Pushes the given completion unless it completes while trying.
578      * Caller should first check that result is null.
579      */
unipush(Completion c)580     final void unipush(Completion c) {
581         if (c != null) {
582             while (!tryPushStack(c)) {
583                 if (result != null) {
584                     NEXT.set(c, null);
585                     break;
586                 }
587             }
588             if (result != null)
589                 c.tryFire(SYNC);
590         }
591     }
592 
593     /**
594      * Post-processing by dependent after successful UniCompletion tryFire.
595      * Tries to clean stack of source a, and then either runs postComplete
596      * or returns this to caller, depending on mode.
597      */
postFire(CompletableFuture<?> a, int mode)598     final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
599         if (a != null && a.stack != null) {
600             Object r;
601             if ((r = a.result) == null)
602                 a.cleanStack();
603             if (mode >= 0 && (r != null || a.result != null))
604                 a.postComplete();
605         }
606         if (result != null && stack != null) {
607             if (mode < 0)
608                 return this;
609             else
610                 postComplete();
611         }
612         return null;
613     }
614 
615     @SuppressWarnings("serial")
616     static final class UniApply<T,V> extends UniCompletion<T,V> {
617         Function<? super T,? extends V> fn;
UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)618         UniApply(Executor executor, CompletableFuture<V> dep,
619                  CompletableFuture<T> src,
620                  Function<? super T,? extends V> fn) {
621             super(executor, dep, src); this.fn = fn;
622         }
tryFire(int mode)623         final CompletableFuture<V> tryFire(int mode) {
624             CompletableFuture<V> d; CompletableFuture<T> a;
625             Object r; Throwable x; Function<? super T,? extends V> f;
626             if ((d = dep) == null || (f = fn) == null
627                 || (a = src) == null || (r = a.result) == null)
628                 return null;
629             tryComplete: if (d.result == null) {
630                 if (r instanceof AltResult) {
631                     if ((x = ((AltResult)r).ex) != null) {
632                         d.completeThrowable(x, r);
633                         break tryComplete;
634                     }
635                     r = null;
636                 }
637                 try {
638                     if (mode <= 0 && !claim())
639                         return null;
640                     else {
641                         @SuppressWarnings("unchecked") T t = (T) r;
642                         d.completeValue(f.apply(t));
643                     }
644                 } catch (Throwable ex) {
645                     d.completeThrowable(ex);
646                 }
647             }
648             dep = null; src = null; fn = null;
649             return d.postFire(a, mode);
650         }
651     }
652 
uniApplyStage( Executor e, Function<? super T,? extends V> f)653     private <V> CompletableFuture<V> uniApplyStage(
654         Executor e, Function<? super T,? extends V> f) {
655         if (f == null) throw new NullPointerException();
656         Object r;
657         if ((r = result) != null)
658             return uniApplyNow(r, e, f);
659         CompletableFuture<V> d = newIncompleteFuture();
660         unipush(new UniApply<T,V>(e, d, this, f));
661         return d;
662     }
663 
uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)664     private <V> CompletableFuture<V> uniApplyNow(
665         Object r, Executor e, Function<? super T,? extends V> f) {
666         Throwable x;
667         CompletableFuture<V> d = newIncompleteFuture();
668         if (r instanceof AltResult) {
669             if ((x = ((AltResult)r).ex) != null) {
670                 d.result = encodeThrowable(x, r);
671                 return d;
672             }
673             r = null;
674         }
675         try {
676             if (e != null) {
677                 e.execute(new UniApply<T,V>(null, d, this, f));
678             } else {
679                 @SuppressWarnings("unchecked") T t = (T) r;
680                 d.result = d.encodeValue(f.apply(t));
681             }
682         } catch (Throwable ex) {
683             d.result = encodeThrowable(ex);
684         }
685         return d;
686     }
687 
688     @SuppressWarnings("serial")
689     static final class UniAccept<T> extends UniCompletion<T,Void> {
690         Consumer<? super T> fn;
UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)691         UniAccept(Executor executor, CompletableFuture<Void> dep,
692                   CompletableFuture<T> src, Consumer<? super T> fn) {
693             super(executor, dep, src); this.fn = fn;
694         }
tryFire(int mode)695         final CompletableFuture<Void> tryFire(int mode) {
696             CompletableFuture<Void> d; CompletableFuture<T> a;
697             Object r; Throwable x; Consumer<? super T> f;
698             if ((d = dep) == null || (f = fn) == null
699                 || (a = src) == null || (r = a.result) == null)
700                 return null;
701             tryComplete: if (d.result == null) {
702                 if (r instanceof AltResult) {
703                     if ((x = ((AltResult)r).ex) != null) {
704                         d.completeThrowable(x, r);
705                         break tryComplete;
706                     }
707                     r = null;
708                 }
709                 try {
710                     if (mode <= 0 && !claim())
711                         return null;
712                     else {
713                         @SuppressWarnings("unchecked") T t = (T) r;
714                         f.accept(t);
715                         d.completeNull();
716                     }
717                 } catch (Throwable ex) {
718                     d.completeThrowable(ex);
719                 }
720             }
721             dep = null; src = null; fn = null;
722             return d.postFire(a, mode);
723         }
724     }
725 
uniAcceptStage(Executor e, Consumer<? super T> f)726     private CompletableFuture<Void> uniAcceptStage(Executor e,
727                                                    Consumer<? super T> f) {
728         if (f == null) throw new NullPointerException();
729         Object r;
730         if ((r = result) != null)
731             return uniAcceptNow(r, e, f);
732         CompletableFuture<Void> d = newIncompleteFuture();
733         unipush(new UniAccept<T>(e, d, this, f));
734         return d;
735     }
736 
uniAcceptNow( Object r, Executor e, Consumer<? super T> f)737     private CompletableFuture<Void> uniAcceptNow(
738         Object r, Executor e, Consumer<? super T> f) {
739         Throwable x;
740         CompletableFuture<Void> d = newIncompleteFuture();
741         if (r instanceof AltResult) {
742             if ((x = ((AltResult)r).ex) != null) {
743                 d.result = encodeThrowable(x, r);
744                 return d;
745             }
746             r = null;
747         }
748         try {
749             if (e != null) {
750                 e.execute(new UniAccept<T>(null, d, this, f));
751             } else {
752                 @SuppressWarnings("unchecked") T t = (T) r;
753                 f.accept(t);
754                 d.result = NIL;
755             }
756         } catch (Throwable ex) {
757             d.result = encodeThrowable(ex);
758         }
759         return d;
760     }
761 
762     @SuppressWarnings("serial")
763     static final class UniRun<T> extends UniCompletion<T,Void> {
764         Runnable fn;
UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)765         UniRun(Executor executor, CompletableFuture<Void> dep,
766                CompletableFuture<T> src, Runnable fn) {
767             super(executor, dep, src); this.fn = fn;
768         }
tryFire(int mode)769         final CompletableFuture<Void> tryFire(int mode) {
770             CompletableFuture<Void> d; CompletableFuture<T> a;
771             Object r; Throwable x; Runnable f;
772             if ((d = dep) == null || (f = fn) == null
773                 || (a = src) == null || (r = a.result) == null)
774                 return null;
775             if (d.result == null) {
776                 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
777                     d.completeThrowable(x, r);
778                 else
779                     try {
780                         if (mode <= 0 && !claim())
781                             return null;
782                         else {
783                             f.run();
784                             d.completeNull();
785                         }
786                     } catch (Throwable ex) {
787                         d.completeThrowable(ex);
788                     }
789             }
790             dep = null; src = null; fn = null;
791             return d.postFire(a, mode);
792         }
793     }
794 
uniRunStage(Executor e, Runnable f)795     private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
796         if (f == null) throw new NullPointerException();
797         Object r;
798         if ((r = result) != null)
799             return uniRunNow(r, e, f);
800         CompletableFuture<Void> d = newIncompleteFuture();
801         unipush(new UniRun<T>(e, d, this, f));
802         return d;
803     }
804 
uniRunNow(Object r, Executor e, Runnable f)805     private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) {
806         Throwable x;
807         CompletableFuture<Void> d = newIncompleteFuture();
808         if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
809             d.result = encodeThrowable(x, r);
810         else
811             try {
812                 if (e != null) {
813                     e.execute(new UniRun<T>(null, d, this, f));
814                 } else {
815                     f.run();
816                     d.result = NIL;
817                 }
818             } catch (Throwable ex) {
819                 d.result = encodeThrowable(ex);
820             }
821         return d;
822     }
823 
824     @SuppressWarnings("serial")
825     static final class UniWhenComplete<T> extends UniCompletion<T,T> {
826         BiConsumer<? super T, ? super Throwable> fn;
UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)827         UniWhenComplete(Executor executor, CompletableFuture<T> dep,
828                         CompletableFuture<T> src,
829                         BiConsumer<? super T, ? super Throwable> fn) {
830             super(executor, dep, src); this.fn = fn;
831         }
tryFire(int mode)832         final CompletableFuture<T> tryFire(int mode) {
833             CompletableFuture<T> d; CompletableFuture<T> a;
834             Object r; BiConsumer<? super T, ? super Throwable> f;
835             if ((d = dep) == null || (f = fn) == null
836                 || (a = src) == null || (r = a.result) == null
837                 || !d.uniWhenComplete(r, f, mode > 0 ? null : this))
838                 return null;
839             dep = null; src = null; fn = null;
840             return d.postFire(a, mode);
841         }
842     }
843 
uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)844     final boolean uniWhenComplete(Object r,
845                                   BiConsumer<? super T,? super Throwable> f,
846                                   UniWhenComplete<T> c) {
847         T t; Throwable x = null;
848         if (result == null) {
849             try {
850                 if (c != null && !c.claim())
851                     return false;
852                 if (r instanceof AltResult) {
853                     x = ((AltResult)r).ex;
854                     t = null;
855                 } else {
856                     @SuppressWarnings("unchecked") T tr = (T) r;
857                     t = tr;
858                 }
859                 f.accept(t, x);
860                 if (x == null) {
861                     internalComplete(r);
862                     return true;
863                 }
864             } catch (Throwable ex) {
865                 if (x == null)
866                     x = ex;
867                 else if (x != ex)
868                     x.addSuppressed(ex);
869             }
870             completeThrowable(x, r);
871         }
872         return true;
873     }
874 
uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)875     private CompletableFuture<T> uniWhenCompleteStage(
876         Executor e, BiConsumer<? super T, ? super Throwable> f) {
877         if (f == null) throw new NullPointerException();
878         CompletableFuture<T> d = newIncompleteFuture();
879         Object r;
880         if ((r = result) == null)
881             unipush(new UniWhenComplete<T>(e, d, this, f));
882         else if (e == null)
883             d.uniWhenComplete(r, f, null);
884         else {
885             try {
886                 e.execute(new UniWhenComplete<T>(null, d, this, f));
887             } catch (Throwable ex) {
888                 d.result = encodeThrowable(ex);
889             }
890         }
891         return d;
892     }
893 
894     @SuppressWarnings("serial")
895     static final class UniHandle<T,V> extends UniCompletion<T,V> {
896         BiFunction<? super T, Throwable, ? extends V> fn;
UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)897         UniHandle(Executor executor, CompletableFuture<V> dep,
898                   CompletableFuture<T> src,
899                   BiFunction<? super T, Throwable, ? extends V> fn) {
900             super(executor, dep, src); this.fn = fn;
901         }
tryFire(int mode)902         final CompletableFuture<V> tryFire(int mode) {
903             CompletableFuture<V> d; CompletableFuture<T> a;
904             Object r; BiFunction<? super T, Throwable, ? extends V> f;
905             if ((d = dep) == null || (f = fn) == null
906                 || (a = src) == null || (r = a.result) == null
907                 || !d.uniHandle(r, f, mode > 0 ? null : this))
908                 return null;
909             dep = null; src = null; fn = null;
910             return d.postFire(a, mode);
911         }
912     }
913 
uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)914     final <S> boolean uniHandle(Object r,
915                                 BiFunction<? super S, Throwable, ? extends T> f,
916                                 UniHandle<S,T> c) {
917         S s; Throwable x;
918         if (result == null) {
919             try {
920                 if (c != null && !c.claim())
921                     return false;
922                 if (r instanceof AltResult) {
923                     x = ((AltResult)r).ex;
924                     s = null;
925                 } else {
926                     x = null;
927                     @SuppressWarnings("unchecked") S ss = (S) r;
928                     s = ss;
929                 }
930                 completeValue(f.apply(s, x));
931             } catch (Throwable ex) {
932                 completeThrowable(ex);
933             }
934         }
935         return true;
936     }
937 
uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)938     private <V> CompletableFuture<V> uniHandleStage(
939         Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
940         if (f == null) throw new NullPointerException();
941         CompletableFuture<V> d = newIncompleteFuture();
942         Object r;
943         if ((r = result) == null)
944             unipush(new UniHandle<T,V>(e, d, this, f));
945         else if (e == null)
946             d.uniHandle(r, f, null);
947         else {
948             try {
949                 e.execute(new UniHandle<T,V>(null, d, this, f));
950             } catch (Throwable ex) {
951                 d.result = encodeThrowable(ex);
952             }
953         }
954         return d;
955     }
956 
957     @SuppressWarnings("serial")
958     static final class UniExceptionally<T> extends UniCompletion<T,T> {
959         Function<? super Throwable, ? extends T> fn;
UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)960         UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
961                          Function<? super Throwable, ? extends T> fn) {
962             super(null, dep, src); this.fn = fn;
963         }
tryFire(int mode)964         final CompletableFuture<T> tryFire(int mode) { // never ASYNC
965             // assert mode != ASYNC;
966             CompletableFuture<T> d; CompletableFuture<T> a;
967             Object r; Function<? super Throwable, ? extends T> f;
968             if ((d = dep) == null || (f = fn) == null
969                 || (a = src) == null || (r = a.result) == null
970                 || !d.uniExceptionally(r, f, this))
971                 return null;
972             dep = null; src = null; fn = null;
973             return d.postFire(a, mode);
974         }
975     }
976 
uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)977     final boolean uniExceptionally(Object r,
978                                    Function<? super Throwable, ? extends T> f,
979                                    UniExceptionally<T> c) {
980         Throwable x;
981         if (result == null) {
982             try {
983                 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
984                     if (c != null && !c.claim())
985                         return false;
986                     completeValue(f.apply(x));
987                 } else
988                     internalComplete(r);
989             } catch (Throwable ex) {
990                 completeThrowable(ex);
991             }
992         }
993         return true;
994     }
995 
uniExceptionallyStage( Function<Throwable, ? extends T> f)996     private CompletableFuture<T> uniExceptionallyStage(
997         Function<Throwable, ? extends T> f) {
998         if (f == null) throw new NullPointerException();
999         CompletableFuture<T> d = newIncompleteFuture();
1000         Object r;
1001         if ((r = result) == null)
1002             unipush(new UniExceptionally<T>(d, this, f));
1003         else
1004             d.uniExceptionally(r, f, null);
1005         return d;
1006     }
1007 
1008     @SuppressWarnings("serial")
1009     static final class UniRelay<U, T extends U> extends UniCompletion<T,U> {
UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)1010         UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) {
1011             super(null, dep, src);
1012         }
tryFire(int mode)1013         final CompletableFuture<U> tryFire(int mode) {
1014             CompletableFuture<U> d; CompletableFuture<T> a; Object r;
1015             if ((d = dep) == null
1016                 || (a = src) == null || (r = a.result) == null)
1017                 return null;
1018             if (d.result == null)
1019                 d.completeRelay(r);
1020             src = null; dep = null;
1021             return d.postFire(a, mode);
1022         }
1023     }
1024 
uniCopyStage( CompletableFuture<T> src)1025     private static <U, T extends U> CompletableFuture<U> uniCopyStage(
1026         CompletableFuture<T> src) {
1027         Object r;
1028         CompletableFuture<U> d = src.newIncompleteFuture();
1029         if ((r = src.result) != null)
1030             d.result = encodeRelay(r);
1031         else
1032             src.unipush(new UniRelay<U,T>(d, src));
1033         return d;
1034     }
1035 
uniAsMinimalStage()1036     private MinimalStage<T> uniAsMinimalStage() {
1037         Object r;
1038         if ((r = result) != null)
1039             return new MinimalStage<T>(encodeRelay(r));
1040         MinimalStage<T> d = new MinimalStage<T>();
1041         unipush(new UniRelay<T,T>(d, this));
1042         return d;
1043     }
1044 
1045     @SuppressWarnings("serial")
1046     static final class UniCompose<T,V> extends UniCompletion<T,V> {
1047         Function<? super T, ? extends CompletionStage<V>> fn;
UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1048         UniCompose(Executor executor, CompletableFuture<V> dep,
1049                    CompletableFuture<T> src,
1050                    Function<? super T, ? extends CompletionStage<V>> fn) {
1051             super(executor, dep, src); this.fn = fn;
1052         }
tryFire(int mode)1053         final CompletableFuture<V> tryFire(int mode) {
1054             CompletableFuture<V> d; CompletableFuture<T> a;
1055             Function<? super T, ? extends CompletionStage<V>> f;
1056             Object r; Throwable x;
1057             if ((d = dep) == null || (f = fn) == null
1058                 || (a = src) == null || (r = a.result) == null)
1059                 return null;
1060             tryComplete: if (d.result == null) {
1061                 if (r instanceof AltResult) {
1062                     if ((x = ((AltResult)r).ex) != null) {
1063                         d.completeThrowable(x, r);
1064                         break tryComplete;
1065                     }
1066                     r = null;
1067                 }
1068                 try {
1069                     if (mode <= 0 && !claim())
1070                         return null;
1071                     @SuppressWarnings("unchecked") T t = (T) r;
1072                     CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1073                     if ((r = g.result) != null)
1074                         d.completeRelay(r);
1075                     else {
1076                         g.unipush(new UniRelay<V,V>(d, g));
1077                         if (d.result == null)
1078                             return null;
1079                     }
1080                 } catch (Throwable ex) {
1081                     d.completeThrowable(ex);
1082                 }
1083             }
1084             dep = null; src = null; fn = null;
1085             return d.postFire(a, mode);
1086         }
1087     }
1088 
uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1089     private <V> CompletableFuture<V> uniComposeStage(
1090         Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
1091         if (f == null) throw new NullPointerException();
1092         CompletableFuture<V> d = newIncompleteFuture();
1093         Object r, s; Throwable x;
1094         if ((r = result) == null)
1095             unipush(new UniCompose<T,V>(e, d, this, f));
1096         else if (e == null) {
1097             if (r instanceof AltResult) {
1098                 if ((x = ((AltResult)r).ex) != null) {
1099                     d.result = encodeThrowable(x, r);
1100                     return d;
1101                 }
1102                 r = null;
1103             }
1104             try {
1105                 @SuppressWarnings("unchecked") T t = (T) r;
1106                 CompletableFuture<V> g = f.apply(t).toCompletableFuture();
1107                 if ((s = g.result) != null)
1108                     d.result = encodeRelay(s);
1109                 else {
1110                     g.unipush(new UniRelay<V,V>(d, g));
1111                 }
1112             } catch (Throwable ex) {
1113                 d.result = encodeThrowable(ex);
1114             }
1115         }
1116         else
1117             try {
1118                 e.execute(new UniCompose<T,V>(null, d, this, f));
1119             } catch (Throwable ex) {
1120                 d.result = encodeThrowable(ex);
1121             }
1122         return d;
1123     }
1124 
1125     /* ------------- Two-input Completions -------------- */
1126 
1127     /** A Completion for an action with two sources */
1128     @SuppressWarnings("serial")
1129     abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> {
1130         CompletableFuture<U> snd; // second source for action
BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1131         BiCompletion(Executor executor, CompletableFuture<V> dep,
1132                      CompletableFuture<T> src, CompletableFuture<U> snd) {
1133             super(executor, dep, src); this.snd = snd;
1134         }
1135     }
1136 
1137     /** A Completion delegating to a BiCompletion */
1138     @SuppressWarnings("serial")
1139     static final class CoCompletion extends Completion {
1140         BiCompletion<?,?,?> base;
CoCompletion(BiCompletion<?,?,?> base)1141         CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
tryFire(int mode)1142         final CompletableFuture<?> tryFire(int mode) {
1143             BiCompletion<?,?,?> c; CompletableFuture<?> d;
1144             if ((c = base) == null || (d = c.tryFire(mode)) == null)
1145                 return null;
1146             base = null; // detach
1147             return d;
1148         }
isLive()1149         final boolean isLive() {
1150             BiCompletion<?,?,?> c;
1151             return (c = base) != null
1152                 // && c.isLive()
1153                 && c.dep != null;
1154         }
1155     }
1156 
1157     /**
1158      * Pushes completion to this and b unless both done.
1159      * Caller should first check that either result or b.result is null.
1160      */
bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1161     final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1162         if (c != null) {
1163             while (result == null) {
1164                 if (tryPushStack(c)) {
1165                     if (b.result == null)
1166                         b.unipush(new CoCompletion(c));
1167                     else if (result != null)
1168                         c.tryFire(SYNC);
1169                     return;
1170                 }
1171             }
1172             b.unipush(c);
1173         }
1174     }
1175 
1176     /** Post-processing after successful BiCompletion tryFire. */
postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1177     final CompletableFuture<T> postFire(CompletableFuture<?> a,
1178                                         CompletableFuture<?> b, int mode) {
1179         if (b != null && b.stack != null) { // clean second source
1180             Object r;
1181             if ((r = b.result) == null)
1182                 b.cleanStack();
1183             if (mode >= 0 && (r != null || b.result != null))
1184                 b.postComplete();
1185         }
1186         return postFire(a, mode);
1187     }
1188 
1189     @SuppressWarnings("serial")
1190     static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {
1191         BiFunction<? super T,? super U,? extends V> fn;
BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1192         BiApply(Executor executor, CompletableFuture<V> dep,
1193                 CompletableFuture<T> src, CompletableFuture<U> snd,
1194                 BiFunction<? super T,? super U,? extends V> fn) {
1195             super(executor, dep, src, snd); this.fn = fn;
1196         }
tryFire(int mode)1197         final CompletableFuture<V> tryFire(int mode) {
1198             CompletableFuture<V> d;
1199             CompletableFuture<T> a;
1200             CompletableFuture<U> b;
1201             Object r, s; BiFunction<? super T,? super U,? extends V> f;
1202             if ((d = dep) == null || (f = fn) == null
1203                 || (a = src) == null || (r = a.result) == null
1204                 || (b = snd) == null || (s = b.result) == null
1205                 || !d.biApply(r, s, f, mode > 0 ? null : this))
1206                 return null;
1207             dep = null; src = null; snd = null; fn = null;
1208             return d.postFire(a, b, mode);
1209         }
1210     }
1211 
biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1212     final <R,S> boolean biApply(Object r, Object s,
1213                                 BiFunction<? super R,? super S,? extends T> f,
1214                                 BiApply<R,S,T> c) {
1215         Throwable x;
1216         tryComplete: if (result == null) {
1217             if (r instanceof AltResult) {
1218                 if ((x = ((AltResult)r).ex) != null) {
1219                     completeThrowable(x, r);
1220                     break tryComplete;
1221                 }
1222                 r = null;
1223             }
1224             if (s instanceof AltResult) {
1225                 if ((x = ((AltResult)s).ex) != null) {
1226                     completeThrowable(x, s);
1227                     break tryComplete;
1228                 }
1229                 s = null;
1230             }
1231             try {
1232                 if (c != null && !c.claim())
1233                     return false;
1234                 @SuppressWarnings("unchecked") R rr = (R) r;
1235                 @SuppressWarnings("unchecked") S ss = (S) s;
1236                 completeValue(f.apply(rr, ss));
1237             } catch (Throwable ex) {
1238                 completeThrowable(ex);
1239             }
1240         }
1241         return true;
1242     }
1243 
biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1244     private <U,V> CompletableFuture<V> biApplyStage(
1245         Executor e, CompletionStage<U> o,
1246         BiFunction<? super T,? super U,? extends V> f) {
1247         CompletableFuture<U> b; Object r, s;
1248         if (f == null || (b = o.toCompletableFuture()) == null)
1249             throw new NullPointerException();
1250         CompletableFuture<V> d = newIncompleteFuture();
1251         if ((r = result) == null || (s = b.result) == null)
1252             bipush(b, new BiApply<T,U,V>(e, d, this, b, f));
1253         else if (e == null)
1254             d.biApply(r, s, f, null);
1255         else
1256             try {
1257                 e.execute(new BiApply<T,U,V>(null, d, this, b, f));
1258             } catch (Throwable ex) {
1259                 d.result = encodeThrowable(ex);
1260             }
1261         return d;
1262     }
1263 
1264     @SuppressWarnings("serial")
1265     static final class BiAccept<T,U> extends BiCompletion<T,U,Void> {
1266         BiConsumer<? super T,? super U> fn;
BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1267         BiAccept(Executor executor, CompletableFuture<Void> dep,
1268                  CompletableFuture<T> src, CompletableFuture<U> snd,
1269                  BiConsumer<? super T,? super U> fn) {
1270             super(executor, dep, src, snd); this.fn = fn;
1271         }
tryFire(int mode)1272         final CompletableFuture<Void> tryFire(int mode) {
1273             CompletableFuture<Void> d;
1274             CompletableFuture<T> a;
1275             CompletableFuture<U> b;
1276             Object r, s; BiConsumer<? super T,? super U> f;
1277             if ((d = dep) == null || (f = fn) == null
1278                 || (a = src) == null || (r = a.result) == null
1279                 || (b = snd) == null || (s = b.result) == null
1280                 || !d.biAccept(r, s, f, mode > 0 ? null : this))
1281                 return null;
1282             dep = null; src = null; snd = null; fn = null;
1283             return d.postFire(a, b, mode);
1284         }
1285     }
1286 
biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1287     final <R,S> boolean biAccept(Object r, Object s,
1288                                  BiConsumer<? super R,? super S> f,
1289                                  BiAccept<R,S> c) {
1290         Throwable x;
1291         tryComplete: if (result == null) {
1292             if (r instanceof AltResult) {
1293                 if ((x = ((AltResult)r).ex) != null) {
1294                     completeThrowable(x, r);
1295                     break tryComplete;
1296                 }
1297                 r = null;
1298             }
1299             if (s instanceof AltResult) {
1300                 if ((x = ((AltResult)s).ex) != null) {
1301                     completeThrowable(x, s);
1302                     break tryComplete;
1303                 }
1304                 s = null;
1305             }
1306             try {
1307                 if (c != null && !c.claim())
1308                     return false;
1309                 @SuppressWarnings("unchecked") R rr = (R) r;
1310                 @SuppressWarnings("unchecked") S ss = (S) s;
1311                 f.accept(rr, ss);
1312                 completeNull();
1313             } catch (Throwable ex) {
1314                 completeThrowable(ex);
1315             }
1316         }
1317         return true;
1318     }
1319 
biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1320     private <U> CompletableFuture<Void> biAcceptStage(
1321         Executor e, CompletionStage<U> o,
1322         BiConsumer<? super T,? super U> f) {
1323         CompletableFuture<U> b; Object r, s;
1324         if (f == null || (b = o.toCompletableFuture()) == null)
1325             throw new NullPointerException();
1326         CompletableFuture<Void> d = newIncompleteFuture();
1327         if ((r = result) == null || (s = b.result) == null)
1328             bipush(b, new BiAccept<T,U>(e, d, this, b, f));
1329         else if (e == null)
1330             d.biAccept(r, s, f, null);
1331         else
1332             try {
1333                 e.execute(new BiAccept<T,U>(null, d, this, b, f));
1334             } catch (Throwable ex) {
1335                 d.result = encodeThrowable(ex);
1336             }
1337         return d;
1338     }
1339 
1340     @SuppressWarnings("serial")
1341     static final class BiRun<T,U> extends BiCompletion<T,U,Void> {
1342         Runnable fn;
BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1343         BiRun(Executor executor, CompletableFuture<Void> dep,
1344               CompletableFuture<T> src, CompletableFuture<U> snd,
1345               Runnable fn) {
1346             super(executor, dep, src, snd); this.fn = fn;
1347         }
tryFire(int mode)1348         final CompletableFuture<Void> tryFire(int mode) {
1349             CompletableFuture<Void> d;
1350             CompletableFuture<T> a;
1351             CompletableFuture<U> b;
1352             Object r, s; Runnable f;
1353             if ((d = dep) == null || (f = fn) == null
1354                 || (a = src) == null || (r = a.result) == null
1355                 || (b = snd) == null || (s = b.result) == null
1356                 || !d.biRun(r, s, f, mode > 0 ? null : this))
1357                 return null;
1358             dep = null; src = null; snd = null; fn = null;
1359             return d.postFire(a, b, mode);
1360         }
1361     }
1362 
biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1363     final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) {
1364         Throwable x; Object z;
1365         if (result == null) {
1366             if ((r instanceof AltResult
1367                  && (x = ((AltResult)(z = r)).ex) != null) ||
1368                 (s instanceof AltResult
1369                  && (x = ((AltResult)(z = s)).ex) != null))
1370                 completeThrowable(x, z);
1371             else
1372                 try {
1373                     if (c != null && !c.claim())
1374                         return false;
1375                     f.run();
1376                     completeNull();
1377                 } catch (Throwable ex) {
1378                     completeThrowable(ex);
1379                 }
1380         }
1381         return true;
1382     }
1383 
biRunStage(Executor e, CompletionStage<?> o, Runnable f)1384     private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
1385                                                Runnable f) {
1386         CompletableFuture<?> b; Object r, s;
1387         if (f == null || (b = o.toCompletableFuture()) == null)
1388             throw new NullPointerException();
1389         CompletableFuture<Void> d = newIncompleteFuture();
1390         if ((r = result) == null || (s = b.result) == null)
1391             bipush(b, new BiRun<>(e, d, this, b, f));
1392         else if (e == null)
1393             d.biRun(r, s, f, null);
1394         else
1395             try {
1396                 e.execute(new BiRun<>(null, d, this, b, f));
1397             } catch (Throwable ex) {
1398                 d.result = encodeThrowable(ex);
1399             }
1400         return d;
1401     }
1402 
1403     @SuppressWarnings("serial")
1404     static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And
BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1405         BiRelay(CompletableFuture<Void> dep,
1406                 CompletableFuture<T> src, CompletableFuture<U> snd) {
1407             super(null, dep, src, snd);
1408         }
tryFire(int mode)1409         final CompletableFuture<Void> tryFire(int mode) {
1410             CompletableFuture<Void> d;
1411             CompletableFuture<T> a;
1412             CompletableFuture<U> b;
1413             Object r, s, z; Throwable x;
1414             if ((d = dep) == null
1415                 || (a = src) == null || (r = a.result) == null
1416                 || (b = snd) == null || (s = b.result) == null)
1417                 return null;
1418             if (d.result == null) {
1419                 if ((r instanceof AltResult
1420                      && (x = ((AltResult)(z = r)).ex) != null) ||
1421                     (s instanceof AltResult
1422                      && (x = ((AltResult)(z = s)).ex) != null))
1423                     d.completeThrowable(x, z);
1424                 else
1425                     d.completeNull();
1426             }
1427             src = null; snd = null; dep = null;
1428             return d.postFire(a, b, mode);
1429         }
1430     }
1431 
1432     /** Recursively constructs a tree of completions. */
andTree(CompletableFuture<?>[] cfs, int lo, int hi)1433     static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
1434                                            int lo, int hi) {
1435         CompletableFuture<Void> d = new CompletableFuture<Void>();
1436         if (lo > hi) // empty
1437             d.result = NIL;
1438         else {
1439             CompletableFuture<?> a, b; Object r, s, z; Throwable x;
1440             int mid = (lo + hi) >>> 1;
1441             if ((a = (lo == mid ? cfs[lo] :
1442                       andTree(cfs, lo, mid))) == null ||
1443                 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
1444                       andTree(cfs, mid+1, hi))) == null)
1445                 throw new NullPointerException();
1446             if ((r = a.result) == null || (s = b.result) == null)
1447                 a.bipush(b, new BiRelay<>(d, a, b));
1448             else if ((r instanceof AltResult
1449                       && (x = ((AltResult)(z = r)).ex) != null) ||
1450                      (s instanceof AltResult
1451                       && (x = ((AltResult)(z = s)).ex) != null))
1452                 d.result = encodeThrowable(x, z);
1453             else
1454                 d.result = NIL;
1455         }
1456         return d;
1457     }
1458 
1459     /* ------------- Projected (Ored) BiCompletions -------------- */
1460 
1461     /**
1462      * Pushes completion to this and b unless either done.
1463      * Caller should first check that result and b.result are both null.
1464      */
orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1465     final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
1466         if (c != null) {
1467             while (!tryPushStack(c)) {
1468                 if (result != null) {
1469                     NEXT.set(c, null);
1470                     break;
1471                 }
1472             }
1473             if (result != null)
1474                 c.tryFire(SYNC);
1475             else
1476                 b.unipush(new CoCompletion(c));
1477         }
1478     }
1479 
1480     @SuppressWarnings("serial")
1481     static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> {
1482         Function<? super T,? extends V> fn;
OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1483         OrApply(Executor executor, CompletableFuture<V> dep,
1484                 CompletableFuture<T> src, CompletableFuture<U> snd,
1485                 Function<? super T,? extends V> fn) {
1486             super(executor, dep, src, snd); this.fn = fn;
1487         }
tryFire(int mode)1488         final CompletableFuture<V> tryFire(int mode) {
1489             CompletableFuture<V> d;
1490             CompletableFuture<T> a;
1491             CompletableFuture<U> b;
1492             Object r; Throwable x; Function<? super T,? extends V> f;
1493             if ((d = dep) == null || (f = fn) == null
1494                 || (a = src) == null || (b = snd) == null
1495                 || ((r = a.result) == null && (r = b.result) == null))
1496                 return null;
1497             tryComplete: if (d.result == null) {
1498                 try {
1499                     if (mode <= 0 && !claim())
1500                         return null;
1501                     if (r instanceof AltResult) {
1502                         if ((x = ((AltResult)r).ex) != null) {
1503                             d.completeThrowable(x, r);
1504                             break tryComplete;
1505                         }
1506                         r = null;
1507                     }
1508                     @SuppressWarnings("unchecked") T t = (T) r;
1509                     d.completeValue(f.apply(t));
1510                 } catch (Throwable ex) {
1511                     d.completeThrowable(ex);
1512                 }
1513             }
1514             dep = null; src = null; snd = null; fn = null;
1515             return d.postFire(a, b, mode);
1516         }
1517     }
1518 
orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1519     private <U extends T,V> CompletableFuture<V> orApplyStage(
1520         Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) {
1521         CompletableFuture<U> b;
1522         if (f == null || (b = o.toCompletableFuture()) == null)
1523             throw new NullPointerException();
1524 
1525         Object r; CompletableFuture<? extends T> z;
1526         if ((r = (z = this).result) != null ||
1527             (r = (z = b).result) != null)
1528             return z.uniApplyNow(r, e, f);
1529 
1530         CompletableFuture<V> d = newIncompleteFuture();
1531         orpush(b, new OrApply<T,U,V>(e, d, this, b, f));
1532         return d;
1533     }
1534 
1535     @SuppressWarnings("serial")
1536     static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> {
1537         Consumer<? super T> fn;
OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1538         OrAccept(Executor executor, CompletableFuture<Void> dep,
1539                  CompletableFuture<T> src, CompletableFuture<U> snd,
1540                  Consumer<? super T> fn) {
1541             super(executor, dep, src, snd); this.fn = fn;
1542         }
tryFire(int mode)1543         final CompletableFuture<Void> tryFire(int mode) {
1544             CompletableFuture<Void> d;
1545             CompletableFuture<T> a;
1546             CompletableFuture<U> b;
1547             Object r; Throwable x; Consumer<? super T> f;
1548             if ((d = dep) == null || (f = fn) == null
1549                 || (a = src) == null || (b = snd) == null
1550                 || ((r = a.result) == null && (r = b.result) == null))
1551                 return null;
1552             tryComplete: if (d.result == null) {
1553                 try {
1554                     if (mode <= 0 && !claim())
1555                         return null;
1556                     if (r instanceof AltResult) {
1557                         if ((x = ((AltResult)r).ex) != null) {
1558                             d.completeThrowable(x, r);
1559                             break tryComplete;
1560                         }
1561                         r = null;
1562                     }
1563                     @SuppressWarnings("unchecked") T t = (T) r;
1564                     f.accept(t);
1565                     d.completeNull();
1566                 } catch (Throwable ex) {
1567                     d.completeThrowable(ex);
1568                 }
1569             }
1570             dep = null; src = null; snd = null; fn = null;
1571             return d.postFire(a, b, mode);
1572         }
1573     }
1574 
orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1575     private <U extends T> CompletableFuture<Void> orAcceptStage(
1576         Executor e, CompletionStage<U> o, Consumer<? super T> f) {
1577         CompletableFuture<U> b;
1578         if (f == null || (b = o.toCompletableFuture()) == null)
1579             throw new NullPointerException();
1580 
1581         Object r; CompletableFuture<? extends T> z;
1582         if ((r = (z = this).result) != null ||
1583             (r = (z = b).result) != null)
1584             return z.uniAcceptNow(r, e, f);
1585 
1586         CompletableFuture<Void> d = newIncompleteFuture();
1587         orpush(b, new OrAccept<T,U>(e, d, this, b, f));
1588         return d;
1589     }
1590 
1591     @SuppressWarnings("serial")
1592     static final class OrRun<T,U> extends BiCompletion<T,U,Void> {
1593         Runnable fn;
OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1594         OrRun(Executor executor, CompletableFuture<Void> dep,
1595               CompletableFuture<T> src, CompletableFuture<U> snd,
1596               Runnable fn) {
1597             super(executor, dep, src, snd); this.fn = fn;
1598         }
tryFire(int mode)1599         final CompletableFuture<Void> tryFire(int mode) {
1600             CompletableFuture<Void> d;
1601             CompletableFuture<T> a;
1602             CompletableFuture<U> b;
1603             Object r; Throwable x; Runnable f;
1604             if ((d = dep) == null || (f = fn) == null
1605                 || (a = src) == null || (b = snd) == null
1606                 || ((r = a.result) == null && (r = b.result) == null))
1607                 return null;
1608             if (d.result == null) {
1609                 try {
1610                     if (mode <= 0 && !claim())
1611                         return null;
1612                     else if (r instanceof AltResult
1613                         && (x = ((AltResult)r).ex) != null)
1614                         d.completeThrowable(x, r);
1615                     else {
1616                         f.run();
1617                         d.completeNull();
1618                     }
1619                 } catch (Throwable ex) {
1620                     d.completeThrowable(ex);
1621                 }
1622             }
1623             dep = null; src = null; snd = null; fn = null;
1624             return d.postFire(a, b, mode);
1625         }
1626     }
1627 
orRunStage(Executor e, CompletionStage<?> o, Runnable f)1628     private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
1629                                                Runnable f) {
1630         CompletableFuture<?> b;
1631         if (f == null || (b = o.toCompletableFuture()) == null)
1632             throw new NullPointerException();
1633 
1634         Object r; CompletableFuture<?> z;
1635         if ((r = (z = this).result) != null ||
1636             (r = (z = b).result) != null)
1637             return z.uniRunNow(r, e, f);
1638 
1639         CompletableFuture<Void> d = newIncompleteFuture();
1640         orpush(b, new OrRun<>(e, d, this, b, f));
1641         return d;
1642     }
1643 
1644     /** Completion for an anyOf input future. */
1645     @SuppressWarnings("serial")
1646     static class AnyOf extends Completion {
1647         CompletableFuture<Object> dep; CompletableFuture<?> src;
1648         CompletableFuture<?>[] srcs;
AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1649         AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,
1650               CompletableFuture<?>[] srcs) {
1651             this.dep = dep; this.src = src; this.srcs = srcs;
1652         }
tryFire(int mode)1653         final CompletableFuture<Object> tryFire(int mode) {
1654             // assert mode != ASYNC;
1655             CompletableFuture<Object> d; CompletableFuture<?> a;
1656             CompletableFuture<?>[] as;
1657             Object r;
1658             if ((d = dep) == null
1659                 || (a = src) == null || (r = a.result) == null
1660                 || (as = srcs) == null)
1661                 return null;
1662             dep = null; src = null; srcs = null;
1663             if (d.completeRelay(r)) {
1664                 for (CompletableFuture<?> b : as)
1665                     if (b != a)
1666                         b.cleanStack();
1667                 if (mode < 0)
1668                     return d;
1669                 else
1670                     d.postComplete();
1671             }
1672             return null;
1673         }
isLive()1674         final boolean isLive() {
1675             CompletableFuture<Object> d;
1676             return (d = dep) != null && d.result == null;
1677         }
1678     }
1679 
1680     /* ------------- Zero-input Async forms -------------- */
1681 
1682     @SuppressWarnings("serial")
1683     static final class AsyncSupply<T> extends ForkJoinTask<Void>
1684         implements Runnable, AsynchronousCompletionTask {
1685         CompletableFuture<T> dep; Supplier<? extends T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1686         AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) {
1687             this.dep = dep; this.fn = fn;
1688         }
1689 
getRawResult()1690         public final Void getRawResult() { return null; }
setRawResult(Void v)1691         public final void setRawResult(Void v) {}
exec()1692         public final boolean exec() { run(); return false; }
1693 
run()1694         public void run() {
1695             CompletableFuture<T> d; Supplier<? extends T> f;
1696             if ((d = dep) != null && (f = fn) != null) {
1697                 dep = null; fn = null;
1698                 if (d.result == null) {
1699                     try {
1700                         d.completeValue(f.get());
1701                     } catch (Throwable ex) {
1702                         d.completeThrowable(ex);
1703                     }
1704                 }
1705                 d.postComplete();
1706             }
1707         }
1708     }
1709 
asyncSupplyStage(Executor e, Supplier<U> f)1710     static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
1711                                                      Supplier<U> f) {
1712         if (f == null) throw new NullPointerException();
1713         CompletableFuture<U> d = new CompletableFuture<U>();
1714         e.execute(new AsyncSupply<U>(d, f));
1715         return d;
1716     }
1717 
1718     @SuppressWarnings("serial")
1719     static final class AsyncRun extends ForkJoinTask<Void>
1720         implements Runnable, AsynchronousCompletionTask {
1721         CompletableFuture<Void> dep; Runnable fn;
AsyncRun(CompletableFuture<Void> dep, Runnable fn)1722         AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
1723             this.dep = dep; this.fn = fn;
1724         }
1725 
getRawResult()1726         public final Void getRawResult() { return null; }
setRawResult(Void v)1727         public final void setRawResult(Void v) {}
exec()1728         public final boolean exec() { run(); return false; }
1729 
run()1730         public void run() {
1731             CompletableFuture<Void> d; Runnable f;
1732             if ((d = dep) != null && (f = fn) != null) {
1733                 dep = null; fn = null;
1734                 if (d.result == null) {
1735                     try {
1736                         f.run();
1737                         d.completeNull();
1738                     } catch (Throwable ex) {
1739                         d.completeThrowable(ex);
1740                     }
1741                 }
1742                 d.postComplete();
1743             }
1744         }
1745     }
1746 
asyncRunStage(Executor e, Runnable f)1747     static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
1748         if (f == null) throw new NullPointerException();
1749         CompletableFuture<Void> d = new CompletableFuture<Void>();
1750         e.execute(new AsyncRun(d, f));
1751         return d;
1752     }
1753 
1754     /* ------------- Signallers -------------- */
1755 
1756     /**
1757      * Completion for recording and releasing a waiting thread.  This
1758      * class implements ManagedBlocker to avoid starvation when
1759      * blocking actions pile up in ForkJoinPools.
1760      */
1761     @SuppressWarnings("serial")
1762     static final class Signaller extends Completion
1763         implements ForkJoinPool.ManagedBlocker {
1764         long nanos;                    // remaining wait time if timed
1765         final long deadline;           // non-zero if timed
1766         final boolean interruptible;
1767         boolean interrupted;
1768         volatile Thread thread;
1769 
Signaller(boolean interruptible, long nanos, long deadline)1770         Signaller(boolean interruptible, long nanos, long deadline) {
1771             this.thread = Thread.currentThread();
1772             this.interruptible = interruptible;
1773             this.nanos = nanos;
1774             this.deadline = deadline;
1775         }
tryFire(int ignore)1776         final CompletableFuture<?> tryFire(int ignore) {
1777             Thread w; // no need to atomically claim
1778             if ((w = thread) != null) {
1779                 thread = null;
1780                 LockSupport.unpark(w);
1781             }
1782             return null;
1783         }
isReleasable()1784         public boolean isReleasable() {
1785             if (Thread.interrupted())
1786                 interrupted = true;
1787             return ((interrupted && interruptible) ||
1788                     (deadline != 0L &&
1789                      (nanos <= 0L ||
1790                       (nanos = deadline - System.nanoTime()) <= 0L)) ||
1791                     thread == null);
1792         }
block()1793         public boolean block() {
1794             while (!isReleasable()) {
1795                 if (deadline == 0L)
1796                     LockSupport.park(this);
1797                 else
1798                     LockSupport.parkNanos(this, nanos);
1799             }
1800             return true;
1801         }
isLive()1802         final boolean isLive() { return thread != null; }
1803     }
1804 
1805     /**
1806      * Returns raw result after waiting, or null if interruptible and
1807      * interrupted.
1808      */
waitingGet(boolean interruptible)1809     private Object waitingGet(boolean interruptible) {
1810         Signaller q = null;
1811         boolean queued = false;
1812         Object r;
1813         while ((r = result) == null) {
1814             if (q == null) {
1815                 q = new Signaller(interruptible, 0L, 0L);
1816                 if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1817                     ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1818             }
1819             else if (!queued)
1820                 queued = tryPushStack(q);
1821             else {
1822                 try {
1823                     ForkJoinPool.managedBlock(q);
1824                 } catch (InterruptedException ie) { // currently cannot happen
1825                     q.interrupted = true;
1826                 }
1827                 if (q.interrupted && interruptible)
1828                     break;
1829             }
1830         }
1831         if (q != null && queued) {
1832             q.thread = null;
1833             if (!interruptible && q.interrupted)
1834                 Thread.currentThread().interrupt();
1835             if (r == null)
1836                 cleanStack();
1837         }
1838         if (r != null || (r = result) != null)
1839             postComplete();
1840         return r;
1841     }
1842 
1843     /**
1844      * Returns raw result after waiting, or null if interrupted, or
1845      * throws TimeoutException on timeout.
1846      */
timedGet(long nanos)1847     private Object timedGet(long nanos) throws TimeoutException {
1848         if (Thread.interrupted())
1849             return null;
1850         if (nanos > 0L) {
1851             long d = System.nanoTime() + nanos;
1852             long deadline = (d == 0L) ? 1L : d; // avoid 0
1853             Signaller q = null;
1854             boolean queued = false;
1855             Object r;
1856             while ((r = result) == null) { // similar to untimed
1857                 if (q == null) {
1858                     q = new Signaller(true, nanos, deadline);
1859                     if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1860                         ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1861                 }
1862                 else if (!queued)
1863                     queued = tryPushStack(q);
1864                 else if (q.nanos <= 0L)
1865                     break;
1866                 else {
1867                     try {
1868                         ForkJoinPool.managedBlock(q);
1869                     } catch (InterruptedException ie) {
1870                         q.interrupted = true;
1871                     }
1872                     if (q.interrupted)
1873                         break;
1874                 }
1875             }
1876             if (q != null && queued) {
1877                 q.thread = null;
1878                 if (r == null)
1879                     cleanStack();
1880             }
1881             if (r != null || (r = result) != null)
1882                 postComplete();
1883             if (r != null || (q != null && q.interrupted))
1884                 return r;
1885         }
1886         throw new TimeoutException();
1887     }
1888 
1889     /* ------------- public methods -------------- */
1890 
1891     /**
1892      * Creates a new incomplete CompletableFuture.
1893      */
CompletableFuture()1894     public CompletableFuture() {
1895     }
1896 
1897     /**
1898      * Creates a new complete CompletableFuture with given encoded result.
1899      */
CompletableFuture(Object r)1900     CompletableFuture(Object r) {
1901         this.result = r;
1902     }
1903 
1904     /**
1905      * Returns a new CompletableFuture that is asynchronously completed
1906      * by a task running in the {@link ForkJoinPool#commonPool()} with
1907      * the value obtained by calling the given Supplier.
1908      *
1909      * @param supplier a function returning the value to be used
1910      * to complete the returned CompletableFuture
1911      * @param <U> the function's return type
1912      * @return the new CompletableFuture
1913      */
supplyAsync(Supplier<U> supplier)1914     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1915         return asyncSupplyStage(ASYNC_POOL, supplier);
1916     }
1917 
1918     /**
1919      * Returns a new CompletableFuture that is asynchronously completed
1920      * by a task running in the given executor with the value obtained
1921      * by calling the given Supplier.
1922      *
1923      * @param supplier a function returning the value to be used
1924      * to complete the returned CompletableFuture
1925      * @param executor the executor to use for asynchronous execution
1926      * @param <U> the function's return type
1927      * @return the new CompletableFuture
1928      */
supplyAsync(Supplier<U> supplier, Executor executor)1929     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1930                                                        Executor executor) {
1931         return asyncSupplyStage(screenExecutor(executor), supplier);
1932     }
1933 
1934     /**
1935      * Returns a new CompletableFuture that is asynchronously completed
1936      * by a task running in the {@link ForkJoinPool#commonPool()} after
1937      * it runs the given action.
1938      *
1939      * @param runnable the action to run before completing the
1940      * returned CompletableFuture
1941      * @return the new CompletableFuture
1942      */
runAsync(Runnable runnable)1943     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1944         return asyncRunStage(ASYNC_POOL, runnable);
1945     }
1946 
1947     /**
1948      * Returns a new CompletableFuture that is asynchronously completed
1949      * by a task running in the given executor after it runs the given
1950      * action.
1951      *
1952      * @param runnable the action to run before completing the
1953      * returned CompletableFuture
1954      * @param executor the executor to use for asynchronous execution
1955      * @return the new CompletableFuture
1956      */
runAsync(Runnable runnable, Executor executor)1957     public static CompletableFuture<Void> runAsync(Runnable runnable,
1958                                                    Executor executor) {
1959         return asyncRunStage(screenExecutor(executor), runnable);
1960     }
1961 
1962     /**
1963      * Returns a new CompletableFuture that is already completed with
1964      * the given value.
1965      *
1966      * @param value the value
1967      * @param <U> the type of the value
1968      * @return the completed CompletableFuture
1969      */
completedFuture(U value)1970     public static <U> CompletableFuture<U> completedFuture(U value) {
1971         return new CompletableFuture<U>((value == null) ? NIL : value);
1972     }
1973 
1974     /**
1975      * Returns {@code true} if completed in any fashion: normally,
1976      * exceptionally, or via cancellation.
1977      *
1978      * @return {@code true} if completed
1979      */
isDone()1980     public boolean isDone() {
1981         return result != null;
1982     }
1983 
1984     /**
1985      * Waits if necessary for this future to complete, and then
1986      * returns its result.
1987      *
1988      * @return the result value
1989      * @throws CancellationException if this future was cancelled
1990      * @throws ExecutionException if this future completed exceptionally
1991      * @throws InterruptedException if the current thread was interrupted
1992      * while waiting
1993      */
1994     @SuppressWarnings("unchecked")
get()1995     public T get() throws InterruptedException, ExecutionException {
1996         Object r;
1997         if ((r = result) == null)
1998             r = waitingGet(true);
1999         return (T) reportGet(r);
2000     }
2001 
2002     /**
2003      * Waits if necessary for at most the given time for this future
2004      * to complete, and then returns its result, if available.
2005      *
2006      * @param timeout the maximum time to wait
2007      * @param unit the time unit of the timeout argument
2008      * @return the result value
2009      * @throws CancellationException if this future was cancelled
2010      * @throws ExecutionException if this future completed exceptionally
2011      * @throws InterruptedException if the current thread was interrupted
2012      * while waiting
2013      * @throws TimeoutException if the wait timed out
2014      */
2015     @SuppressWarnings("unchecked")
get(long timeout, TimeUnit unit)2016     public T get(long timeout, TimeUnit unit)
2017         throws InterruptedException, ExecutionException, TimeoutException {
2018         long nanos = unit.toNanos(timeout);
2019         Object r;
2020         if ((r = result) == null)
2021             r = timedGet(nanos);
2022         return (T) reportGet(r);
2023     }
2024 
2025     /**
2026      * Returns the result value when complete, or throws an
2027      * (unchecked) exception if completed exceptionally. To better
2028      * conform with the use of common functional forms, if a
2029      * computation involved in the completion of this
2030      * CompletableFuture threw an exception, this method throws an
2031      * (unchecked) {@link CompletionException} with the underlying
2032      * exception as its cause.
2033      *
2034      * @return the result value
2035      * @throws CancellationException if the computation was cancelled
2036      * @throws CompletionException if this future completed
2037      * exceptionally or a completion computation threw an exception
2038      */
2039     @SuppressWarnings("unchecked")
join()2040     public T join() {
2041         Object r;
2042         if ((r = result) == null)
2043             r = waitingGet(false);
2044         return (T) reportJoin(r);
2045     }
2046 
2047     /**
2048      * Returns the result value (or throws any encountered exception)
2049      * if completed, else returns the given valueIfAbsent.
2050      *
2051      * @param valueIfAbsent the value to return if not completed
2052      * @return the result value, if completed, else the given valueIfAbsent
2053      * @throws CancellationException if the computation was cancelled
2054      * @throws CompletionException if this future completed
2055      * exceptionally or a completion computation threw an exception
2056      */
2057     @SuppressWarnings("unchecked")
getNow(T valueIfAbsent)2058     public T getNow(T valueIfAbsent) {
2059         Object r;
2060         return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
2061     }
2062 
2063     /**
2064      * If not already completed, sets the value returned by {@link
2065      * #get()} and related methods to the given value.
2066      *
2067      * @param value the result value
2068      * @return {@code true} if this invocation caused this CompletableFuture
2069      * to transition to a completed state, else {@code false}
2070      */
complete(T value)2071     public boolean complete(T value) {
2072         boolean triggered = completeValue(value);
2073         postComplete();
2074         return triggered;
2075     }
2076 
2077     /**
2078      * If not already completed, causes invocations of {@link #get()}
2079      * and related methods to throw the given exception.
2080      *
2081      * @param ex the exception
2082      * @return {@code true} if this invocation caused this CompletableFuture
2083      * to transition to a completed state, else {@code false}
2084      */
completeExceptionally(Throwable ex)2085     public boolean completeExceptionally(Throwable ex) {
2086         if (ex == null) throw new NullPointerException();
2087         boolean triggered = internalComplete(new AltResult(ex));
2088         postComplete();
2089         return triggered;
2090     }
2091 
thenApply( Function<? super T,? extends U> fn)2092     public <U> CompletableFuture<U> thenApply(
2093         Function<? super T,? extends U> fn) {
2094         return uniApplyStage(null, fn);
2095     }
2096 
thenApplyAsync( Function<? super T,? extends U> fn)2097     public <U> CompletableFuture<U> thenApplyAsync(
2098         Function<? super T,? extends U> fn) {
2099         return uniApplyStage(defaultExecutor(), fn);
2100     }
2101 
thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2102     public <U> CompletableFuture<U> thenApplyAsync(
2103         Function<? super T,? extends U> fn, Executor executor) {
2104         return uniApplyStage(screenExecutor(executor), fn);
2105     }
2106 
thenAccept(Consumer<? super T> action)2107     public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
2108         return uniAcceptStage(null, action);
2109     }
2110 
thenAcceptAsync(Consumer<? super T> action)2111     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
2112         return uniAcceptStage(defaultExecutor(), action);
2113     }
2114 
thenAcceptAsync(Consumer<? super T> action, Executor executor)2115     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
2116                                                    Executor executor) {
2117         return uniAcceptStage(screenExecutor(executor), action);
2118     }
2119 
thenRun(Runnable action)2120     public CompletableFuture<Void> thenRun(Runnable action) {
2121         return uniRunStage(null, action);
2122     }
2123 
thenRunAsync(Runnable action)2124     public CompletableFuture<Void> thenRunAsync(Runnable action) {
2125         return uniRunStage(defaultExecutor(), action);
2126     }
2127 
thenRunAsync(Runnable action, Executor executor)2128     public CompletableFuture<Void> thenRunAsync(Runnable action,
2129                                                 Executor executor) {
2130         return uniRunStage(screenExecutor(executor), action);
2131     }
2132 
thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2133     public <U,V> CompletableFuture<V> thenCombine(
2134         CompletionStage<? extends U> other,
2135         BiFunction<? super T,? super U,? extends V> fn) {
2136         return biApplyStage(null, other, fn);
2137     }
2138 
thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2139     public <U,V> CompletableFuture<V> thenCombineAsync(
2140         CompletionStage<? extends U> other,
2141         BiFunction<? super T,? super U,? extends V> fn) {
2142         return biApplyStage(defaultExecutor(), other, fn);
2143     }
2144 
thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2145     public <U,V> CompletableFuture<V> thenCombineAsync(
2146         CompletionStage<? extends U> other,
2147         BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
2148         return biApplyStage(screenExecutor(executor), other, fn);
2149     }
2150 
thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2151     public <U> CompletableFuture<Void> thenAcceptBoth(
2152         CompletionStage<? extends U> other,
2153         BiConsumer<? super T, ? super U> action) {
2154         return biAcceptStage(null, other, action);
2155     }
2156 
thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2157     public <U> CompletableFuture<Void> thenAcceptBothAsync(
2158         CompletionStage<? extends U> other,
2159         BiConsumer<? super T, ? super U> action) {
2160         return biAcceptStage(defaultExecutor(), other, action);
2161     }
2162 
thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2163     public <U> CompletableFuture<Void> thenAcceptBothAsync(
2164         CompletionStage<? extends U> other,
2165         BiConsumer<? super T, ? super U> action, Executor executor) {
2166         return biAcceptStage(screenExecutor(executor), other, action);
2167     }
2168 
runAfterBoth(CompletionStage<?> other, Runnable action)2169     public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
2170                                                 Runnable action) {
2171         return biRunStage(null, other, action);
2172     }
2173 
runAfterBothAsync(CompletionStage<?> other, Runnable action)2174     public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2175                                                      Runnable action) {
2176         return biRunStage(defaultExecutor(), other, action);
2177     }
2178 
runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2179     public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
2180                                                      Runnable action,
2181                                                      Executor executor) {
2182         return biRunStage(screenExecutor(executor), other, action);
2183     }
2184 
applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2185     public <U> CompletableFuture<U> applyToEither(
2186         CompletionStage<? extends T> other, Function<? super T, U> fn) {
2187         return orApplyStage(null, other, fn);
2188     }
2189 
applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2190     public <U> CompletableFuture<U> applyToEitherAsync(
2191         CompletionStage<? extends T> other, Function<? super T, U> fn) {
2192         return orApplyStage(defaultExecutor(), other, fn);
2193     }
2194 
applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2195     public <U> CompletableFuture<U> applyToEitherAsync(
2196         CompletionStage<? extends T> other, Function<? super T, U> fn,
2197         Executor executor) {
2198         return orApplyStage(screenExecutor(executor), other, fn);
2199     }
2200 
acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2201     public CompletableFuture<Void> acceptEither(
2202         CompletionStage<? extends T> other, Consumer<? super T> action) {
2203         return orAcceptStage(null, other, action);
2204     }
2205 
acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2206     public CompletableFuture<Void> acceptEitherAsync(
2207         CompletionStage<? extends T> other, Consumer<? super T> action) {
2208         return orAcceptStage(defaultExecutor(), other, action);
2209     }
2210 
acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2211     public CompletableFuture<Void> acceptEitherAsync(
2212         CompletionStage<? extends T> other, Consumer<? super T> action,
2213         Executor executor) {
2214         return orAcceptStage(screenExecutor(executor), other, action);
2215     }
2216 
runAfterEither(CompletionStage<?> other, Runnable action)2217     public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2218                                                   Runnable action) {
2219         return orRunStage(null, other, action);
2220     }
2221 
runAfterEitherAsync(CompletionStage<?> other, Runnable action)2222     public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2223                                                        Runnable action) {
2224         return orRunStage(defaultExecutor(), other, action);
2225     }
2226 
runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2227     public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
2228                                                        Runnable action,
2229                                                        Executor executor) {
2230         return orRunStage(screenExecutor(executor), other, action);
2231     }
2232 
thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2233     public <U> CompletableFuture<U> thenCompose(
2234         Function<? super T, ? extends CompletionStage<U>> fn) {
2235         return uniComposeStage(null, fn);
2236     }
2237 
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2238     public <U> CompletableFuture<U> thenComposeAsync(
2239         Function<? super T, ? extends CompletionStage<U>> fn) {
2240         return uniComposeStage(defaultExecutor(), fn);
2241     }
2242 
thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2243     public <U> CompletableFuture<U> thenComposeAsync(
2244         Function<? super T, ? extends CompletionStage<U>> fn,
2245         Executor executor) {
2246         return uniComposeStage(screenExecutor(executor), fn);
2247     }
2248 
whenComplete( BiConsumer<? super T, ? super Throwable> action)2249     public CompletableFuture<T> whenComplete(
2250         BiConsumer<? super T, ? super Throwable> action) {
2251         return uniWhenCompleteStage(null, action);
2252     }
2253 
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2254     public CompletableFuture<T> whenCompleteAsync(
2255         BiConsumer<? super T, ? super Throwable> action) {
2256         return uniWhenCompleteStage(defaultExecutor(), action);
2257     }
2258 
whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2259     public CompletableFuture<T> whenCompleteAsync(
2260         BiConsumer<? super T, ? super Throwable> action, Executor executor) {
2261         return uniWhenCompleteStage(screenExecutor(executor), action);
2262     }
2263 
handle( BiFunction<? super T, Throwable, ? extends U> fn)2264     public <U> CompletableFuture<U> handle(
2265         BiFunction<? super T, Throwable, ? extends U> fn) {
2266         return uniHandleStage(null, fn);
2267     }
2268 
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2269     public <U> CompletableFuture<U> handleAsync(
2270         BiFunction<? super T, Throwable, ? extends U> fn) {
2271         return uniHandleStage(defaultExecutor(), fn);
2272     }
2273 
handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2274     public <U> CompletableFuture<U> handleAsync(
2275         BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
2276         return uniHandleStage(screenExecutor(executor), fn);
2277     }
2278 
2279     /**
2280      * Returns this CompletableFuture.
2281      *
2282      * @return this CompletableFuture
2283      */
toCompletableFuture()2284     public CompletableFuture<T> toCompletableFuture() {
2285         return this;
2286     }
2287 
2288     // not in interface CompletionStage
2289 
2290     /**
2291      * Returns a new CompletableFuture that is completed when this
2292      * CompletableFuture completes, with the result of the given
2293      * function of the exception triggering this CompletableFuture's
2294      * completion when it completes exceptionally; otherwise, if this
2295      * CompletableFuture completes normally, then the returned
2296      * CompletableFuture also completes normally with the same value.
2297      * Note: More flexible versions of this functionality are
2298      * available using methods {@code whenComplete} and {@code handle}.
2299      *
2300      * @param fn the function to use to compute the value of the
2301      * returned CompletableFuture if this CompletableFuture completed
2302      * exceptionally
2303      * @return the new CompletableFuture
2304      */
exceptionally( Function<Throwable, ? extends T> fn)2305     public CompletableFuture<T> exceptionally(
2306         Function<Throwable, ? extends T> fn) {
2307         return uniExceptionallyStage(fn);
2308     }
2309 
2310 
2311     /* ------------- Arbitrary-arity constructions -------------- */
2312 
2313     /**
2314      * Returns a new CompletableFuture that is completed when all of
2315      * the given CompletableFutures complete.  If any of the given
2316      * CompletableFutures complete exceptionally, then the returned
2317      * CompletableFuture also does so, with a CompletionException
2318      * holding this exception as its cause.  Otherwise, the results,
2319      * if any, of the given CompletableFutures are not reflected in
2320      * the returned CompletableFuture, but may be obtained by
2321      * inspecting them individually. If no CompletableFutures are
2322      * provided, returns a CompletableFuture completed with the value
2323      * {@code null}.
2324      *
2325      * <p>Among the applications of this method is to await completion
2326      * of a set of independent CompletableFutures before continuing a
2327      * program, as in: {@code CompletableFuture.allOf(c1, c2,
2328      * c3).join();}.
2329      *
2330      * @param cfs the CompletableFutures
2331      * @return a new CompletableFuture that is completed when all of the
2332      * given CompletableFutures complete
2333      * @throws NullPointerException if the array or any of its elements are
2334      * {@code null}
2335      */
allOf(CompletableFuture<?>.... cfs)2336     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2337         return andTree(cfs, 0, cfs.length - 1);
2338     }
2339 
2340     /**
2341      * Returns a new CompletableFuture that is completed when any of
2342      * the given CompletableFutures complete, with the same result.
2343      * Otherwise, if it completed exceptionally, the returned
2344      * CompletableFuture also does so, with a CompletionException
2345      * holding this exception as its cause.  If no CompletableFutures
2346      * are provided, returns an incomplete CompletableFuture.
2347      *
2348      * @param cfs the CompletableFutures
2349      * @return a new CompletableFuture that is completed with the
2350      * result or exception of any of the given CompletableFutures when
2351      * one completes
2352      * @throws NullPointerException if the array or any of its elements are
2353      * {@code null}
2354      */
anyOf(CompletableFuture<?>.... cfs)2355     public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2356         int n; Object r;
2357         if ((n = cfs.length) <= 1)
2358             return (n == 0)
2359                 ? new CompletableFuture<Object>()
2360                 : uniCopyStage(cfs[0]);
2361         for (CompletableFuture<?> cf : cfs)
2362             if ((r = cf.result) != null)
2363                 return new CompletableFuture<Object>(encodeRelay(r));
2364         cfs = cfs.clone();
2365         CompletableFuture<Object> d = new CompletableFuture<>();
2366         for (CompletableFuture<?> cf : cfs)
2367             cf.unipush(new AnyOf(d, cf, cfs));
2368         // If d was completed while we were adding completions, we should
2369         // clean the stack of any sources that may have had completions
2370         // pushed on their stack after d was completed.
2371         if (d.result != null)
2372             for (int i = 0, len = cfs.length; i < len; i++)
2373                 if (cfs[i].result != null)
2374                     for (i++; i < len; i++)
2375                         if (cfs[i].result == null)
2376                             cfs[i].cleanStack();
2377         return d;
2378     }
2379 
2380     /* ------------- Control and status methods -------------- */
2381 
2382     /**
2383      * If not already completed, completes this CompletableFuture with
2384      * a {@link CancellationException}. Dependent CompletableFutures
2385      * that have not already completed will also complete
2386      * exceptionally, with a {@link CompletionException} caused by
2387      * this {@code CancellationException}.
2388      *
2389      * @param mayInterruptIfRunning this value has no effect in this
2390      * implementation because interrupts are not used to control
2391      * processing.
2392      *
2393      * @return {@code true} if this task is now cancelled
2394      */
cancel(boolean mayInterruptIfRunning)2395     public boolean cancel(boolean mayInterruptIfRunning) {
2396         boolean cancelled = (result == null) &&
2397             internalComplete(new AltResult(new CancellationException()));
2398         postComplete();
2399         return cancelled || isCancelled();
2400     }
2401 
2402     /**
2403      * Returns {@code true} if this CompletableFuture was cancelled
2404      * before it completed normally.
2405      *
2406      * @return {@code true} if this CompletableFuture was cancelled
2407      * before it completed normally
2408      */
isCancelled()2409     public boolean isCancelled() {
2410         Object r;
2411         return ((r = result) instanceof AltResult) &&
2412             (((AltResult)r).ex instanceof CancellationException);
2413     }
2414 
2415     /**
2416      * Returns {@code true} if this CompletableFuture completed
2417      * exceptionally, in any way. Possible causes include
2418      * cancellation, explicit invocation of {@code
2419      * completeExceptionally}, and abrupt termination of a
2420      * CompletionStage action.
2421      *
2422      * @return {@code true} if this CompletableFuture completed
2423      * exceptionally
2424      */
isCompletedExceptionally()2425     public boolean isCompletedExceptionally() {
2426         Object r;
2427         return ((r = result) instanceof AltResult) && r != NIL;
2428     }
2429 
2430     /**
2431      * Forcibly sets or resets the value subsequently returned by
2432      * method {@link #get()} and related methods, whether or not
2433      * already completed. This method is designed for use only in
2434      * error recovery actions, and even in such situations may result
2435      * in ongoing dependent completions using established versus
2436      * overwritten outcomes.
2437      *
2438      * @param value the completion value
2439      */
obtrudeValue(T value)2440     public void obtrudeValue(T value) {
2441         result = (value == null) ? NIL : value;
2442         postComplete();
2443     }
2444 
2445     /**
2446      * Forcibly causes subsequent invocations of method {@link #get()}
2447      * and related methods to throw the given exception, whether or
2448      * not already completed. This method is designed for use only in
2449      * error recovery actions, and even in such situations may result
2450      * in ongoing dependent completions using established versus
2451      * overwritten outcomes.
2452      *
2453      * @param ex the exception
2454      * @throws NullPointerException if the exception is null
2455      */
obtrudeException(Throwable ex)2456     public void obtrudeException(Throwable ex) {
2457         if (ex == null) throw new NullPointerException();
2458         result = new AltResult(ex);
2459         postComplete();
2460     }
2461 
2462     /**
2463      * Returns the estimated number of CompletableFutures whose
2464      * completions are awaiting completion of this CompletableFuture.
2465      * This method is designed for use in monitoring system state, not
2466      * for synchronization control.
2467      *
2468      * @return the number of dependent CompletableFutures
2469      */
getNumberOfDependents()2470     public int getNumberOfDependents() {
2471         int count = 0;
2472         for (Completion p = stack; p != null; p = p.next)
2473             ++count;
2474         return count;
2475     }
2476 
2477     /**
2478      * Returns a string identifying this CompletableFuture, as well as
2479      * its completion state.  The state, in brackets, contains the
2480      * String {@code "Completed Normally"} or the String {@code
2481      * "Completed Exceptionally"}, or the String {@code "Not
2482      * completed"} followed by the number of CompletableFutures
2483      * dependent upon its completion, if any.
2484      *
2485      * @return a string identifying this CompletableFuture, as well as its state
2486      */
toString()2487     public String toString() {
2488         Object r = result;
2489         int count = 0; // avoid call to getNumberOfDependents in case disabled
2490         for (Completion p = stack; p != null; p = p.next)
2491             ++count;
2492         return super.toString() +
2493             ((r == null)
2494              ? ((count == 0)
2495                 ? "[Not completed]"
2496                 : "[Not completed, " + count + " dependents]")
2497              : (((r instanceof AltResult) && ((AltResult)r).ex != null)
2498                 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]"
2499                 : "[Completed normally]"));
2500     }
2501 
2502     // jdk9 additions
2503 
2504     /**
2505      * Returns a new incomplete CompletableFuture of the type to be
2506      * returned by a CompletionStage method. Subclasses should
2507      * normally override this method to return an instance of the same
2508      * class as this CompletableFuture. The default implementation
2509      * returns an instance of class CompletableFuture.
2510      *
2511      * @param <U> the type of the value
2512      * @return a new CompletableFuture
2513      * @since 9
2514      */
newIncompleteFuture()2515     public <U> CompletableFuture<U> newIncompleteFuture() {
2516         return new CompletableFuture<U>();
2517     }
2518 
2519     /**
2520      * Returns the default Executor used for async methods that do not
2521      * specify an Executor. This class uses the {@link
2522      * ForkJoinPool#commonPool()} if it supports more than one
2523      * parallel thread, or else an Executor using one thread per async
2524      * task.  This method may be overridden in subclasses to return
2525      * an Executor that provides at least one independent thread.
2526      *
2527      * @return the executor
2528      * @since 9
2529      */
defaultExecutor()2530     public Executor defaultExecutor() {
2531         return ASYNC_POOL;
2532     }
2533 
2534     /**
2535      * Returns a new CompletableFuture that is completed normally with
2536      * the same value as this CompletableFuture when it completes
2537      * normally. If this CompletableFuture completes exceptionally,
2538      * then the returned CompletableFuture completes exceptionally
2539      * with a CompletionException with this exception as cause. The
2540      * behavior is equivalent to {@code thenApply(x -> x)}. This
2541      * method may be useful as a form of "defensive copying", to
2542      * prevent clients from completing, while still being able to
2543      * arrange dependent actions.
2544      *
2545      * @return the new CompletableFuture
2546      * @since 9
2547      */
copy()2548     public CompletableFuture<T> copy() {
2549         return uniCopyStage(this);
2550     }
2551 
2552     /**
2553      * Returns a new CompletionStage that is completed normally with
2554      * the same value as this CompletableFuture when it completes
2555      * normally, and cannot be independently completed or otherwise
2556      * used in ways not defined by the methods of interface {@link
2557      * CompletionStage}.  If this CompletableFuture completes
2558      * exceptionally, then the returned CompletionStage completes
2559      * exceptionally with a CompletionException with this exception as
2560      * cause.
2561      *
2562      * <p>Unless overridden by a subclass, a new non-minimal
2563      * CompletableFuture with all methods available can be obtained from
2564      * a minimal CompletionStage via {@link #toCompletableFuture()}.
2565      * For example, completion of a minimal stage can be awaited by
2566      *
2567      * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre>
2568      *
2569      * @return the new CompletionStage
2570      * @since 9
2571      */
minimalCompletionStage()2572     public CompletionStage<T> minimalCompletionStage() {
2573         return uniAsMinimalStage();
2574     }
2575 
2576     /**
2577      * Completes this CompletableFuture with the result of
2578      * the given Supplier function invoked from an asynchronous
2579      * task using the given executor.
2580      *
2581      * @param supplier a function returning the value to be used
2582      * to complete this CompletableFuture
2583      * @param executor the executor to use for asynchronous execution
2584      * @return this CompletableFuture
2585      * @since 9
2586      */
completeAsync(Supplier<? extends T> supplier, Executor executor)2587     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,
2588                                               Executor executor) {
2589         if (supplier == null || executor == null)
2590             throw new NullPointerException();
2591         executor.execute(new AsyncSupply<T>(this, supplier));
2592         return this;
2593     }
2594 
2595     /**
2596      * Completes this CompletableFuture with the result of the given
2597      * Supplier function invoked from an asynchronous task using the
2598      * default executor.
2599      *
2600      * @param supplier a function returning the value to be used
2601      * to complete this CompletableFuture
2602      * @return this CompletableFuture
2603      * @since 9
2604      */
completeAsync(Supplier<? extends T> supplier)2605     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
2606         return completeAsync(supplier, defaultExecutor());
2607     }
2608 
2609     /**
2610      * Exceptionally completes this CompletableFuture with
2611      * a {@link TimeoutException} if not otherwise completed
2612      * before the given timeout.
2613      *
2614      * @param timeout how long to wait before completing exceptionally
2615      *        with a TimeoutException, in units of {@code unit}
2616      * @param unit a {@code TimeUnit} determining how to interpret the
2617      *        {@code timeout} parameter
2618      * @return this CompletableFuture
2619      * @since 9
2620      */
orTimeout(long timeout, TimeUnit unit)2621     public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {
2622         if (unit == null)
2623             throw new NullPointerException();
2624         if (result == null)
2625             whenComplete(new Canceller(Delayer.delay(new Timeout(this),
2626                                                      timeout, unit)));
2627         return this;
2628     }
2629 
2630     /**
2631      * Completes this CompletableFuture with the given value if not
2632      * otherwise completed before the given timeout.
2633      *
2634      * @param value the value to use upon timeout
2635      * @param timeout how long to wait before completing normally
2636      *        with the given value, in units of {@code unit}
2637      * @param unit a {@code TimeUnit} determining how to interpret the
2638      *        {@code timeout} parameter
2639      * @return this CompletableFuture
2640      * @since 9
2641      */
completeOnTimeout(T value, long timeout, TimeUnit unit)2642     public CompletableFuture<T> completeOnTimeout(T value, long timeout,
2643                                                   TimeUnit unit) {
2644         if (unit == null)
2645             throw new NullPointerException();
2646         if (result == null)
2647             whenComplete(new Canceller(Delayer.delay(
2648                                            new DelayedCompleter<T>(this, value),
2649                                            timeout, unit)));
2650         return this;
2651     }
2652 
2653     /**
2654      * Returns a new Executor that submits a task to the given base
2655      * executor after the given delay (or no delay if non-positive).
2656      * Each delay commences upon invocation of the returned executor's
2657      * {@code execute} method.
2658      *
2659      * @param delay how long to delay, in units of {@code unit}
2660      * @param unit a {@code TimeUnit} determining how to interpret the
2661      *        {@code delay} parameter
2662      * @param executor the base executor
2663      * @return the new delayed executor
2664      * @since 9
2665      */
delayedExecutor(long delay, TimeUnit unit, Executor executor)2666     public static Executor delayedExecutor(long delay, TimeUnit unit,
2667                                            Executor executor) {
2668         if (unit == null || executor == null)
2669             throw new NullPointerException();
2670         return new DelayedExecutor(delay, unit, executor);
2671     }
2672 
2673     /**
2674      * Returns a new Executor that submits a task to the default
2675      * executor after the given delay (or no delay if non-positive).
2676      * Each delay commences upon invocation of the returned executor's
2677      * {@code execute} method.
2678      *
2679      * @param delay how long to delay, in units of {@code unit}
2680      * @param unit a {@code TimeUnit} determining how to interpret the
2681      *        {@code delay} parameter
2682      * @return the new delayed executor
2683      * @since 9
2684      */
delayedExecutor(long delay, TimeUnit unit)2685     public static Executor delayedExecutor(long delay, TimeUnit unit) {
2686         if (unit == null)
2687             throw new NullPointerException();
2688         return new DelayedExecutor(delay, unit, ASYNC_POOL);
2689     }
2690 
2691     /**
2692      * Returns a new CompletionStage that is already completed with
2693      * the given value and supports only those methods in
2694      * interface {@link CompletionStage}.
2695      *
2696      * @param value the value
2697      * @param <U> the type of the value
2698      * @return the completed CompletionStage
2699      * @since 9
2700      */
completedStage(U value)2701     public static <U> CompletionStage<U> completedStage(U value) {
2702         return new MinimalStage<U>((value == null) ? NIL : value);
2703     }
2704 
2705     /**
2706      * Returns a new CompletableFuture that is already completed
2707      * exceptionally with the given exception.
2708      *
2709      * @param ex the exception
2710      * @param <U> the type of the value
2711      * @return the exceptionally completed CompletableFuture
2712      * @since 9
2713      */
failedFuture(Throwable ex)2714     public static <U> CompletableFuture<U> failedFuture(Throwable ex) {
2715         if (ex == null) throw new NullPointerException();
2716         return new CompletableFuture<U>(new AltResult(ex));
2717     }
2718 
2719     /**
2720      * Returns a new CompletionStage that is already completed
2721      * exceptionally with the given exception and supports only those
2722      * methods in interface {@link CompletionStage}.
2723      *
2724      * @param ex the exception
2725      * @param <U> the type of the value
2726      * @return the exceptionally completed CompletionStage
2727      * @since 9
2728      */
failedStage(Throwable ex)2729     public static <U> CompletionStage<U> failedStage(Throwable ex) {
2730         if (ex == null) throw new NullPointerException();
2731         return new MinimalStage<U>(new AltResult(ex));
2732     }
2733 
2734     /**
2735      * Singleton delay scheduler, used only for starting and
2736      * cancelling tasks.
2737      */
2738     static final class Delayer {
delay(Runnable command, long delay, TimeUnit unit)2739         static ScheduledFuture<?> delay(Runnable command, long delay,
2740                                         TimeUnit unit) {
2741             return delayer.schedule(command, delay, unit);
2742         }
2743 
2744         static final class DaemonThreadFactory implements ThreadFactory {
newThread(Runnable r)2745             public Thread newThread(Runnable r) {
2746                 Thread t = new Thread(r);
2747                 t.setDaemon(true);
2748                 t.setName("CompletableFutureDelayScheduler");
2749                 return t;
2750             }
2751         }
2752 
2753         static final ScheduledThreadPoolExecutor delayer;
2754         static {
2755             (delayer = new ScheduledThreadPoolExecutor(
2756                 1, new DaemonThreadFactory())).
2757                 setRemoveOnCancelPolicy(true);
2758         }
2759     }
2760 
2761     // Little class-ified lambdas to better support monitoring
2762 
2763     static final class DelayedExecutor implements Executor {
2764         final long delay;
2765         final TimeUnit unit;
2766         final Executor executor;
DelayedExecutor(long delay, TimeUnit unit, Executor executor)2767         DelayedExecutor(long delay, TimeUnit unit, Executor executor) {
2768             this.delay = delay; this.unit = unit; this.executor = executor;
2769         }
execute(Runnable r)2770         public void execute(Runnable r) {
2771             Delayer.delay(new TaskSubmitter(executor, r), delay, unit);
2772         }
2773     }
2774 
2775     /** Action to submit user task */
2776     static final class TaskSubmitter implements Runnable {
2777         final Executor executor;
2778         final Runnable action;
TaskSubmitter(Executor executor, Runnable action)2779         TaskSubmitter(Executor executor, Runnable action) {
2780             this.executor = executor;
2781             this.action = action;
2782         }
run()2783         public void run() { executor.execute(action); }
2784     }
2785 
2786     /** Action to completeExceptionally on timeout */
2787     static final class Timeout implements Runnable {
2788         final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f)2789         Timeout(CompletableFuture<?> f) { this.f = f; }
run()2790         public void run() {
2791             if (f != null && !f.isDone())
2792                 f.completeExceptionally(new TimeoutException());
2793         }
2794     }
2795 
2796     /** Action to complete on timeout */
2797     static final class DelayedCompleter<U> implements Runnable {
2798         final CompletableFuture<U> f;
2799         final U u;
DelayedCompleter(CompletableFuture<U> f, U u)2800         DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }
run()2801         public void run() {
2802             if (f != null)
2803                 f.complete(u);
2804         }
2805     }
2806 
2807     /** Action to cancel unneeded timeouts */
2808     static final class Canceller implements BiConsumer<Object, Throwable> {
2809         final Future<?> f;
Canceller(Future<?> f)2810         Canceller(Future<?> f) { this.f = f; }
accept(Object ignore, Throwable ex)2811         public void accept(Object ignore, Throwable ex) {
2812             if (ex == null && f != null && !f.isDone())
2813                 f.cancel(false);
2814         }
2815     }
2816 
2817     /**
2818      * A subclass that just throws UOE for most non-CompletionStage methods.
2819      */
2820     static final class MinimalStage<T> extends CompletableFuture<T> {
MinimalStage()2821         MinimalStage() { }
MinimalStage(Object r)2822         MinimalStage(Object r) { super(r); }
newIncompleteFuture()2823         @Override public <U> CompletableFuture<U> newIncompleteFuture() {
2824             return new MinimalStage<U>(); }
get()2825         @Override public T get() {
2826             throw new UnsupportedOperationException(); }
get(long timeout, TimeUnit unit)2827         @Override public T get(long timeout, TimeUnit unit) {
2828             throw new UnsupportedOperationException(); }
getNow(T valueIfAbsent)2829         @Override public T getNow(T valueIfAbsent) {
2830             throw new UnsupportedOperationException(); }
join()2831         @Override public T join() {
2832             throw new UnsupportedOperationException(); }
complete(T value)2833         @Override public boolean complete(T value) {
2834             throw new UnsupportedOperationException(); }
completeExceptionally(Throwable ex)2835         @Override public boolean completeExceptionally(Throwable ex) {
2836             throw new UnsupportedOperationException(); }
cancel(boolean mayInterruptIfRunning)2837         @Override public boolean cancel(boolean mayInterruptIfRunning) {
2838             throw new UnsupportedOperationException(); }
obtrudeValue(T value)2839         @Override public void obtrudeValue(T value) {
2840             throw new UnsupportedOperationException(); }
obtrudeException(Throwable ex)2841         @Override public void obtrudeException(Throwable ex) {
2842             throw new UnsupportedOperationException(); }
isDone()2843         @Override public boolean isDone() {
2844             throw new UnsupportedOperationException(); }
isCancelled()2845         @Override public boolean isCancelled() {
2846             throw new UnsupportedOperationException(); }
isCompletedExceptionally()2847         @Override public boolean isCompletedExceptionally() {
2848             throw new UnsupportedOperationException(); }
getNumberOfDependents()2849         @Override public int getNumberOfDependents() {
2850             throw new UnsupportedOperationException(); }
completeAsync(Supplier<? extends T> supplier, Executor executor)2851         @Override public CompletableFuture<T> completeAsync
2852             (Supplier<? extends T> supplier, Executor executor) {
2853             throw new UnsupportedOperationException(); }
completeAsync(Supplier<? extends T> supplier)2854         @Override public CompletableFuture<T> completeAsync
2855             (Supplier<? extends T> supplier) {
2856             throw new UnsupportedOperationException(); }
orTimeout(long timeout, TimeUnit unit)2857         @Override public CompletableFuture<T> orTimeout
2858             (long timeout, TimeUnit unit) {
2859             throw new UnsupportedOperationException(); }
completeOnTimeout(T value, long timeout, TimeUnit unit)2860         @Override public CompletableFuture<T> completeOnTimeout
2861             (T value, long timeout, TimeUnit unit) {
2862             throw new UnsupportedOperationException(); }
toCompletableFuture()2863         @Override public CompletableFuture<T> toCompletableFuture() {
2864             Object r;
2865             if ((r = result) != null)
2866                 return new CompletableFuture<T>(encodeRelay(r));
2867             else {
2868                 CompletableFuture<T> d = new CompletableFuture<>();
2869                 unipush(new UniRelay<T,T>(d, this));
2870                 return d;
2871             }
2872         }
2873     }
2874 
2875     // VarHandle mechanics
2876     private static final VarHandle RESULT;
2877     private static final VarHandle STACK;
2878     private static final VarHandle NEXT;
2879     static {
2880         try {
2881             MethodHandles.Lookup l = MethodHandles.lookup();
2882             RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class);
2883             STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class);
2884             NEXT = l.findVarHandle(Completion.class, "next", Completion.class);
2885         } catch (ReflectiveOperationException e) {
2886             throw new ExceptionInInitializerError(e);
2887         }
2888 
2889         // Reduce the risk of rare disastrous classloading in first call to
2890         // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
2891         Class<?> ensureLoaded = LockSupport.class;
2892     }
2893 }
2894