1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.ref.WeakReference;
39 import java.util.AbstractQueue;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.Iterator;
43 import java.util.NoSuchElementException;
44 import java.util.Objects;
45 import java.util.Spliterator;
46 import java.util.Spliterators;
47 import java.util.concurrent.locks.Condition;
48 import java.util.concurrent.locks.ReentrantLock;
49 import java.util.function.Consumer;
50 import java.util.function.Predicate;
51 
52 /**
53  * A bounded {@linkplain BlockingQueue blocking queue} backed by an
54  * array.  This queue orders elements FIFO (first-in-first-out).  The
55  * <em>head</em> of the queue is that element that has been on the
56  * queue the longest time.  The <em>tail</em> of the queue is that
57  * element that has been on the queue the shortest time. New elements
58  * are inserted at the tail of the queue, and the queue retrieval
59  * operations obtain elements at the head of the queue.
60  *
61  * <p>This is a classic &quot;bounded buffer&quot;, in which a
62  * fixed-sized array holds elements inserted by producers and
63  * extracted by consumers.  Once created, the capacity cannot be
64  * changed.  Attempts to {@code put} an element into a full queue
65  * will result in the operation blocking; attempts to {@code take} an
66  * element from an empty queue will similarly block.
67  *
68  * <p>This class supports an optional fairness policy for ordering
69  * waiting producer and consumer threads.  By default, this ordering
70  * is not guaranteed. However, a queue constructed with fairness set
71  * to {@code true} grants threads access in FIFO order. Fairness
72  * generally decreases throughput but reduces variability and avoids
73  * starvation.
74  *
75  * <p>This class and its iterator implement all of the <em>optional</em>
76  * methods of the {@link Collection} and {@link Iterator} interfaces.
77  *
78  * <p>This class is a member of the
79  * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
80  * Java Collections Framework</a>.
81  *
82  * @since 1.5
83  * @author Doug Lea
84  * @param <E> the type of elements held in this queue
85  */
86 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
87         implements BlockingQueue<E>, java.io.Serializable {
88 
89     /*
90      * Much of the implementation mechanics, especially the unusual
91      * nested loops, are shared and co-maintained with ArrayDeque.
92      */
93 
94     /**
95      * Serialization ID. This class relies on default serialization
96      * even for the items array, which is default-serialized, even if
97      * it is empty. Otherwise it could not be declared final, which is
98      * necessary here.
99      */
100     private static final long serialVersionUID = -817911632652898426L;
101 
102     /** The queued items */
103     final Object[] items;
104 
105     /** items index for next take, poll, peek or remove */
106     int takeIndex;
107 
108     /** items index for next put, offer, or add */
109     int putIndex;
110 
111     /** Number of elements in the queue */
112     int count;
113 
114     /*
115      * Concurrency control uses the classic two-condition algorithm
116      * found in any textbook.
117      */
118 
119     /** Main lock guarding all access */
120     final ReentrantLock lock;
121 
122     /** Condition for waiting takes */
123     private final Condition notEmpty;
124 
125     /** Condition for waiting puts */
126     private final Condition notFull;
127 
128     /**
129      * Shared state for currently active iterators, or null if there
130      * are known not to be any.  Allows queue operations to update
131      * iterator state.
132      */
133     transient Itrs itrs;
134 
135     // Internal helper methods
136 
137     /**
138      * Increments i, mod modulus.
139      * Precondition and postcondition: 0 <= i < modulus.
140      */
inc(int i, int modulus)141     static final int inc(int i, int modulus) {
142         if (++i >= modulus) i = 0;
143         return i;
144     }
145 
146     /**
147      * Decrements i, mod modulus.
148      * Precondition and postcondition: 0 <= i < modulus.
149      */
dec(int i, int modulus)150     static final int dec(int i, int modulus) {
151         if (--i < 0) i = modulus - 1;
152         return i;
153     }
154 
155     /**
156      * Returns item at index i.
157      */
158     @SuppressWarnings("unchecked")
itemAt(int i)159     final E itemAt(int i) {
160         return (E) items[i];
161     }
162 
163     /**
164      * Returns element at array index i.
165      * This is a slight abuse of generics, accepted by javac.
166      */
167     @SuppressWarnings("unchecked")
itemAt(Object[] items, int i)168     static <E> E itemAt(Object[] items, int i) {
169         return (E) items[i];
170     }
171 
172     /**
173      * Inserts element at current put position, advances, and signals.
174      * Call only when holding lock.
175      */
enqueue(E e)176     private void enqueue(E e) {
177         // assert lock.isHeldByCurrentThread();
178         // assert lock.getHoldCount() == 1;
179         // assert items[putIndex] == null;
180         final Object[] items = this.items;
181         items[putIndex] = e;
182         if (++putIndex == items.length) putIndex = 0;
183         count++;
184         notEmpty.signal();
185     }
186 
187     /**
188      * Extracts element at current take position, advances, and signals.
189      * Call only when holding lock.
190      */
dequeue()191     private E dequeue() {
192         // assert lock.isHeldByCurrentThread();
193         // assert lock.getHoldCount() == 1;
194         // assert items[takeIndex] != null;
195         final Object[] items = this.items;
196         @SuppressWarnings("unchecked")
197         E e = (E) items[takeIndex];
198         items[takeIndex] = null;
199         if (++takeIndex == items.length) takeIndex = 0;
200         count--;
201         if (itrs != null)
202             itrs.elementDequeued();
203         notFull.signal();
204         return e;
205     }
206 
207     /**
208      * Deletes item at array index removeIndex.
209      * Utility for remove(Object) and iterator.remove.
210      * Call only when holding lock.
211      */
removeAt(final int removeIndex)212     void removeAt(final int removeIndex) {
213         // assert lock.isHeldByCurrentThread();
214         // assert lock.getHoldCount() == 1;
215         // assert items[removeIndex] != null;
216         // assert removeIndex >= 0 && removeIndex < items.length;
217         final Object[] items = this.items;
218         if (removeIndex == takeIndex) {
219             // removing front item; just advance
220             items[takeIndex] = null;
221             if (++takeIndex == items.length) takeIndex = 0;
222             count--;
223             if (itrs != null)
224                 itrs.elementDequeued();
225         } else {
226             // an "interior" remove
227 
228             // slide over all others up through putIndex.
229             for (int i = removeIndex, putIndex = this.putIndex;;) {
230                 int pred = i;
231                 if (++i == items.length) i = 0;
232                 if (i == putIndex) {
233                     items[pred] = null;
234                     this.putIndex = pred;
235                     break;
236                 }
237                 items[pred] = items[i];
238             }
239             count--;
240             if (itrs != null)
241                 itrs.removedAt(removeIndex);
242         }
243         notFull.signal();
244     }
245 
246     /**
247      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
248      * capacity and default access policy.
249      *
250      * @param capacity the capacity of this queue
251      * @throws IllegalArgumentException if {@code capacity < 1}
252      */
ArrayBlockingQueue(int capacity)253     public ArrayBlockingQueue(int capacity) {
254         this(capacity, false);
255     }
256 
257     /**
258      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
259      * capacity and the specified access policy.
260      *
261      * @param capacity the capacity of this queue
262      * @param fair if {@code true} then queue accesses for threads blocked
263      *        on insertion or removal, are processed in FIFO order;
264      *        if {@code false} the access order is unspecified.
265      * @throws IllegalArgumentException if {@code capacity < 1}
266      */
ArrayBlockingQueue(int capacity, boolean fair)267     public ArrayBlockingQueue(int capacity, boolean fair) {
268         if (capacity <= 0)
269             throw new IllegalArgumentException();
270         this.items = new Object[capacity];
271         lock = new ReentrantLock(fair);
272         notEmpty = lock.newCondition();
273         notFull =  lock.newCondition();
274     }
275 
276     /**
277      * Creates an {@code ArrayBlockingQueue} with the given (fixed)
278      * capacity, the specified access policy and initially containing the
279      * elements of the given collection,
280      * added in traversal order of the collection's iterator.
281      *
282      * @param capacity the capacity of this queue
283      * @param fair if {@code true} then queue accesses for threads blocked
284      *        on insertion or removal, are processed in FIFO order;
285      *        if {@code false} the access order is unspecified.
286      * @param c the collection of elements to initially contain
287      * @throws IllegalArgumentException if {@code capacity} is less than
288      *         {@code c.size()}, or less than 1.
289      * @throws NullPointerException if the specified collection or any
290      *         of its elements are null
291      */
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)292     public ArrayBlockingQueue(int capacity, boolean fair,
293                               Collection<? extends E> c) {
294         this(capacity, fair);
295 
296         final ReentrantLock lock = this.lock;
297         lock.lock(); // Lock only for visibility, not mutual exclusion
298         try {
299             final Object[] items = this.items;
300             int i = 0;
301             try {
302                 for (E e : c)
303                     items[i++] = Objects.requireNonNull(e);
304             } catch (ArrayIndexOutOfBoundsException ex) {
305                 throw new IllegalArgumentException();
306             }
307             count = i;
308             putIndex = (i == capacity) ? 0 : i;
309         } finally {
310             lock.unlock();
311         }
312     }
313 
314     /**
315      * Inserts the specified element at the tail of this queue if it is
316      * possible to do so immediately without exceeding the queue's capacity,
317      * returning {@code true} upon success and throwing an
318      * {@code IllegalStateException} if this queue is full.
319      *
320      * @param e the element to add
321      * @return {@code true} (as specified by {@link Collection#add})
322      * @throws IllegalStateException if this queue is full
323      * @throws NullPointerException if the specified element is null
324      */
add(E e)325     public boolean add(E e) {
326         return super.add(e);
327     }
328 
329     /**
330      * Inserts the specified element at the tail of this queue if it is
331      * possible to do so immediately without exceeding the queue's capacity,
332      * returning {@code true} upon success and {@code false} if this queue
333      * is full.  This method is generally preferable to method {@link #add},
334      * which can fail to insert an element only by throwing an exception.
335      *
336      * @throws NullPointerException if the specified element is null
337      */
offer(E e)338     public boolean offer(E e) {
339         Objects.requireNonNull(e);
340         final ReentrantLock lock = this.lock;
341         lock.lock();
342         try {
343             if (count == items.length)
344                 return false;
345             else {
346                 enqueue(e);
347                 return true;
348             }
349         } finally {
350             lock.unlock();
351         }
352     }
353 
354     /**
355      * Inserts the specified element at the tail of this queue, waiting
356      * for space to become available if the queue is full.
357      *
358      * @throws InterruptedException {@inheritDoc}
359      * @throws NullPointerException {@inheritDoc}
360      */
put(E e)361     public void put(E e) throws InterruptedException {
362         Objects.requireNonNull(e);
363         final ReentrantLock lock = this.lock;
364         lock.lockInterruptibly();
365         try {
366             while (count == items.length)
367                 notFull.await();
368             enqueue(e);
369         } finally {
370             lock.unlock();
371         }
372     }
373 
374     /**
375      * Inserts the specified element at the tail of this queue, waiting
376      * up to the specified wait time for space to become available if
377      * the queue is full.
378      *
379      * @throws InterruptedException {@inheritDoc}
380      * @throws NullPointerException {@inheritDoc}
381      */
offer(E e, long timeout, TimeUnit unit)382     public boolean offer(E e, long timeout, TimeUnit unit)
383         throws InterruptedException {
384 
385         Objects.requireNonNull(e);
386         long nanos = unit.toNanos(timeout);
387         final ReentrantLock lock = this.lock;
388         lock.lockInterruptibly();
389         try {
390             while (count == items.length) {
391                 if (nanos <= 0L)
392                     return false;
393                 nanos = notFull.awaitNanos(nanos);
394             }
395             enqueue(e);
396             return true;
397         } finally {
398             lock.unlock();
399         }
400     }
401 
poll()402     public E poll() {
403         final ReentrantLock lock = this.lock;
404         lock.lock();
405         try {
406             return (count == 0) ? null : dequeue();
407         } finally {
408             lock.unlock();
409         }
410     }
411 
take()412     public E take() throws InterruptedException {
413         final ReentrantLock lock = this.lock;
414         lock.lockInterruptibly();
415         try {
416             while (count == 0)
417                 notEmpty.await();
418             return dequeue();
419         } finally {
420             lock.unlock();
421         }
422     }
423 
poll(long timeout, TimeUnit unit)424     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
425         long nanos = unit.toNanos(timeout);
426         final ReentrantLock lock = this.lock;
427         lock.lockInterruptibly();
428         try {
429             while (count == 0) {
430                 if (nanos <= 0L)
431                     return null;
432                 nanos = notEmpty.awaitNanos(nanos);
433             }
434             return dequeue();
435         } finally {
436             lock.unlock();
437         }
438     }
439 
peek()440     public E peek() {
441         final ReentrantLock lock = this.lock;
442         lock.lock();
443         try {
444             return itemAt(takeIndex); // null when queue is empty
445         } finally {
446             lock.unlock();
447         }
448     }
449 
450     // this doc comment is overridden to remove the reference to collections
451     // greater in size than Integer.MAX_VALUE
452     /**
453      * Returns the number of elements in this queue.
454      *
455      * @return the number of elements in this queue
456      */
size()457     public int size() {
458         final ReentrantLock lock = this.lock;
459         lock.lock();
460         try {
461             return count;
462         } finally {
463             lock.unlock();
464         }
465     }
466 
467     // this doc comment is a modified copy of the inherited doc comment,
468     // without the reference to unlimited queues.
469     /**
470      * Returns the number of additional elements that this queue can ideally
471      * (in the absence of memory or resource constraints) accept without
472      * blocking. This is always equal to the initial capacity of this queue
473      * less the current {@code size} of this queue.
474      *
475      * <p>Note that you <em>cannot</em> always tell if an attempt to insert
476      * an element will succeed by inspecting {@code remainingCapacity}
477      * because it may be the case that another thread is about to
478      * insert or remove an element.
479      */
remainingCapacity()480     public int remainingCapacity() {
481         final ReentrantLock lock = this.lock;
482         lock.lock();
483         try {
484             return items.length - count;
485         } finally {
486             lock.unlock();
487         }
488     }
489 
490     /**
491      * Removes a single instance of the specified element from this queue,
492      * if it is present.  More formally, removes an element {@code e} such
493      * that {@code o.equals(e)}, if this queue contains one or more such
494      * elements.
495      * Returns {@code true} if this queue contained the specified element
496      * (or equivalently, if this queue changed as a result of the call).
497      *
498      * <p>Removal of interior elements in circular array based queues
499      * is an intrinsically slow and disruptive operation, so should
500      * be undertaken only in exceptional circumstances, ideally
501      * only when the queue is known not to be accessible by other
502      * threads.
503      *
504      * @param o element to be removed from this queue, if present
505      * @return {@code true} if this queue changed as a result of the call
506      */
remove(Object o)507     public boolean remove(Object o) {
508         if (o == null) return false;
509         final ReentrantLock lock = this.lock;
510         lock.lock();
511         try {
512             if (count > 0) {
513                 final Object[] items = this.items;
514                 for (int i = takeIndex, end = putIndex,
515                          to = (i < end) ? end : items.length;
516                      ; i = 0, to = end) {
517                     for (; i < to; i++)
518                         if (o.equals(items[i])) {
519                             removeAt(i);
520                             return true;
521                         }
522                     if (to == end) break;
523                 }
524             }
525             return false;
526         } finally {
527             lock.unlock();
528         }
529     }
530 
531     /**
532      * Returns {@code true} if this queue contains the specified element.
533      * More formally, returns {@code true} if and only if this queue contains
534      * at least one element {@code e} such that {@code o.equals(e)}.
535      *
536      * @param o object to be checked for containment in this queue
537      * @return {@code true} if this queue contains the specified element
538      */
contains(Object o)539     public boolean contains(Object o) {
540         if (o == null) return false;
541         final ReentrantLock lock = this.lock;
542         lock.lock();
543         try {
544             if (count > 0) {
545                 final Object[] items = this.items;
546                 for (int i = takeIndex, end = putIndex,
547                          to = (i < end) ? end : items.length;
548                      ; i = 0, to = end) {
549                     for (; i < to; i++)
550                         if (o.equals(items[i]))
551                             return true;
552                     if (to == end) break;
553                 }
554             }
555             return false;
556         } finally {
557             lock.unlock();
558         }
559     }
560 
561     /**
562      * Returns an array containing all of the elements in this queue, in
563      * proper sequence.
564      *
565      * <p>The returned array will be "safe" in that no references to it are
566      * maintained by this queue.  (In other words, this method must allocate
567      * a new array).  The caller is thus free to modify the returned array.
568      *
569      * <p>This method acts as bridge between array-based and collection-based
570      * APIs.
571      *
572      * @return an array containing all of the elements in this queue
573      */
toArray()574     public Object[] toArray() {
575         final ReentrantLock lock = this.lock;
576         lock.lock();
577         try {
578             final Object[] items = this.items;
579             final int end = takeIndex + count;
580             final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
581             if (end != putIndex)
582                 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
583             return a;
584         } finally {
585             lock.unlock();
586         }
587     }
588 
589     /**
590      * Returns an array containing all of the elements in this queue, in
591      * proper sequence; the runtime type of the returned array is that of
592      * the specified array.  If the queue fits in the specified array, it
593      * is returned therein.  Otherwise, a new array is allocated with the
594      * runtime type of the specified array and the size of this queue.
595      *
596      * <p>If this queue fits in the specified array with room to spare
597      * (i.e., the array has more elements than this queue), the element in
598      * the array immediately following the end of the queue is set to
599      * {@code null}.
600      *
601      * <p>Like the {@link #toArray()} method, this method acts as bridge between
602      * array-based and collection-based APIs.  Further, this method allows
603      * precise control over the runtime type of the output array, and may,
604      * under certain circumstances, be used to save allocation costs.
605      *
606      * <p>Suppose {@code x} is a queue known to contain only strings.
607      * The following code can be used to dump the queue into a newly
608      * allocated array of {@code String}:
609      *
610      * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
611      *
612      * Note that {@code toArray(new Object[0])} is identical in function to
613      * {@code toArray()}.
614      *
615      * @param a the array into which the elements of the queue are to
616      *          be stored, if it is big enough; otherwise, a new array of the
617      *          same runtime type is allocated for this purpose
618      * @return an array containing all of the elements in this queue
619      * @throws ArrayStoreException if the runtime type of the specified array
620      *         is not a supertype of the runtime type of every element in
621      *         this queue
622      * @throws NullPointerException if the specified array is null
623      */
624     @SuppressWarnings("unchecked")
toArray(T[] a)625     public <T> T[] toArray(T[] a) {
626         final ReentrantLock lock = this.lock;
627         lock.lock();
628         try {
629             final Object[] items = this.items;
630             final int count = this.count;
631             final int firstLeg = Math.min(items.length - takeIndex, count);
632             if (a.length < count) {
633                 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
634                                              a.getClass());
635             } else {
636                 System.arraycopy(items, takeIndex, a, 0, firstLeg);
637                 if (a.length > count)
638                     a[count] = null;
639             }
640             if (firstLeg < count)
641                 System.arraycopy(items, 0, a, firstLeg, putIndex);
642             return a;
643         } finally {
644             lock.unlock();
645         }
646     }
647 
toString()648     public String toString() {
649         return Helpers.collectionToString(this);
650     }
651 
652     /**
653      * Atomically removes all of the elements from this queue.
654      * The queue will be empty after this call returns.
655      */
clear()656     public void clear() {
657         final ReentrantLock lock = this.lock;
658         lock.lock();
659         try {
660             int k;
661             if ((k = count) > 0) {
662                 circularClear(items, takeIndex, putIndex);
663                 takeIndex = putIndex;
664                 count = 0;
665                 if (itrs != null)
666                     itrs.queueIsEmpty();
667                 for (; k > 0 && lock.hasWaiters(notFull); k--)
668                     notFull.signal();
669             }
670         } finally {
671             lock.unlock();
672         }
673     }
674 
675     /**
676      * Nulls out slots starting at array index i, upto index end.
677      * Condition i == end means "full" - the entire array is cleared.
678      */
circularClear(Object[] items, int i, int end)679     private static void circularClear(Object[] items, int i, int end) {
680         // assert 0 <= i && i < items.length;
681         // assert 0 <= end && end < items.length;
682         for (int to = (i < end) ? end : items.length;
683              ; i = 0, to = end) {
684             for (; i < to; i++) items[i] = null;
685             if (to == end) break;
686         }
687     }
688 
689     /**
690      * @throws UnsupportedOperationException {@inheritDoc}
691      * @throws ClassCastException            {@inheritDoc}
692      * @throws NullPointerException          {@inheritDoc}
693      * @throws IllegalArgumentException      {@inheritDoc}
694      */
drainTo(Collection<? super E> c)695     public int drainTo(Collection<? super E> c) {
696         return drainTo(c, Integer.MAX_VALUE);
697     }
698 
699     /**
700      * @throws UnsupportedOperationException {@inheritDoc}
701      * @throws ClassCastException            {@inheritDoc}
702      * @throws NullPointerException          {@inheritDoc}
703      * @throws IllegalArgumentException      {@inheritDoc}
704      */
drainTo(Collection<? super E> c, int maxElements)705     public int drainTo(Collection<? super E> c, int maxElements) {
706         Objects.requireNonNull(c);
707         if (c == this)
708             throw new IllegalArgumentException();
709         if (maxElements <= 0)
710             return 0;
711         final Object[] items = this.items;
712         final ReentrantLock lock = this.lock;
713         lock.lock();
714         try {
715             int n = Math.min(maxElements, count);
716             int take = takeIndex;
717             int i = 0;
718             try {
719                 while (i < n) {
720                     @SuppressWarnings("unchecked")
721                     E e = (E) items[take];
722                     c.add(e);
723                     items[take] = null;
724                     if (++take == items.length) take = 0;
725                     i++;
726                 }
727                 return n;
728             } finally {
729                 // Restore invariants even if c.add() threw
730                 if (i > 0) {
731                     count -= i;
732                     takeIndex = take;
733                     if (itrs != null) {
734                         if (count == 0)
735                             itrs.queueIsEmpty();
736                         else if (i > take)
737                             itrs.takeIndexWrapped();
738                     }
739                     for (; i > 0 && lock.hasWaiters(notFull); i--)
740                         notFull.signal();
741                 }
742             }
743         } finally {
744             lock.unlock();
745         }
746     }
747 
748     /**
749      * Returns an iterator over the elements in this queue in proper sequence.
750      * The elements will be returned in order from first (head) to last (tail).
751      *
752      * <p>The returned iterator is
753      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
754      *
755      * @return an iterator over the elements in this queue in proper sequence
756      */
iterator()757     public Iterator<E> iterator() {
758         return new Itr();
759     }
760 
761     /**
762      * Shared data between iterators and their queue, allowing queue
763      * modifications to update iterators when elements are removed.
764      *
765      * This adds a lot of complexity for the sake of correctly
766      * handling some uncommon operations, but the combination of
767      * circular-arrays and supporting interior removes (i.e., those
768      * not at head) would cause iterators to sometimes lose their
769      * places and/or (re)report elements they shouldn't.  To avoid
770      * this, when a queue has one or more iterators, it keeps iterator
771      * state consistent by:
772      *
773      * (1) keeping track of the number of "cycles", that is, the
774      *     number of times takeIndex has wrapped around to 0.
775      * (2) notifying all iterators via the callback removedAt whenever
776      *     an interior element is removed (and thus other elements may
777      *     be shifted).
778      *
779      * These suffice to eliminate iterator inconsistencies, but
780      * unfortunately add the secondary responsibility of maintaining
781      * the list of iterators.  We track all active iterators in a
782      * simple linked list (accessed only when the queue's lock is
783      * held) of weak references to Itr.  The list is cleaned up using
784      * 3 different mechanisms:
785      *
786      * (1) Whenever a new iterator is created, do some O(1) checking for
787      *     stale list elements.
788      *
789      * (2) Whenever takeIndex wraps around to 0, check for iterators
790      *     that have been unused for more than one wrap-around cycle.
791      *
792      * (3) Whenever the queue becomes empty, all iterators are notified
793      *     and this entire data structure is discarded.
794      *
795      * So in addition to the removedAt callback that is necessary for
796      * correctness, iterators have the shutdown and takeIndexWrapped
797      * callbacks that help remove stale iterators from the list.
798      *
799      * Whenever a list element is examined, it is expunged if either
800      * the GC has determined that the iterator is discarded, or if the
801      * iterator reports that it is "detached" (does not need any
802      * further state updates).  Overhead is maximal when takeIndex
803      * never advances, iterators are discarded before they are
804      * exhausted, and all removals are interior removes, in which case
805      * all stale iterators are discovered by the GC.  But even in this
806      * case we don't increase the amortized complexity.
807      *
808      * Care must be taken to keep list sweeping methods from
809      * reentrantly invoking another such method, causing subtle
810      * corruption bugs.
811      */
    class Itrs {

        /**
         * Node in a linked list of weak iterator references.
         * The referent is the tracked iterator; since it is only weakly
         * referenced from here, {@code get()} returns null once the
         * reference has been cleared (by the GC or by {@code clear()}).
         */
        private class Node extends WeakReference<Itr> {
            /** Successor in the list; nulled when a sweep unlinks this node */
            Node next;

            /**
             * Creates a node weakly referring to {@code iterator}, with
             * {@code next} as its successor.
             */
            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }

        /** Incremented whenever takeIndex wraps around to 0 */
        int cycles;

        /** Linked list of weak iterator references */
        private Node head;

        /**
         * Used to expunge stale iterators: the node after which the next
         * call to doSomeSweeping resumes, or null to start at head.
         */
        private Node sweeper;

        /** Probe budget of a sweep that has no reason to expect stale nodes */
        private static final int SHORT_SWEEP_PROBES = 4;
        /** Probe budget once a stale node is known to exist or was found */
        private static final int LONG_SWEEP_PROBES = 16;

        /**
         * Creates the tracking structure with {@code initial} as its
         * first registered iterator.
         */
        Itrs(Itr initial) {
            register(initial);
        }

        /**
         * Sweeps itrs, looking for and expunging stale iterators.
         * If at least one was found, tries harder to find more.
         * Called only from iterating thread.
         *
         * @param tryHarder whether to start in try-harder mode, because
         * there is known to be at least one iterator to collect
         */
        void doSomeSweeping(boolean tryHarder) {
            // assert lock.isHeldByCurrentThread();
            // assert head != null;
            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
            // p is the node being examined, o its predecessor (null when
            // p is the head of the list)
            Node o, p;
            final Node sweeper = this.sweeper;
            boolean passedGo;   // to limit search to one full sweep

            if (sweeper == null) {
                // No saved position: start at head.  Starting at head
                // already counts as having passed "go", so the sweep
                // stops when it reaches the end of the list.
                o = null;
                p = head;
                passedGo = true;
            } else {
                // Resume just after the position saved by the last sweep
                o = sweeper;
                p = o.next;
                passedGo = false;
            }

            for (; probes > 0; probes--) {
                if (p == null) {
                    // Reached the end of the list: wrap around to head
                    // once, then give up.
                    if (passedGo)
                        break;
                    o = null;
                    p = head;
                    passedGo = true;
                }
                final Itr it = p.get();
                // Capture the successor now, because unlinking p below
                // nulls p.next
                final Node next = p.next;
                if (it == null || it.isDetached()) {
                    // found a discarded/exhausted iterator
                    probes = LONG_SWEEP_PROBES; // "try harder"
                    // unlink p
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                        if (next == null) {
                            // We've run out of iterators to track; retire
                            itrs = null;
                            return;
                        }
                    }
                    else
                        o.next = next;
                } else {
                    // p stays in the list and becomes the predecessor
                    o = p;
                }
                p = next;
            }

            // Save the predecessor of the next candidate so that a later
            // sweep continues from there; if the end of the list was
            // reached, the next sweep starts over at head.
            this.sweeper = (p == null) ? null : o;
        }

        /**
         * Adds a new iterator to the linked list of tracked iterators.
         * The new node is inserted at the head of the list.
         */
        void register(Itr itr) {
            // assert lock.isHeldByCurrentThread();
            head = new Node(itr, head);
        }

        /**
         * Called whenever takeIndex wraps around to 0.
         *
         * Notifies all iterators, and expunges any that are now stale.
         */
        void takeIndexWrapped() {
            // assert lock.isHeldByCurrentThread();
            cycles++;
            // o trails p as its predecessor (null while p is the head)
            for (Node o = null, p = head; p != null;) {
                final Itr it = p.get();
                final Node next = p.next;
                // Itr.takeIndexWrapped() returns true if the iterator
                // should be unlinked
                if (it == null || it.takeIndexWrapped()) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null)
                        head = next;
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null)   // no more iterators to track
                itrs = null;
        }

        /**
         * Called whenever an interior remove (not at takeIndex) occurred.
         *
         * Notifies all iterators, and expunges any that are now stale.
         *
         * @param removedIndex the array index of the removed element
         */
        void removedAt(int removedIndex) {
            // Same list traversal as takeIndexWrapped(), but each live
            // iterator is asked to adjust its indices for the removal.
            for (Node o = null, p = head; p != null;) {
                final Itr it = p.get();
                final Node next = p.next;
                if (it == null || it.removedAt(removedIndex)) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null)
                        head = next;
                    else
                        o.next = next;
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null)   // no more iterators to track
                itrs = null;
        }

        /**
         * Called whenever the queue becomes empty.
         *
         * Notifies all active iterators that the queue is empty,
         * clears all weak refs, and unlinks the itrs data structure.
         */
        void queueIsEmpty() {
            // assert lock.isHeldByCurrentThread();
            for (Node p = head; p != null; p = p.next) {
                Itr it = p.get();
                // Nodes whose referent is already gone need no work
                if (it != null) {
                    p.clear();
                    it.shutdown();
                }
            }
            // Drop the whole list at once instead of unlinking node by node
            head = null;
            itrs = null;
        }

        /**
         * Called whenever an element has been dequeued (at takeIndex).
         * Dispatches on the queue's current state: an empty queue takes
         * precedence over a takeIndex wrap-around.
         */
        void elementDequeued() {
            // assert lock.isHeldByCurrentThread();
            if (count == 0)
                queueIsEmpty();
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }
997 
998     /**
999      * Iterator for ArrayBlockingQueue.
1000      *
1001      * To maintain weak consistency with respect to puts and takes, we
1002      * read ahead one slot, so as to not report hasNext true but then
1003      * not have an element to return.
1004      *
1005      * We switch into "detached" mode (allowing prompt unlinking from
1006      * itrs without help from the GC) when all indices are negative, or
1007      * when hasNext returns false for the first time.  This allows the
1008      * iterator to track concurrent updates completely accurately,
1009      * except for the corner case of the user calling Iterator.remove()
1010      * after hasNext() returned false.  Even in this case, we ensure
1011      * that we don't remove the wrong element by keeping track of the
1012      * expected element to remove, in lastItem.  Yes, we may fail to
1013      * remove lastItem from the queue if it moved due to an interleaved
1014      * interior remove while in detached mode.
1015      *
1016      * Method forEachRemaining, added in Java 8, is treated similarly
1017      * to hasNext returning false, in that we switch to detached mode,
1018      * but we regard it as an even stronger request to "close" this
1019      * iteration, and don't bother supporting subsequent remove().
1020      */
    private class Itr implements Iterator<E> {
        /** Index to look for new nextItem; NONE at end */
        private int cursor;

        /** Element to be returned by next call to next(); null if none */
        private E nextItem;

        /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
        private int nextIndex;

        /** Last element returned; null if none or not detached. */
        private E lastItem;

        /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
        private int lastRet;

        /** Previous value of takeIndex, or DETACHED when detached */
        private int prevTakeIndex;

        /** Previous value of itrs.cycles */
        private int prevCycles;

        /** Special index value indicating "not available" or "undefined" */
        private static final int NONE = -1;

        /**
         * Special index value indicating "removed elsewhere", that is,
         * removed by some operation other than a call to this.remove().
         */
        private static final int REMOVED = -2;

        /** Special value for prevTakeIndex indicating "detached mode" */
        private static final int DETACHED = -3;

        /**
         * Creates an iterator positioned at the current head of the queue.
         * On an empty queue the iterator starts out detached with nothing
         * to return.  Otherwise it records takeIndex, reads ahead the
         * first element, and registers itself with itrs (creating that
         * structure if it does not exist yet).
         */
        Itr() {
            lastRet = NONE;
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (count == 0) {
                    // assert itrs == null;
                    cursor = NONE;
                    nextIndex = NONE;
                    prevTakeIndex = DETACHED;
                } else {
                    final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                    prevTakeIndex = takeIndex;
                    // Read ahead: the element at takeIndex is what the
                    // first call to next() will return
                    nextItem = itemAt(nextIndex = takeIndex);
                    cursor = incCursor(takeIndex);
                    if (itrs == null) {
                        itrs = new Itrs(this);
                    } else {
                        itrs.register(this); // in this order
                        itrs.doSomeSweeping(false);
                    }
                    prevCycles = itrs.cycles;
                    // assert takeIndex >= 0;
                    // assert prevTakeIndex == takeIndex;
                    // assert nextIndex >= 0;
                    // assert nextItem != null;
                }
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns true if this iterator is in detached mode, which is
         * encoded as a negative prevTakeIndex (see DETACHED).
         */
        boolean isDetached() {
            // assert lock.isHeldByCurrentThread();
            return prevTakeIndex < 0;
        }

        /**
         * Returns the array index following {@code index}, wrapping to 0
         * at the end of the array, or NONE if that index equals putIndex.
         */
        private int incCursor(int index) {
            // assert lock.isHeldByCurrentThread();
            if (++index == items.length) index = 0;
            if (index == putIndex) index = NONE;
            return index;
        }

        /**
         * Returns true if index is invalidated by the given number of
         * dequeues, starting from prevTakeIndex.
         * Negative (special) index values are never reported as
         * invalidated.
         */
        private boolean invalidated(int index, int prevTakeIndex,
                                    long dequeues, int length) {
            if (index < 0)
                return false;
            // circular distance from prevTakeIndex to index
            int distance = index - prevTakeIndex;
            if (distance < 0)
                distance += length;
            return dequeues > distance;
        }

        /**
         * Adjusts indices to incorporate all dequeues since the last
         * operation on this iterator.  Call only from iterating thread.
         */
        private void incorporateDequeues() {
            // assert lock.isHeldByCurrentThread();
            // assert itrs != null;
            // assert !isDetached();
            // assert count > 0;

            final int cycles = itrs.cycles;
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            final int prevCycles = this.prevCycles;
            final int prevTakeIndex = this.prevTakeIndex;

            // Nothing to do unless takeIndex moved since the last operation
            if (cycles != prevCycles || takeIndex != prevTakeIndex) {
                final int len = items.length;
                // how far takeIndex has advanced since the previous
                // operation of this iterator
                long dequeues = (long) (cycles - prevCycles) * len
                    + (takeIndex - prevTakeIndex);

                // Check indices for invalidation
                if (invalidated(lastRet, prevTakeIndex, dequeues, len))
                    lastRet = REMOVED;
                if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
                    nextIndex = REMOVED;
                // An overtaken cursor is moved up to the current head
                // rather than being marked as removed
                if (invalidated(cursor, prevTakeIndex, dequeues, len))
                    cursor = takeIndex;

                if (cursor < 0 && nextIndex < 0 && lastRet < 0)
                    detach();
                else {
                    this.prevCycles = cycles;
                    this.prevTakeIndex = takeIndex;
                }
            }
        }

        /**
         * Called when itrs should stop tracking this iterator, either
         * because there are no more indices to update (cursor < 0 &&
         * nextIndex < 0 && lastRet < 0) or as a special exception, when
         * lastRet >= 0, because hasNext() is about to return false for the
         * first time.  Call only from iterating thread.
         */
        private void detach() {
            // Switch to detached mode
            // assert lock.isHeldByCurrentThread();
            // assert cursor == NONE;
            // assert nextIndex < 0;
            // assert lastRet < 0 || nextItem == null;
            // assert lastRet < 0 ^ lastItem != null;
            // No-op if already detached
            if (prevTakeIndex >= 0) {
                // assert itrs != null;
                prevTakeIndex = DETACHED;
                // try to unlink from itrs (but not too hard)
                itrs.doSomeSweeping(true);
            }
        }

        /**
         * For performance reasons, we would like not to acquire a lock in
         * hasNext in the common case.  To allow for this, we only access
         * fields (i.e. nextItem) that are not modified by update operations
         * triggered by queue modifications.
         */
        public boolean hasNext() {
            if (nextItem != null)
                return true;
            // Slow path: takes the lock and switches to detached mode
            noNext();
            return false;
        }

        /**
         * Slow path of hasNext(), taken when there is no next element.
         * Under the lock, an iterator that is still attached incorporates
         * pending dequeues and, if lastRet is still a valid index,
         * remembers the element there in lastItem before detaching.
         */
        private void noNext() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                // assert cursor == NONE;
                // assert nextIndex == NONE;
                if (!isDetached()) {
                    // assert lastRet >= 0;
                    incorporateDequeues(); // might update lastRet
                    if (lastRet >= 0) {
                        // Saved so that a later remove() can check that
                        // the slot still holds the element it returned
                        lastItem = itemAt(lastRet);
                        // assert lastItem != null;
                        detach();
                    }
                }
                // assert isDetached();
                // assert lastRet < 0 ^ lastItem != null;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Returns the element read ahead in nextItem and advances the
         * iterator by reading ahead the element at cursor, if any.
         *
         * @throws NoSuchElementException if there is no next element
         */
        public E next() {
            final E e = nextItem;
            if (e == null)
                throw new NoSuchElementException();
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (!isDetached())
                    incorporateDequeues();
                // assert nextIndex != NONE;
                // assert lastItem == null;
                lastRet = nextIndex;
                final int cursor = this.cursor;
                if (cursor >= 0) {
                    nextItem = itemAt(nextIndex = cursor);
                    // assert nextItem != null;
                    this.cursor = incCursor(cursor);
                } else {
                    // Nothing left to read ahead
                    nextIndex = NONE;
                    nextItem = null;
                    // With lastRet REMOVED as well, no index is left
                    // that needs tracking
                    if (lastRet == REMOVED) detach();
                }
            } finally {
                lock.unlock();
            }
            return e;
        }

        /**
         * Passes all remaining elements to {@code action} while holding
         * the queue lock, then unconditionally resets this iterator to an
         * exhausted, detached state (even if the action throws).
         *
         * @throws NullPointerException if {@code action} is null
         */
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                final E e = nextItem;
                if (e == null) return;
                if (!isDetached())
                    incorporateDequeues();
                // The element that was read ahead is reported first
                action.accept(e);
                if (isDetached() || cursor < 0) return;
                final Object[] items = ArrayBlockingQueue.this.items;
                // Traverse from cursor up to putIndex in at most two
                // legs: to the end of the array, then from index 0
                for (int i = cursor, end = putIndex,
                         to = (i < end) ? end : items.length;
                     ; i = 0, to = end) {
                    for (; i < to; i++)
                        action.accept(itemAt(items, i));
                    if (to == end) break;
                }
            } finally {
                // Calling forEachRemaining is a strong hint that this
                // iteration is surely over; supporting remove() after
                // forEachRemaining() is more trouble than it's worth
                cursor = nextIndex = lastRet = NONE;
                nextItem = lastItem = null;
                detach();
                lock.unlock();
            }
        }

        /**
         * Removes from the queue the last element returned by next(),
         * unless it has already been removed by some other operation.
         *
         * @throws IllegalStateException if there is no last returned
         *         element to remove (lastRet is NONE)
         */
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            // assert lock.getHoldCount() == 1;
            try {
                if (!isDetached())
                    incorporateDequeues(); // might update lastRet or detach
                final int lastRet = this.lastRet;
                this.lastRet = NONE;
                if (lastRet >= 0) {
                    if (!isDetached())
                        removeAt(lastRet);
                    else {
                        // Detached iterators receive no index updates, so
                        // remove only if the slot still holds the very
                        // element that was returned
                        final E lastItem = this.lastItem;
                        // assert lastItem != null;
                        this.lastItem = null;
                        if (itemAt(lastRet) == lastItem)
                            removeAt(lastRet);
                    }
                } else if (lastRet == NONE)
                    throw new IllegalStateException();
                // else lastRet == REMOVED and the last returned element was
                // previously asynchronously removed via an operation other
                // than this.remove(), so nothing to do.

                if (cursor < 0 && nextIndex < 0)
                    detach();
            } finally {
                lock.unlock();
                // assert lastRet == NONE;
                // assert lastItem == null;
            }
        }

        /**
         * Called to notify the iterator that the queue is empty, or that it
         * has fallen hopelessly behind, so that it should abandon any
         * further iteration, except possibly to return one more element
         * from next(), as promised by returning true from hasNext().
         */
        void shutdown() {
            // assert lock.isHeldByCurrentThread();
            cursor = NONE;
            if (nextIndex >= 0)
                nextIndex = REMOVED;
            if (lastRet >= 0) {
                lastRet = REMOVED;
                lastItem = null;
            }
            prevTakeIndex = DETACHED;
            // Don't set nextItem to null because we must continue to be
            // able to return it on next().
            //
            // Caller will unlink from itrs when convenient.
        }

        /**
         * Returns {@code index - prevTakeIndex}, with {@code length}
         * added if that difference is negative (circular distance).
         */
        private int distance(int index, int prevTakeIndex, int length) {
            int distance = index - prevTakeIndex;
            if (distance < 0)
                distance += length;
            return distance;
        }

        /**
         * Called whenever an interior remove (not at takeIndex) occurred.
         *
         * @param removedIndex the array index of the removed element
         * @return true if this iterator should be unlinked from itrs
         */
        boolean removedAt(int removedIndex) {
            // assert lock.isHeldByCurrentThread();
            if (isDetached())
                return true;

            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            final int prevTakeIndex = this.prevTakeIndex;
            final int len = items.length;
            // distance from prevTakeIndex to removedIndex
            final int removedDistance =
                len * (itrs.cycles - this.prevCycles
                       + ((removedIndex < takeIndex) ? 1 : 0))
                + (removedIndex - prevTakeIndex);
            // assert itrs.cycles - this.prevCycles >= 0;
            // assert itrs.cycles - this.prevCycles <= 1;
            // assert removedDistance > 0;
            // assert removedIndex != takeIndex;
            // Each tracked index is handled the same way: an index at
            // the removed position is invalidated, an index beyond it
            // is moved back by one slot, and an index before it is left
            // alone.
            int cursor = this.cursor;
            if (cursor >= 0) {
                int x = distance(cursor, prevTakeIndex, len);
                if (x == removedDistance) {
                    // cursor stays put unless it now coincides with
                    // putIndex, i.e. nothing is left to read
                    if (cursor == putIndex)
                        this.cursor = cursor = NONE;
                }
                else if (x > removedDistance) {
                    // assert cursor != prevTakeIndex;
                    this.cursor = cursor = dec(cursor, len);
                }
            }
            int lastRet = this.lastRet;
            if (lastRet >= 0) {
                int x = distance(lastRet, prevTakeIndex, len);
                if (x == removedDistance)
                    this.lastRet = lastRet = REMOVED;
                else if (x > removedDistance)
                    this.lastRet = lastRet = dec(lastRet, len);
            }
            int nextIndex = this.nextIndex;
            if (nextIndex >= 0) {
                int x = distance(nextIndex, prevTakeIndex, len);
                if (x == removedDistance)
                    this.nextIndex = nextIndex = REMOVED;
                else if (x > removedDistance)
                    this.nextIndex = nextIndex = dec(nextIndex, len);
            }
            // With no valid index left there is nothing more to track
            if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
                this.prevTakeIndex = DETACHED;
                return true;
            }
            return false;
        }

        /**
         * Called whenever takeIndex wraps around to zero.
         *
         * @return true if this iterator should be unlinked from itrs
         */
        boolean takeIndexWrapped() {
            // assert lock.isHeldByCurrentThread();
            if (isDetached())
                return true;
            if (itrs.cycles - prevCycles > 1) {
                // All the elements that existed at the time of the last
                // operation are gone, so abandon further iteration.
                shutdown();
                return true;
            }
            return false;
        }

