1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent.locks;
37 
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.Date;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.ForkJoinPool;
43 import jdk.internal.misc.Unsafe;
44 
45 /**
46  * A version of {@link AbstractQueuedSynchronizer} in
47  * which synchronization state is maintained as a {@code long}.
48  * This class has exactly the same structure, properties, and methods
49  * as {@code AbstractQueuedSynchronizer} with the exception
50  * that all state-related parameters and results are defined
51  * as {@code long} rather than {@code int}. This class
52  * may be useful when creating synchronizers such as
53  * multilevel locks and barriers that require
54  * 64 bits of state.
55  *
56  * <p>See {@link AbstractQueuedSynchronizer} for usage
57  * notes and examples.
58  *
59  * @since 1.6
60  * @author Doug Lea
61  */
62 public abstract class AbstractQueuedLongSynchronizer
63     extends AbstractOwnableSynchronizer
64     implements java.io.Serializable {
65 
66     private static final long serialVersionUID = 7373984972572414692L;
67 
68     /*
69      * To keep sources in sync, the remainder of this source file is
70      * exactly cloned from AbstractQueuedSynchronizer, replacing class
71      * name and changing ints related with sync state to longs. Please
72      * keep it that way.
73      */
74 
75     // Node status bits, also used as argument and return values
76     static final int WAITING   = 1;          // must be 1
77     static final int CANCELLED = 0x80000000; // must be negative
78     static final int COND      = 2;          // in a condition wait
79 
80     /** CLH Nodes */
81     abstract static class Node {
82         volatile Node prev;       // initially attached via casTail
83         volatile Node next;       // visibly nonnull when signallable
84         Thread waiter;            // visibly nonnull when enqueued
85         volatile int status;      // written by owner, atomic bit ops by others
86 
87         // methods for atomic operations
casPrev(Node c, Node v)88         final boolean casPrev(Node c, Node v) {  // for cleanQueue
89             return U.weakCompareAndSetReference(this, PREV, c, v);
90         }
casNext(Node c, Node v)91         final boolean casNext(Node c, Node v) {  // for cleanQueue
92             return U.weakCompareAndSetReference(this, NEXT, c, v);
93         }
getAndUnsetStatus(int v)94         final int getAndUnsetStatus(int v) {     // for signalling
95             return U.getAndBitwiseAndInt(this, STATUS, ~v);
96         }
setPrevRelaxed(Node p)97         final void setPrevRelaxed(Node p) {      // for off-queue assignment
98             U.putReference(this, PREV, p);
99         }
setStatusRelaxed(int s)100         final void setStatusRelaxed(int s) {     // for off-queue assignment
101             U.putInt(this, STATUS, s);
102         }
clearStatus()103         final void clearStatus() {               // for reducing unneeded signals
104             U.putIntOpaque(this, STATUS, 0);
105         }
106 
107         private static final long STATUS
108             = U.objectFieldOffset(Node.class, "status");
109         private static final long NEXT
110             = U.objectFieldOffset(Node.class, "next");
111         private static final long PREV
112             = U.objectFieldOffset(Node.class, "prev");
113     }
114 
115     // Concrete classes tagged by type
116     static final class ExclusiveNode extends Node { }
117     static final class SharedNode extends Node { }
118 
119     static final class ConditionNode extends Node
120         implements ForkJoinPool.ManagedBlocker {
121         ConditionNode nextWaiter;            // link to next waiting node
122 
123         /**
124          * Allows Conditions to be used in ForkJoinPools without
125          * risking fixed pool exhaustion. This is usable only for
126          * untimed Condition waits, not timed versions.
127          */
isReleasable()128         public final boolean isReleasable() {
129             return status <= 1 || Thread.currentThread().isInterrupted();
130         }
131 
block()132         public final boolean block() {
133             while (!isReleasable()) LockSupport.park();
134             return true;
135         }
136     }
137 
138     /**
139      * Head of the wait queue, lazily initialized.
140      */
141     private transient volatile Node head;
142 
143     /**
144      * Tail of the wait queue. After initialization, modified only via casTail.
145      */
146     private transient volatile Node tail;
147 
148     /**
149      * The synchronization state.
150      */
151     private volatile long state;
152 
153     /**
154      * Returns the current value of synchronization state.
155      * This operation has memory semantics of a {@code volatile} read.
156      * @return current state value
157      */
getState()158     protected final long getState() {
159         return state;
160     }
161 
162     /**
163      * Sets the value of synchronization state.
164      * This operation has memory semantics of a {@code volatile} write.
165      * @param newState the new state value
166      */
setState(long newState)167     protected final void setState(long newState) {
168         state = newState;
169     }
170 
171     /**
172      * Atomically sets synchronization state to the given updated
173      * value if the current state value equals the expected value.
174      * This operation has memory semantics of a {@code volatile} read
175      * and write.
176      *
177      * @param expect the expected value
178      * @param update the new value
179      * @return {@code true} if successful. False return indicates that the actual
180      *         value was not equal to the expected value.
181      */
compareAndSetState(long expect, long update)182     protected final boolean compareAndSetState(long expect, long update) {
183         return U.compareAndSetLong(this, STATE, expect, update);
184     }
185 
186     // Queuing utilities
187 
casTail(Node c, Node v)188     private boolean casTail(Node c, Node v) {
189         return U.compareAndSetReference(this, TAIL, c, v);
190     }
191 
192     /** tries once to CAS a new dummy node for head */
tryInitializeHead()193     private void tryInitializeHead() {
194         Node h = new ExclusiveNode();
195         if (U.compareAndSetReference(this, HEAD, null, h))
196             tail = h;
197     }
198 
199     /**
200      * Enqueues the node unless null. (Currently used only for
201      * ConditionNodes; other cases are interleaved with acquires.)
202      */
enqueue(Node node)203     final void enqueue(Node node) {
204         if (node != null) {
205             for (;;) {
206                 Node t = tail;
207                 node.setPrevRelaxed(t);        // avoid unnecessary fence
208                 if (t == null)                 // initialize
209                     tryInitializeHead();
210                 else if (casTail(t, node)) {
211                     t.next = node;
212                     if (t.status < 0)          // wake up to clean link
213                         LockSupport.unpark(node.waiter);
214                     break;
215                 }
216             }
217         }
218     }
219 
220     /** Returns true if node is found in traversal from tail */
isEnqueued(Node node)221     final boolean isEnqueued(Node node) {
222         for (Node t = tail; t != null; t = t.prev)
223             if (t == node)
224                 return true;
225         return false;
226     }
227 
228     /**
229      * Wakes up the successor of given node, if one exists, and unsets its
230      * WAITING status to avoid park race. This may fail to wake up an
231      * eligible thread when one or more have been cancelled, but
232      * cancelAcquire ensures liveness.
233      */
signalNext(Node h)234     private static void signalNext(Node h) {
235         Node s;
236         if (h != null && (s = h.next) != null && s.status != 0) {
237             s.getAndUnsetStatus(WAITING);
238             LockSupport.unpark(s.waiter);
239         }
240     }
241 
242     /** Wakes up the given node if in shared mode */
signalNextIfShared(Node h)243     private static void signalNextIfShared(Node h) {
244         Node s;
245         if (h != null && (s = h.next) != null &&
246             (s instanceof SharedNode) && s.status != 0) {
247             s.getAndUnsetStatus(WAITING);
248             LockSupport.unpark(s.waiter);
249         }
250     }
251 
252     /**
253      * Main acquire method, invoked by all exported acquire methods.
254      *
255      * @param node null unless a reacquiring Condition
256      * @param arg the acquire argument
257      * @param shared true if shared mode else exclusive
258      * @param interruptible if abort and return negative on interrupt
259      * @param timed if true use timed waits
260      * @param time if timed, the System.nanoTime value to timeout
261      * @return positive if acquired, 0 if timed out, negative if interrupted
262      */
acquire(Node node, long arg, boolean shared, boolean interruptible, boolean timed, long time)263     final int acquire(Node node, long arg, boolean shared,
264                       boolean interruptible, boolean timed, long time) {
265         Thread current = Thread.currentThread();
266         byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
267         boolean interrupted = false, first = false;
268         Node pred = null;                // predecessor of node when enqueued
269 
270         /*
271          * Repeatedly:
272          *  Check if node now first
273          *    if so, ensure head stable, else ensure valid predecessor
274          *  if node is first or not yet enqueued, try acquiring
275          *  else if node not yet created, create it
276          *  else if not yet enqueued, try once to enqueue
277          *  else if woken from park, retry (up to postSpins times)
278          *  else if WAITING status not set, set and retry
279          *  else park and clear WAITING status, and check cancellation
280          */
281 
282         for (;;) {
283             if (!first && (pred = (node == null) ? null : node.prev) != null &&
284                 !(first = (head == pred))) {
285                 if (pred.status < 0) {
286                     cleanQueue();           // predecessor cancelled
287                     continue;
288                 } else if (pred.prev == null) {
289                     Thread.onSpinWait();    // ensure serialization
290                     continue;
291                 }
292             }
293             if (first || pred == null) {
294                 boolean acquired;
295                 try {
296                     if (shared)
297                         acquired = (tryAcquireShared(arg) >= 0);
298                     else
299                         acquired = tryAcquire(arg);
300                 } catch (Throwable ex) {
301                     cancelAcquire(node, interrupted, false);
302                     throw ex;
303                 }
304                 if (acquired) {
305                     if (first) {
306                         node.prev = null;
307                         head = node;
308                         pred.next = null;
309                         node.waiter = null;
310                         if (shared)
311                             signalNextIfShared(node);
312                         if (interrupted)
313                             current.interrupt();
314                     }
315                     return 1;
316                 }
317             }
318             if (node == null) {                 // allocate; retry before enqueue
319                 if (shared)
320                     node = new SharedNode();
321                 else
322                     node = new ExclusiveNode();
323             } else if (pred == null) {          // try to enqueue
324                 node.waiter = current;
325                 Node t = tail;
326                 node.setPrevRelaxed(t);         // avoid unnecessary fence
327                 if (t == null)
328                     tryInitializeHead();
329                 else if (!casTail(t, node))
330                     node.setPrevRelaxed(null);  // back out
331                 else
332                     t.next = node;
333             } else if (first && spins != 0) {
334                 --spins;                        // reduce unfairness on rewaits
335                 Thread.onSpinWait();
336             } else if (node.status == 0) {
337                 node.status = WAITING;          // enable signal and recheck
338             } else {
339                 long nanos;
340                 spins = postSpins = (byte)((postSpins << 1) | 1);
341                 if (!timed)
342                     LockSupport.park(this);
343                 else if ((nanos = time - System.nanoTime()) > 0L)
344                     LockSupport.parkNanos(this, nanos);
345                 else
346                     break;
347                 node.clearStatus();
348                 if ((interrupted |= Thread.interrupted()) && interruptible)
349                     break;
350             }
351         }
352         return cancelAcquire(node, interrupted, interruptible);
353     }
354 
355     /**
356      * Possibly repeatedly traverses from tail, unsplicing cancelled
357      * nodes until none are found.
358      */
cleanQueue()359     private void cleanQueue() {
360         for (;;) {                               // restart point
361             for (Node q = tail, s = null, p, n;;) { // (p, q, s) triples
362                 if (q == null || (p = q.prev) == null)
363                     return;                      // end of list
364                 if (s == null ? tail != q : (s.prev != q || s.status < 0))
365                     break;                       // inconsistent
366                 if (q.status < 0) {              // cancelled
367                     if ((s == null ? casTail(q, p) : s.casPrev(q, p)) &&
368                         q.prev == p) {
369                         p.casNext(q, s);         // OK if fails
370                         if (p.prev == null)
371                             signalNext(p);
372                     }
373                     break;
374                 }
375                 if ((n = p.next) != q) {         // help finish
376                     if (n != null && q.prev == p) {
377                         p.casNext(n, q);
378                         if (p.prev == null)
379                             signalNext(p);
380                     }
381                     break;
382                 }
383                 s = q;
384                 q = q.prev;
385             }
386         }
387     }
388 
389     /**
390      * Cancels an ongoing attempt to acquire.
391      *
392      * @param node the node (may be null if cancelled before enqueuing)
393      * @param interrupted true if thread interrupted
394      * @param interruptible if should report interruption vs reset
395      */
cancelAcquire(Node node, boolean interrupted, boolean interruptible)396     private int cancelAcquire(Node node, boolean interrupted,
397                               boolean interruptible) {
398         if (node != null) {
399             node.waiter = null;
400             node.status = CANCELLED;
401             if (node.prev != null)
402                 cleanQueue();
403         }
404         if (interrupted) {
405             if (interruptible)
406                 return CANCELLED;
407             else
408                 Thread.currentThread().interrupt();
409         }
410         return 0;
411     }
412 
413     // Main exported methods
414 
415     /**
416      * Attempts to acquire in exclusive mode. This method should query
417      * if the state of the object permits it to be acquired in the
418      * exclusive mode, and if so to acquire it.
419      *
420      * <p>This method is always invoked by the thread performing
421      * acquire.  If this method reports failure, the acquire method
422      * may queue the thread, if it is not already queued, until it is
423      * signalled by a release from some other thread. This can be used
424      * to implement method {@link Lock#tryLock()}.
425      *
426      * <p>The default
427      * implementation throws {@link UnsupportedOperationException}.
428      *
429      * @param arg the acquire argument. This value is always the one
430      *        passed to an acquire method, or is the value saved on entry
431      *        to a condition wait.  The value is otherwise uninterpreted
432      *        and can represent anything you like.
433      * @return {@code true} if successful. Upon success, this object has
434      *         been acquired.
435      * @throws IllegalMonitorStateException if acquiring would place this
436      *         synchronizer in an illegal state. This exception must be
437      *         thrown in a consistent fashion for synchronization to work
438      *         correctly.
439      * @throws UnsupportedOperationException if exclusive mode is not supported
440      */
tryAcquire(long arg)441     protected boolean tryAcquire(long arg) {
442         throw new UnsupportedOperationException();
443     }
444 
445     /**
446      * Attempts to set the state to reflect a release in exclusive
447      * mode.
448      *
449      * <p>This method is always invoked by the thread performing release.
450      *
451      * <p>The default implementation throws
452      * {@link UnsupportedOperationException}.
453      *
454      * @param arg the release argument. This value is always the one
455      *        passed to a release method, or the current state value upon
456      *        entry to a condition wait.  The value is otherwise
457      *        uninterpreted and can represent anything you like.
458      * @return {@code true} if this object is now in a fully released
459      *         state, so that any waiting threads may attempt to acquire;
460      *         and {@code false} otherwise.
461      * @throws IllegalMonitorStateException if releasing would place this
462      *         synchronizer in an illegal state. This exception must be
463      *         thrown in a consistent fashion for synchronization to work
464      *         correctly.
465      * @throws UnsupportedOperationException if exclusive mode is not supported
466      */
tryRelease(long arg)467     protected boolean tryRelease(long arg) {
468         throw new UnsupportedOperationException();
469     }
470 
471     /**
472      * Attempts to acquire in shared mode. This method should query if
473      * the state of the object permits it to be acquired in the shared
474      * mode, and if so to acquire it.
475      *
476      * <p>This method is always invoked by the thread performing
477      * acquire.  If this method reports failure, the acquire method
478      * may queue the thread, if it is not already queued, until it is
479      * signalled by a release from some other thread.
480      *
481      * <p>The default implementation throws {@link
482      * UnsupportedOperationException}.
483      *
484      * @param arg the acquire argument. This value is always the one
485      *        passed to an acquire method, or is the value saved on entry
486      *        to a condition wait.  The value is otherwise uninterpreted
487      *        and can represent anything you like.
488      * @return a negative value on failure; zero if acquisition in shared
489      *         mode succeeded but no subsequent shared-mode acquire can
490      *         succeed; and a positive value if acquisition in shared
491      *         mode succeeded and subsequent shared-mode acquires might
492      *         also succeed, in which case a subsequent waiting thread
493      *         must check availability. (Support for three different
494      *         return values enables this method to be used in contexts
495      *         where acquires only sometimes act exclusively.)  Upon
496      *         success, this object has been acquired.
497      * @throws IllegalMonitorStateException if acquiring would place this
498      *         synchronizer in an illegal state. This exception must be
499      *         thrown in a consistent fashion for synchronization to work
500      *         correctly.
501      * @throws UnsupportedOperationException if shared mode is not supported
502      */
tryAcquireShared(long arg)503     protected long tryAcquireShared(long arg) {
504         throw new UnsupportedOperationException();
505     }
506 
507     /**
508      * Attempts to set the state to reflect a release in shared mode.
509      *
510      * <p>This method is always invoked by the thread performing release.
511      *
512      * <p>The default implementation throws
513      * {@link UnsupportedOperationException}.
514      *
515      * @param arg the release argument. This value is always the one
516      *        passed to a release method, or the current state value upon
517      *        entry to a condition wait.  The value is otherwise
518      *        uninterpreted and can represent anything you like.
519      * @return {@code true} if this release of shared mode may permit a
520      *         waiting acquire (shared or exclusive) to succeed; and
521      *         {@code false} otherwise
522      * @throws IllegalMonitorStateException if releasing would place this
523      *         synchronizer in an illegal state. This exception must be
524      *         thrown in a consistent fashion for synchronization to work
525      *         correctly.
526      * @throws UnsupportedOperationException if shared mode is not supported
527      */
tryReleaseShared(long arg)528     protected boolean tryReleaseShared(long arg) {
529         throw new UnsupportedOperationException();
530     }
531 
532     /**
533      * Returns {@code true} if synchronization is held exclusively with
534      * respect to the current (calling) thread.  This method is invoked
535      * upon each call to a {@link ConditionObject} method.
536      *
537      * <p>The default implementation throws {@link
538      * UnsupportedOperationException}. This method is invoked
539      * internally only within {@link ConditionObject} methods, so need
540      * not be defined if conditions are not used.
541      *
542      * @return {@code true} if synchronization is held exclusively;
543      *         {@code false} otherwise
544      * @throws UnsupportedOperationException if conditions are not supported
545      */
isHeldExclusively()546     protected boolean isHeldExclusively() {
547         throw new UnsupportedOperationException();
548     }
549 
550     /**
551      * Acquires in exclusive mode, ignoring interrupts.  Implemented
552      * by invoking at least once {@link #tryAcquire},
553      * returning on success.  Otherwise the thread is queued, possibly
554      * repeatedly blocking and unblocking, invoking {@link
555      * #tryAcquire} until success.  This method can be used
556      * to implement method {@link Lock#lock}.
557      *
558      * @param arg the acquire argument.  This value is conveyed to
559      *        {@link #tryAcquire} but is otherwise uninterpreted and
560      *        can represent anything you like.
561      */
acquire(long arg)562     public final void acquire(long arg) {
563         if (!tryAcquire(arg))
564             acquire(null, arg, false, false, false, 0L);
565     }
566 
567     /**
568      * Acquires in exclusive mode, aborting if interrupted.
569      * Implemented by first checking interrupt status, then invoking
570      * at least once {@link #tryAcquire}, returning on
571      * success.  Otherwise the thread is queued, possibly repeatedly
572      * blocking and unblocking, invoking {@link #tryAcquire}
573      * until success or the thread is interrupted.  This method can be
574      * used to implement method {@link Lock#lockInterruptibly}.
575      *
576      * @param arg the acquire argument.  This value is conveyed to
577      *        {@link #tryAcquire} but is otherwise uninterpreted and
578      *        can represent anything you like.
579      * @throws InterruptedException if the current thread is interrupted
580      */
acquireInterruptibly(long arg)581     public final void acquireInterruptibly(long arg)
582         throws InterruptedException {
583         if (Thread.interrupted() ||
584             (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
585             throw new InterruptedException();
586     }
587 
588     /**
589      * Attempts to acquire in exclusive mode, aborting if interrupted,
590      * and failing if the given timeout elapses.  Implemented by first
591      * checking interrupt status, then invoking at least once {@link
592      * #tryAcquire}, returning on success.  Otherwise, the thread is
593      * queued, possibly repeatedly blocking and unblocking, invoking
594      * {@link #tryAcquire} until success or the thread is interrupted
595      * or the timeout elapses.  This method can be used to implement
596      * method {@link Lock#tryLock(long, TimeUnit)}.
597      *
598      * @param arg the acquire argument.  This value is conveyed to
599      *        {@link #tryAcquire} but is otherwise uninterpreted and
600      *        can represent anything you like.
601      * @param nanosTimeout the maximum number of nanoseconds to wait
602      * @return {@code true} if acquired; {@code false} if timed out
603      * @throws InterruptedException if the current thread is interrupted
604      */
tryAcquireNanos(long arg, long nanosTimeout)605     public final boolean tryAcquireNanos(long arg, long nanosTimeout)
606         throws InterruptedException {
607         if (!Thread.interrupted()) {
608             if (tryAcquire(arg))
609                 return true;
610             if (nanosTimeout <= 0L)
611                 return false;
612             int stat = acquire(null, arg, false, true, true,
613                                System.nanoTime() + nanosTimeout);
614             if (stat > 0)
615                 return true;
616             if (stat == 0)
617                 return false;
618         }
619         throw new InterruptedException();
620     }
621 
622     /**
623      * Releases in exclusive mode.  Implemented by unblocking one or
624      * more threads if {@link #tryRelease} returns true.
625      * This method can be used to implement method {@link Lock#unlock}.
626      *
627      * @param arg the release argument.  This value is conveyed to
628      *        {@link #tryRelease} but is otherwise uninterpreted and
629      *        can represent anything you like.
630      * @return the value returned from {@link #tryRelease}
631      */
release(long arg)632     public final boolean release(long arg) {
633         if (tryRelease(arg)) {
634             signalNext(head);
635             return true;
636         }
637         return false;
638     }
639 
640     /**
641      * Acquires in shared mode, ignoring interrupts.  Implemented by
642      * first invoking at least once {@link #tryAcquireShared},
643      * returning on success.  Otherwise the thread is queued, possibly
644      * repeatedly blocking and unblocking, invoking {@link
645      * #tryAcquireShared} until success.
646      *
647      * @param arg the acquire argument.  This value is conveyed to
648      *        {@link #tryAcquireShared} but is otherwise uninterpreted
649      *        and can represent anything you like.
650      */
acquireShared(long arg)651     public final void acquireShared(long arg) {
652         if (tryAcquireShared(arg) < 0)
653             acquire(null, arg, true, false, false, 0L);
654     }
655 
656     /**
657      * Acquires in shared mode, aborting if interrupted.  Implemented
658      * by first checking interrupt status, then invoking at least once
659      * {@link #tryAcquireShared}, returning on success.  Otherwise the
660      * thread is queued, possibly repeatedly blocking and unblocking,
661      * invoking {@link #tryAcquireShared} until success or the thread
662      * is interrupted.
663      * @param arg the acquire argument.
664      * This value is conveyed to {@link #tryAcquireShared} but is
665      * otherwise uninterpreted and can represent anything
666      * you like.
667      * @throws InterruptedException if the current thread is interrupted
668      */
acquireSharedInterruptibly(long arg)669     public final void acquireSharedInterruptibly(long arg)
670         throws InterruptedException {
671         if (Thread.interrupted() ||
672             (tryAcquireShared(arg) < 0 &&
673              acquire(null, arg, true, true, false, 0L) < 0))
674             throw new InterruptedException();
675     }
676 
677     /**
678      * Attempts to acquire in shared mode, aborting if interrupted, and
679      * failing if the given timeout elapses.  Implemented by first
680      * checking interrupt status, then invoking at least once {@link
681      * #tryAcquireShared}, returning on success.  Otherwise, the
682      * thread is queued, possibly repeatedly blocking and unblocking,
683      * invoking {@link #tryAcquireShared} until success or the thread
684      * is interrupted or the timeout elapses.
685      *
686      * @param arg the acquire argument.  This value is conveyed to
687      *        {@link #tryAcquireShared} but is otherwise uninterpreted
688      *        and can represent anything you like.
689      * @param nanosTimeout the maximum number of nanoseconds to wait
690      * @return {@code true} if acquired; {@code false} if timed out
691      * @throws InterruptedException if the current thread is interrupted
692      */
tryAcquireSharedNanos(long arg, long nanosTimeout)693     public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
694             throws InterruptedException {
695         if (!Thread.interrupted()) {
696             if (tryAcquireShared(arg) >= 0)
697                 return true;
698             if (nanosTimeout <= 0L)
699                 return false;
700             int stat = acquire(null, arg, true, true, true,
701                                System.nanoTime() + nanosTimeout);
702             if (stat > 0)
703                 return true;
704             if (stat == 0)
705                 return false;
706         }
707         throw new InterruptedException();
708     }
709 
710     /**
711      * Releases in shared mode.  Implemented by unblocking one or more
712      * threads if {@link #tryReleaseShared} returns true.
713      *
714      * @param arg the release argument.  This value is conveyed to
715      *        {@link #tryReleaseShared} but is otherwise uninterpreted
716      *        and can represent anything you like.
717      * @return the value returned from {@link #tryReleaseShared}
718      */
releaseShared(long arg)719     public final boolean releaseShared(long arg) {
720         if (tryReleaseShared(arg)) {
721             signalNext(head);
722             return true;
723         }
724         return false;
725     }
726 
727     // Queue inspection methods
728 
729     /**
730      * Queries whether any threads are waiting to acquire. Note that
731      * because cancellations due to interrupts and timeouts may occur
732      * at any time, a {@code true} return does not guarantee that any
733      * other thread will ever acquire.
734      *
735      * @return {@code true} if there may be other threads waiting to acquire
736      */
hasQueuedThreads()737     public final boolean hasQueuedThreads() {
738         for (Node p = tail, h = head; p != h && p != null; p = p.prev)
739             if (p.status >= 0)
740                 return true;
741         return false;
742     }
743 
744     /**
745      * Queries whether any threads have ever contended to acquire this
746      * synchronizer; that is, if an acquire method has ever blocked.
747      *
748      * <p>In this implementation, this operation returns in
749      * constant time.
750      *
751      * @return {@code true} if there has ever been contention
752      */
hasContended()753     public final boolean hasContended() {
754         return head != null;
755     }
756 
757     /**
758      * Returns the first (longest-waiting) thread in the queue, or
759      * {@code null} if no threads are currently queued.
760      *
761      * <p>In this implementation, this operation normally returns in
762      * constant time, but may iterate upon contention if other threads are
763      * concurrently modifying the queue.
764      *
765      * @return the first (longest-waiting) thread in the queue, or
766      *         {@code null} if no threads are currently queued
767      */
getFirstQueuedThread()768     public final Thread getFirstQueuedThread() {
769         Thread first = null, w; Node h, s;
770         if ((h = head) != null && ((s = h.next) == null ||
771                                    (first = s.waiter) == null ||
772                                    s.prev == null)) {
773             // traverse from tail on stale reads
774             for (Node p = tail, q; p != null && (q = p.prev) != null; p = q)
775                 if ((w = p.waiter) != null)
776                     first = w;
777         }
778         return first;
779     }
780 
781     /**
782      * Returns true if the given thread is currently queued.
783      *
784      * <p>This implementation traverses the queue to determine
785      * presence of the given thread.
786      *
787      * @param thread the thread
788      * @return {@code true} if the given thread is on the queue
789      * @throws NullPointerException if the thread is null
790      */
isQueued(Thread thread)791     public final boolean isQueued(Thread thread) {
792         if (thread == null)
793             throw new NullPointerException();
794         for (Node p = tail; p != null; p = p.prev)
795             if (p.waiter == thread)
796                 return true;
797         return false;
798     }
799 
800     /**
801      * Returns {@code true} if the apparent first queued thread, if one
802      * exists, is waiting in exclusive mode.  If this method returns
803      * {@code true}, and the current thread is attempting to acquire in
804      * shared mode (that is, this method is invoked from {@link
805      * #tryAcquireShared}) then it is guaranteed that the current thread
806      * is not the first queued thread.  Used only as a heuristic in
807      * ReentrantReadWriteLock.
808      */
apparentlyFirstQueuedIsExclusive()809     final boolean apparentlyFirstQueuedIsExclusive() {
810         Node h, s;
811         return (h = head) != null && (s = h.next)  != null &&
812             !(s instanceof SharedNode) && s.waiter != null;
813     }
814 
815     /**
816      * Queries whether any threads have been waiting to acquire longer
817      * than the current thread.
818      *
819      * <p>An invocation of this method is equivalent to (but may be
820      * more efficient than):
821      * <pre> {@code
822      * getFirstQueuedThread() != Thread.currentThread()
823      *   && hasQueuedThreads()}</pre>
824      *
825      * <p>Note that because cancellations due to interrupts and
826      * timeouts may occur at any time, a {@code true} return does not
827      * guarantee that some other thread will acquire before the current
828      * thread.  Likewise, it is possible for another thread to win a
829      * race to enqueue after this method has returned {@code false},
830      * due to the queue being empty.
831      *
832      * <p>This method is designed to be used by a fair synchronizer to
833      * avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
834      * Such a synchronizer's {@link #tryAcquire} method should return
835      * {@code false}, and its {@link #tryAcquireShared} method should
836      * return a negative value, if this method returns {@code true}
837      * (unless this is a reentrant acquire).  For example, the {@code
838      * tryAcquire} method for a fair, reentrant, exclusive mode
839      * synchronizer might look like this:
840      *
841      * <pre> {@code
842      * protected boolean tryAcquire(long arg) {
843      *   if (isHeldExclusively()) {
844      *     // A reentrant acquire; increment hold count
845      *     return true;
846      *   } else if (hasQueuedPredecessors()) {
847      *     return false;
848      *   } else {
849      *     // try to acquire normally
850      *   }
851      * }}</pre>
852      *
853      * @return {@code true} if there is a queued thread preceding the
854      *         current thread, and {@code false} if the current thread
855      *         is at the head of the queue or the queue is empty
856      * @since 1.7
857      */
hasQueuedPredecessors()858     public final boolean hasQueuedPredecessors() {
859         Thread first = null; Node h, s;
860         if ((h = head) != null && ((s = h.next) == null ||
861                                    (first = s.waiter) == null ||
862                                    s.prev == null))
863             first = getFirstQueuedThread(); // retry via getFirstQueuedThread
864         return first != null && first != Thread.currentThread();
865     }
866 
867     // Instrumentation and monitoring methods
868 
869     /**
870      * Returns an estimate of the number of threads waiting to
871      * acquire.  The value is only an estimate because the number of
872      * threads may change dynamically while this method traverses
873      * internal data structures.  This method is designed for use in
874      * monitoring system state, not for synchronization control.
875      *
876      * @return the estimated number of threads waiting to acquire
877      */
getQueueLength()878     public final int getQueueLength() {
879         int n = 0;
880         for (Node p = tail; p != null; p = p.prev) {
881             if (p.waiter != null)
882                 ++n;
883         }
884         return n;
885     }
886 
887     /**
888      * Returns a collection containing threads that may be waiting to
889      * acquire.  Because the actual set of threads may change
890      * dynamically while constructing this result, the returned
891      * collection is only a best-effort estimate.  The elements of the
892      * returned collection are in no particular order.  This method is
893      * designed to facilitate construction of subclasses that provide
894      * more extensive monitoring facilities.
895      *
896      * @return the collection of threads
897      */
getQueuedThreads()898     public final Collection<Thread> getQueuedThreads() {
899         ArrayList<Thread> list = new ArrayList<>();
900         for (Node p = tail; p != null; p = p.prev) {
901             Thread t = p.waiter;
902             if (t != null)
903                 list.add(t);
904         }
905         return list;
906     }
907 
908     /**
909      * Returns a collection containing threads that may be waiting to
910      * acquire in exclusive mode. This has the same properties
911      * as {@link #getQueuedThreads} except that it only returns
912      * those threads waiting due to an exclusive acquire.
913      *
914      * @return the collection of threads
915      */
getExclusiveQueuedThreads()916     public final Collection<Thread> getExclusiveQueuedThreads() {
917         ArrayList<Thread> list = new ArrayList<>();
918         for (Node p = tail; p != null; p = p.prev) {
919             if (!(p instanceof SharedNode)) {
920                 Thread t = p.waiter;
921                 if (t != null)
922                     list.add(t);
923             }
924         }
925         return list;
926     }
927 
928     /**
929      * Returns a collection containing threads that may be waiting to
930      * acquire in shared mode. This has the same properties
931      * as {@link #getQueuedThreads} except that it only returns
932      * those threads waiting due to a shared acquire.
933      *
934      * @return the collection of threads
935      */
getSharedQueuedThreads()936     public final Collection<Thread> getSharedQueuedThreads() {
937         ArrayList<Thread> list = new ArrayList<>();
938         for (Node p = tail; p != null; p = p.prev) {
939             if (p instanceof SharedNode) {
940                 Thread t = p.waiter;
941                 if (t != null)
942                     list.add(t);
943             }
944         }
945         return list;
946     }
947 
948     /**
949      * Returns a string identifying this synchronizer, as well as its state.
950      * The state, in brackets, includes the String {@code "State ="}
951      * followed by the current value of {@link #getState}, and either
952      * {@code "nonempty"} or {@code "empty"} depending on whether the
953      * queue is empty.
954      *
955      * @return a string identifying this synchronizer, as well as its state
956      */
toString()957     public String toString() {
958         return super.toString()
959             + "[State = " + getState() + ", "
960             + (hasQueuedThreads() ? "non" : "") + "empty queue]";
961     }
962 
963     // Instrumentation methods for conditions
964 
965     /**
966      * Queries whether the given ConditionObject
967      * uses this synchronizer as its lock.
968      *
969      * @param condition the condition
970      * @return {@code true} if owned
971      * @throws NullPointerException if the condition is null
972      */
owns(ConditionObject condition)973     public final boolean owns(ConditionObject condition) {
974         return condition.isOwnedBy(this);
975     }
976 
977     /**
978      * Queries whether any threads are waiting on the given condition
979      * associated with this synchronizer. Note that because timeouts
980      * and interrupts may occur at any time, a {@code true} return
981      * does not guarantee that a future {@code signal} will awaken
982      * any threads.  This method is designed primarily for use in
983      * monitoring of the system state.
984      *
985      * @param condition the condition
986      * @return {@code true} if there are any waiting threads
987      * @throws IllegalMonitorStateException if exclusive synchronization
988      *         is not held
989      * @throws IllegalArgumentException if the given condition is
990      *         not associated with this synchronizer
991      * @throws NullPointerException if the condition is null
992      */
hasWaiters(ConditionObject condition)993     public final boolean hasWaiters(ConditionObject condition) {
994         if (!owns(condition))
995             throw new IllegalArgumentException("Not owner");
996         return condition.hasWaiters();
997     }
998 
999     /**
1000      * Returns an estimate of the number of threads waiting on the
1001      * given condition associated with this synchronizer. Note that
1002      * because timeouts and interrupts may occur at any time, the
1003      * estimate serves only as an upper bound on the actual number of
1004      * waiters.  This method is designed for use in monitoring system
1005      * state, not for synchronization control.
1006      *
1007      * @param condition the condition
1008      * @return the estimated number of waiting threads
1009      * @throws IllegalMonitorStateException if exclusive synchronization
1010      *         is not held
1011      * @throws IllegalArgumentException if the given condition is
1012      *         not associated with this synchronizer
1013      * @throws NullPointerException if the condition is null
1014      */
getWaitQueueLength(ConditionObject condition)1015     public final int getWaitQueueLength(ConditionObject condition) {
1016         if (!owns(condition))
1017             throw new IllegalArgumentException("Not owner");
1018         return condition.getWaitQueueLength();
1019     }
1020 
1021     /**
1022      * Returns a collection containing those threads that may be
1023      * waiting on the given condition associated with this
1024      * synchronizer.  Because the actual set of threads may change
1025      * dynamically while constructing this result, the returned
1026      * collection is only a best-effort estimate. The elements of the
1027      * returned collection are in no particular order.
1028      *
1029      * @param condition the condition
1030      * @return the collection of threads
1031      * @throws IllegalMonitorStateException if exclusive synchronization
1032      *         is not held
1033      * @throws IllegalArgumentException if the given condition is
1034      *         not associated with this synchronizer
1035      * @throws NullPointerException if the condition is null
1036      */
getWaitingThreads(ConditionObject condition)1037     public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
1038         if (!owns(condition))
1039             throw new IllegalArgumentException("Not owner");
1040         return condition.getWaitingThreads();
1041     }
1042 
1043     /**
1044      * Condition implementation for a {@link AbstractQueuedLongSynchronizer}
1045      * serving as the basis of a {@link Lock} implementation.
1046      *
1047      * <p>Method documentation for this class describes mechanics,
1048      * not behavioral specifications from the point of view of Lock
1049      * and Condition users. Exported versions of this class will in
1050      * general need to be accompanied by documentation describing
1051      * condition semantics that rely on those of the associated
1052      * {@code AbstractQueuedLongSynchronizer}.
1053      *
1054      * <p>This class is Serializable, but all fields are transient,
1055      * so deserialized conditions have no waiters.
1056      */
1057     public class ConditionObject implements Condition, java.io.Serializable {
1058         private static final long serialVersionUID = 1173984872572414699L;
1059         /** First node of condition queue. */
1060         private transient ConditionNode firstWaiter;
1061         /** Last node of condition queue. */
1062         private transient ConditionNode lastWaiter;
1063 
1064         /**
1065          * Creates a new {@code ConditionObject} instance.
1066          */
ConditionObject()1067         public ConditionObject() { }
1068 
1069         // Signalling methods
1070 
1071         /**
1072          * Removes and transfers one or all waiters to sync queue.
1073          */
doSignal(ConditionNode first, boolean all)1074         private void doSignal(ConditionNode first, boolean all) {
1075             while (first != null) {
1076                 ConditionNode next = first.nextWaiter;
1077                 if ((firstWaiter = next) == null)
1078                     lastWaiter = null;
1079                 if ((first.getAndUnsetStatus(COND) & COND) != 0) {
1080                     enqueue(first);
1081                     if (!all)
1082                         break;
1083                 }
1084                 first = next;
1085             }
1086         }
1087 
1088         /**
1089          * Moves the longest-waiting thread, if one exists, from the
1090          * wait queue for this condition to the wait queue for the
1091          * owning lock.
1092          *
1093          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1094          *         returns {@code false}
1095          */
signal()1096         public final void signal() {
1097             ConditionNode first = firstWaiter;
1098             if (!isHeldExclusively())
1099                 throw new IllegalMonitorStateException();
1100             if (first != null)
1101                 doSignal(first, false);
1102         }
1103 
1104         /**
1105          * Moves all threads from the wait queue for this condition to
1106          * the wait queue for the owning lock.
1107          *
1108          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1109          *         returns {@code false}
1110          */
signalAll()1111         public final void signalAll() {
1112             ConditionNode first = firstWaiter;
1113             if (!isHeldExclusively())
1114                 throw new IllegalMonitorStateException();
1115             if (first != null)
1116                 doSignal(first, true);
1117         }
1118 
1119         // Waiting methods
1120 
1121         /**
1122          * Adds node to condition list and releases lock.
1123          *
1124          * @param node the node
1125          * @return savedState to reacquire after wait
1126          */
enableWait(ConditionNode node)1127         private long enableWait(ConditionNode node) {
1128             if (isHeldExclusively()) {
1129                 node.waiter = Thread.currentThread();
1130                 node.setStatusRelaxed(COND | WAITING);
1131                 ConditionNode last = lastWaiter;
1132                 if (last == null)
1133                     firstWaiter = node;
1134                 else
1135                     last.nextWaiter = node;
1136                 lastWaiter = node;
1137                 long savedState = getState();
1138                 if (release(savedState))
1139                     return savedState;
1140             }
1141             node.status = CANCELLED; // lock not held or inconsistent
1142             throw new IllegalMonitorStateException();
1143         }
1144 
1145         /**
1146          * Returns true if a node that was initially placed on a condition
1147          * queue is now ready to reacquire on sync queue.
1148          * @param node the node
1149          * @return true if is reacquiring
1150          */
canReacquire(ConditionNode node)1151         private boolean canReacquire(ConditionNode node) {
1152             // check links, not status to avoid enqueue race
1153             return node != null && node.prev != null && isEnqueued(node);
1154         }
1155 
1156         /**
1157          * Unlinks the given node and other non-waiting nodes from
1158          * condition queue unless already unlinked.
1159          */
unlinkCancelledWaiters(ConditionNode node)1160         private void unlinkCancelledWaiters(ConditionNode node) {
1161             if (node == null || node.nextWaiter != null || node == lastWaiter) {
1162                 ConditionNode w = firstWaiter, trail = null;
1163                 while (w != null) {
1164                     ConditionNode next = w.nextWaiter;
1165                     if ((w.status & COND) == 0) {
1166                         w.nextWaiter = null;
1167                         if (trail == null)
1168                             firstWaiter = next;
1169                         else
1170                             trail.nextWaiter = next;
1171                         if (next == null)
1172                             lastWaiter = trail;
1173                     } else
1174                         trail = w;
1175                     w = next;
1176                 }
1177             }
1178         }
1179 
1180         /**
1181          * Implements uninterruptible condition wait.
1182          * <ol>
1183          * <li>Save lock state returned by {@link #getState}.
1184          * <li>Invoke {@link #release} with saved state as argument,
1185          *     throwing IllegalMonitorStateException if it fails.
1186          * <li>Block until signalled.
1187          * <li>Reacquire by invoking specialized version of
1188          *     {@link #acquire} with saved state as argument.
1189          * </ol>
1190          */
awaitUninterruptibly()1191         public final void awaitUninterruptibly() {
1192             ConditionNode node = new ConditionNode();
1193             long savedState = enableWait(node);
1194             LockSupport.setCurrentBlocker(this); // for back-compatibility
1195             boolean interrupted = false;
1196             while (!canReacquire(node)) {
1197                 if (Thread.interrupted())
1198                     interrupted = true;
1199                 else if ((node.status & COND) != 0) {
1200                     try {
1201                         ForkJoinPool.managedBlock(node);
1202                     } catch (InterruptedException ie) {
1203                         interrupted = true;
1204                     }
1205                 } else
1206                     Thread.onSpinWait();    // awoke while enqueuing
1207             }
1208             LockSupport.setCurrentBlocker(null);
1209             node.clearStatus();
1210             acquire(node, savedState, false, false, false, 0L);
1211             if (interrupted)
1212                 Thread.currentThread().interrupt();
1213         }
1214 
1215         /**
1216          * Implements interruptible condition wait.
1217          * <ol>
1218          * <li>If current thread is interrupted, throw InterruptedException.
1219          * <li>Save lock state returned by {@link #getState}.
1220          * <li>Invoke {@link #release} with saved state as argument,
1221          *     throwing IllegalMonitorStateException if it fails.
1222          * <li>Block until signalled or interrupted.
1223          * <li>Reacquire by invoking specialized version of
1224          *     {@link #acquire} with saved state as argument.
1225          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1226          * </ol>
1227          */
await()1228         public final void await() throws InterruptedException {
1229             if (Thread.interrupted())
1230                 throw new InterruptedException();
1231             ConditionNode node = new ConditionNode();
1232             long savedState = enableWait(node);
1233             LockSupport.setCurrentBlocker(this); // for back-compatibility
1234             boolean interrupted = false, cancelled = false;
1235             while (!canReacquire(node)) {
1236                 if (interrupted |= Thread.interrupted()) {
1237                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1238                         break;              // else interrupted after signal
1239                 } else if ((node.status & COND) != 0) {
1240                     try {
1241                         ForkJoinPool.managedBlock(node);
1242                     } catch (InterruptedException ie) {
1243                         interrupted = true;
1244                     }
1245                 } else
1246                     Thread.onSpinWait();    // awoke while enqueuing
1247             }
1248             LockSupport.setCurrentBlocker(null);
1249             node.clearStatus();
1250             acquire(node, savedState, false, false, false, 0L);
1251             if (interrupted) {
1252                 if (cancelled) {
1253                     unlinkCancelledWaiters(node);
1254                     throw new InterruptedException();
1255                 }
1256                 Thread.currentThread().interrupt();
1257             }
1258         }
1259 
1260         /**
1261          * Implements timed condition wait.
1262          * <ol>
1263          * <li>If current thread is interrupted, throw InterruptedException.
1264          * <li>Save lock state returned by {@link #getState}.
1265          * <li>Invoke {@link #release} with saved state as argument,
1266          *     throwing IllegalMonitorStateException if it fails.
1267          * <li>Block until signalled, interrupted, or timed out.
1268          * <li>Reacquire by invoking specialized version of
1269          *     {@link #acquire} with saved state as argument.
1270          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1271          * </ol>
1272          */
awaitNanos(long nanosTimeout)1273         public final long awaitNanos(long nanosTimeout)
1274                 throws InterruptedException {
1275             if (Thread.interrupted())
1276                 throw new InterruptedException();
1277             ConditionNode node = new ConditionNode();
1278             long savedState = enableWait(node);
1279             long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1280             long deadline = System.nanoTime() + nanos;
1281             boolean cancelled = false, interrupted = false;
1282             while (!canReacquire(node)) {
1283                 if ((interrupted |= Thread.interrupted()) ||
1284                     (nanos = deadline - System.nanoTime()) <= 0L) {
1285                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1286                         break;
1287                 } else
1288                     LockSupport.parkNanos(this, nanos);
1289             }
1290             node.clearStatus();
1291             acquire(node, savedState, false, false, false, 0L);
1292             if (cancelled) {
1293                 unlinkCancelledWaiters(node);
1294                 if (interrupted)
1295                     throw new InterruptedException();
1296             } else if (interrupted)
1297                 Thread.currentThread().interrupt();
1298             long remaining = deadline - System.nanoTime(); // avoid overflow
1299             return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
1300         }
1301 
1302         /**
1303          * Implements absolute timed condition wait.
1304          * <ol>
1305          * <li>If current thread is interrupted, throw InterruptedException.
1306          * <li>Save lock state returned by {@link #getState}.
1307          * <li>Invoke {@link #release} with saved state as argument,
1308          *     throwing IllegalMonitorStateException if it fails.
1309          * <li>Block until signalled, interrupted, or timed out.
1310          * <li>Reacquire by invoking specialized version of
1311          *     {@link #acquire} with saved state as argument.
1312          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1313          * <li>If timed out while blocked in step 4, return false, else true.
1314          * </ol>
1315          */
awaitUntil(Date deadline)1316         public final boolean awaitUntil(Date deadline)
1317                 throws InterruptedException {
1318             long abstime = deadline.getTime();
1319             if (Thread.interrupted())
1320                 throw new InterruptedException();
1321             ConditionNode node = new ConditionNode();
1322             long savedState = enableWait(node);
1323             boolean cancelled = false, interrupted = false;
1324             while (!canReacquire(node)) {
1325                 if ((interrupted |= Thread.interrupted()) ||
1326                     System.currentTimeMillis() >= abstime) {
1327                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1328                         break;
1329                 } else
1330                     LockSupport.parkUntil(this, abstime);
1331             }
1332             node.clearStatus();
1333             acquire(node, savedState, false, false, false, 0L);
1334             if (cancelled) {
1335                 unlinkCancelledWaiters(node);
1336                 if (interrupted)
1337                     throw new InterruptedException();
1338             } else if (interrupted)
1339                 Thread.currentThread().interrupt();
1340             return !cancelled;
1341         }
1342 
1343         /**
1344          * Implements timed condition wait.
1345          * <ol>
1346          * <li>If current thread is interrupted, throw InterruptedException.
1347          * <li>Save lock state returned by {@link #getState}.
1348          * <li>Invoke {@link #release} with saved state as argument,
1349          *     throwing IllegalMonitorStateException if it fails.
1350          * <li>Block until signalled, interrupted, or timed out.
1351          * <li>Reacquire by invoking specialized version of
1352          *     {@link #acquire} with saved state as argument.
1353          * <li>If interrupted while blocked in step 4, throw InterruptedException.
1354          * <li>If timed out while blocked in step 4, return false, else true.
1355          * </ol>
1356          */
await(long time, TimeUnit unit)1357         public final boolean await(long time, TimeUnit unit)
1358                 throws InterruptedException {
1359             long nanosTimeout = unit.toNanos(time);
1360             if (Thread.interrupted())
1361                 throw new InterruptedException();
1362             ConditionNode node = new ConditionNode();
1363             long savedState = enableWait(node);
1364             long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
1365             long deadline = System.nanoTime() + nanos;
1366             boolean cancelled = false, interrupted = false;
1367             while (!canReacquire(node)) {
1368                 if ((interrupted |= Thread.interrupted()) ||
1369                     (nanos = deadline - System.nanoTime()) <= 0L) {
1370                     if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
1371                         break;
1372                 } else
1373                     LockSupport.parkNanos(this, nanos);
1374             }
1375             node.clearStatus();
1376             acquire(node, savedState, false, false, false, 0L);
1377             if (cancelled) {
1378                 unlinkCancelledWaiters(node);
1379                 if (interrupted)
1380                     throw new InterruptedException();
1381             } else if (interrupted)
1382                 Thread.currentThread().interrupt();
1383             return !cancelled;
1384         }
1385 
1386         //  support for instrumentation
1387 
1388         /**
1389          * Returns true if this condition was created by the given
1390          * synchronization object.
1391          *
1392          * @return {@code true} if owned
1393          */
isOwnedBy(AbstractQueuedLongSynchronizer sync)1394         final boolean isOwnedBy(AbstractQueuedLongSynchronizer sync) {
1395             return sync == AbstractQueuedLongSynchronizer.this;
1396         }
1397 
1398         /**
1399          * Queries whether any threads are waiting on this condition.
1400          * Implements {@link AbstractQueuedLongSynchronizer#hasWaiters(ConditionObject)}.
1401          *
1402          * @return {@code true} if there are any waiting threads
1403          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1404          *         returns {@code false}
1405          */
hasWaiters()1406         protected final boolean hasWaiters() {
1407             if (!isHeldExclusively())
1408                 throw new IllegalMonitorStateException();
1409             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1410                 if ((w.status & COND) != 0)
1411                     return true;
1412             }
1413             return false;
1414         }
1415 
1416         /**
1417          * Returns an estimate of the number of threads waiting on
1418          * this condition.
1419          * Implements {@link AbstractQueuedLongSynchronizer#getWaitQueueLength(ConditionObject)}.
1420          *
1421          * @return the estimated number of waiting threads
1422          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1423          *         returns {@code false}
1424          */
getWaitQueueLength()1425         protected final int getWaitQueueLength() {
1426             if (!isHeldExclusively())
1427                 throw new IllegalMonitorStateException();
1428             int n = 0;
1429             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1430                 if ((w.status & COND) != 0)
1431                     ++n;
1432             }
1433             return n;
1434         }
1435 
1436         /**
1437          * Returns a collection containing those threads that may be
1438          * waiting on this Condition.
1439          * Implements {@link AbstractQueuedLongSynchronizer#getWaitingThreads(ConditionObject)}.
1440          *
1441          * @return the collection of threads
1442          * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
1443          *         returns {@code false}
1444          */
getWaitingThreads()1445         protected final Collection<Thread> getWaitingThreads() {
1446             if (!isHeldExclusively())
1447                 throw new IllegalMonitorStateException();
1448             ArrayList<Thread> list = new ArrayList<>();
1449             for (ConditionNode w = firstWaiter; w != null; w = w.nextWaiter) {
1450                 if ((w.status & COND) != 0) {
1451                     Thread t = w.waiter;
1452                     if (t != null)
1453                         list.add(t);
1454                 }
1455             }
1456             return list;
1457         }
1458     }
1459 
1460     // Unsafe
1461     private static final Unsafe U = Unsafe.getUnsafe();
1462     private static final long STATE
1463         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "state");
1464     private static final long HEAD
1465         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "head");
1466     private static final long TAIL
1467         = U.objectFieldOffset(AbstractQueuedLongSynchronizer.class, "tail");
1468 
1469     static {
1470         Class<?> ensureLoaded = LockSupport.class;
1471     }
1472 }
1473