//         /** Uncomment for debugging. */
//         public String toString() {
//             return ("cursor=" + cursor + " " +
//                     "nextIndex=" + nextIndex + " " +
//                     "lastRet=" + lastRet + " " +
//                     "nextItem=" + nextItem + " " +
//                     "lastItem=" + lastItem + " " +
//                     "prevCycles=" + prevCycles + " " +
//                     "prevTakeIndex=" + prevTakeIndex + " " +
//                     "size()=" + size() + " " +
//                     "remainingCapacity()=" + remainingCapacity());
//         }
    }
1418 
1419     /**
1420      * Returns a {@link Spliterator} over the elements in this queue.
1421      *
1422      * <p>The returned spliterator is
1423      * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1424      *
1425      * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1426      * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1427      *
1428      * @implNote
1429      * The {@code Spliterator} implements {@code trySplit} to permit limited
1430      * parallelism.
1431      *
1432      * @return a {@code Spliterator} over the elements in this queue
1433      * @since 1.8
1434      */
spliterator()1435     public Spliterator<E> spliterator() {
1436         return Spliterators.spliterator
1437             (this, (Spliterator.ORDERED |
1438                     Spliterator.NONNULL |
1439                     Spliterator.CONCURRENT));
1440     }
1441 
1442     /**
1443      * @throws NullPointerException {@inheritDoc}
1444      */
forEach(Consumer<? super E> action)1445     public void forEach(Consumer<? super E> action) {
1446         Objects.requireNonNull(action);
1447         final ReentrantLock lock = this.lock;
1448         lock.lock();
1449         try {
1450             if (count > 0) {
1451                 final Object[] items = this.items;
1452                 for (int i = takeIndex, end = putIndex,
1453                          to = (i < end) ? end : items.length;
1454                      ; i = 0, to = end) {
1455                     for (; i < to; i++)
1456                         action.accept(itemAt(items, i));
1457                     if (to == end) break;
1458                 }
1459             }
1460         } finally {
1461             lock.unlock();
1462         }
1463     }
1464 
1465     /**
1466      * @throws NullPointerException {@inheritDoc}
1467      */
removeIf(Predicate<? super E> filter)1468     public boolean removeIf(Predicate<? super E> filter) {
1469         Objects.requireNonNull(filter);
1470         return bulkRemove(filter);
1471     }
1472 
1473     /**
1474      * @throws NullPointerException {@inheritDoc}
1475      */
removeAll(Collection<?> c)1476     public boolean removeAll(Collection<?> c) {
1477         Objects.requireNonNull(c);
1478         return bulkRemove(e -> c.contains(e));
1479     }
1480 
1481     /**
1482      * @throws NullPointerException {@inheritDoc}
1483      */
retainAll(Collection<?> c)1484     public boolean retainAll(Collection<?> c) {
1485         Objects.requireNonNull(c);
1486         return bulkRemove(e -> !c.contains(e));
1487     }
1488 
1489     /** Implementation of bulk remove methods. */
bulkRemove(Predicate<? super E> filter)1490     private boolean bulkRemove(Predicate<? super E> filter) {
1491         final ReentrantLock lock = this.lock;
1492         lock.lock();
1493         try {
1494             if (itrs == null) { // check for active iterators
1495                 if (count > 0) {
1496                     final Object[] items = this.items;
1497                     // Optimize for initial run of survivors
1498                     for (int i = takeIndex, end = putIndex,
1499                              to = (i < end) ? end : items.length;
1500                          ; i = 0, to = end) {
1501                         for (; i < to; i++)
1502                             if (filter.test(itemAt(items, i)))
1503                                 return bulkRemoveModified(filter, i);
1504                         if (to == end) break;
1505                     }
1506                 }
1507                 return false;
1508             }
1509         } finally {
1510             lock.unlock();
1511         }
1512         // Active iterators are too hairy!
1513         // Punting (for now) to the slow n^2 algorithm ...
1514         return super.removeIf(filter);
1515     }
1516 
1517     // A tiny bit set implementation
1518 
nBits(int n)1519     private static long[] nBits(int n) {
1520         return new long[((n - 1) >> 6) + 1];
1521     }
setBit(long[] bits, int i)1522     private static void setBit(long[] bits, int i) {
1523         bits[i >> 6] |= 1L << i;
1524     }
isClear(long[] bits, int i)1525     private static boolean isClear(long[] bits, int i) {
1526         return (bits[i >> 6] & (1L << i)) == 0;
1527     }
1528 
1529     /**
1530      * Returns circular distance from i to j, disambiguating i == j to
1531      * items.length; never returns 0.
1532      */
distanceNonEmpty(int i, int j)1533     private int distanceNonEmpty(int i, int j) {
1534         if ((j -= i) <= 0) j += items.length;
1535         return j;
1536     }
1537 
1538     /**
1539      * Helper for bulkRemove, in case of at least one deletion.
1540      * Tolerate predicates that reentrantly access the collection for
1541      * read (but not write), so traverse once to find elements to
1542      * delete, a second pass to physically expunge.
1543      *
1544      * @param beg valid index of first element to be deleted
1545      */
    private boolean bulkRemoveModified(
        Predicate<? super E> filter, final int beg) {
        final Object[] es = items;
        final int capacity = items.length;
        final int end = putIndex;
        // One bit per slot from beg (circularly) up to putIndex; bit n
        // stands for the slot n positions after beg.
        final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
        // set bit 0: the element at beg is the first one to be deleted
        deathRow[0] = 1L;
        // First pass: evaluate the filter for every element after beg
        // and record the doomed ones.  k is the offset that turns array
        // index i into bit index i - k; it is lowered by capacity for
        // the second leg so that bit indices keep increasing across the
        // wrap-around.
        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
             ; i = 0, to = end, k -= capacity) {
            for (; i < to; i++)
                if (filter.test(itemAt(es, i)))
                    setBit(deathRow, i - k);
            if (to == end) break;
        }
        // Second pass: compact the survivors toward beg.
        // a two-finger traversal, with hare i reading, tortoise w writing
        int w = beg;
        for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
             ; w = 0) { // w rejoins i on second leg
            // In this loop, i and w are on the same leg, with i > w
            for (; i < to; i++)
                if (isClear(deathRow, i - k))
                    es[w++] = es[i];
            if (to == end) break;
            // In this loop, w is on the first leg, i on the second
            for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
                if (isClear(deathRow, i - k))
                    es[w++] = es[i];
            if (i >= to) {
                // i has consumed all elements, so the traversal is done
                if (w == capacity) w = 0; // "corner" case
                break;
            }
            // Otherwise w reached the end of the array first; the loop
            // update resets it to 0 and the traversal continues.
        }
        // w is the new putIndex; the slots from w up to the old putIndex
        // no longer hold live elements.
        count -= distanceNonEmpty(w, end);
        // NOTE(review): circularClear is defined elsewhere; presumably it
        // nulls out the vacated slots between w and end - confirm.
        circularClear(es, putIndex = w, end);
        return true;
    }
1582 
1583     /** debugging */
checkInvariants()1584     void checkInvariants() {
1585         // meta-assertions
1586         // assert lock.isHeldByCurrentThread();
1587         if (!invariantsSatisfied()) {
1588             String detail = String.format(
1589                 "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
1590                 takeIndex, putIndex, count, items.length,
1591                 Arrays.toString(items));
1592             System.err.println(detail);
1593             throw new AssertionError(detail);
1594         }
1595     }
1596 
invariantsSatisfied()1597     private boolean invariantsSatisfied() {
1598         // Unlike ArrayDeque, we have a count field but no spare slot.
1599         // We prefer ArrayDeque's strategy (and the names of its fields!),
1600         // but our field layout is baked into the serial form, and so is
1601         // too annoying to change.
1602         //
1603         // putIndex == takeIndex must be disambiguated by checking count.
1604         int capacity = items.length;
1605         return capacity > 0
1606             && items.getClass() == Object[].class
1607             && (takeIndex | putIndex | count) >= 0
1608             && takeIndex <  capacity
1609             && putIndex  <  capacity
1610             && count     <= capacity
1611             && (putIndex - takeIndex - count) % capacity == 0
1612             && (count == 0 || items[takeIndex] != null)
1613             && (count == capacity || items[putIndex] == null)
1614             && (count == 0 || items[dec(putIndex, capacity)] != null);
1615     }
1616 
1617     /**
1618      * Reconstitutes this queue from a stream (that is, deserializes it).
1619      *
1620      * @param s the stream
1621      * @throws ClassNotFoundException if the class of a serialized object
1622      *         could not be found
1623      * @throws java.io.InvalidObjectException if invariants are violated
1624      * @throws java.io.IOException if an I/O error occurs
1625      */
readObject(java.io.ObjectInputStream s)1626     private void readObject(java.io.ObjectInputStream s)
1627         throws java.io.IOException, ClassNotFoundException {
1628 
1629         // Read in items array and various fields
1630         s.defaultReadObject();
1631 
1632         if (!invariantsSatisfied())
1633             throw new java.io.InvalidObjectException("invariants violated");
1634     }
1635 }
1636