1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Comparator;
28 import java.util.Objects;
29 import java.util.Spliterator;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ForkJoinPool;
32 import java.util.concurrent.atomic.AtomicLong;
33 import java.util.function.BooleanSupplier;
34 import java.util.function.Consumer;
35 import java.util.function.DoubleConsumer;
36 import java.util.function.DoubleSupplier;
37 import java.util.function.IntConsumer;
38 import java.util.function.IntSupplier;
39 import java.util.function.LongConsumer;
40 import java.util.function.LongSupplier;
41 import java.util.function.Supplier;
42 
43 /**
44  * Spliterator implementations for wrapping and delegating spliterators, used
45  * in the implementation of the {@link Stream#spliterator()} method.
46  *
47  * @since 1.8
48  */
49 class StreamSpliterators {
50 
51     /**
52      * Abstract wrapping spliterator that binds to the spliterator of a
53      * pipeline helper on first operation.
54      *
55      * <p>This spliterator is not late-binding and will bind to the source
56      * spliterator when first operated on.
57      *
58      * <p>A wrapping spliterator produced from a sequential stream
59      * cannot be split if there are stateful operations present.
60      */
61     private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT,
62                                                               T_BUFFER extends AbstractSpinedBuffer>
63             implements Spliterator<P_OUT> {
64 
65         // @@@ Detect if stateful operations are present or not
66         //     If not then can split otherwise cannot
67 
68         /**
69          * True if this spliterator supports splitting
70          */
71         final boolean isParallel;
72 
73         final PipelineHelper<P_OUT> ph;
74 
75         /**
76          * Supplier for the source spliterator.  Client provides either a
77          * spliterator or a supplier.
78          */
79         private Supplier<Spliterator<P_IN>> spliteratorSupplier;
80 
81         /**
82          * Source spliterator.  Either provided from client or obtained from
83          * supplier.
84          */
85         Spliterator<P_IN> spliterator;
86 
87         /**
88          * Sink chain for the downstream stages of the pipeline, ultimately
89          * leading to the buffer. Used during partial traversal.
90          */
91         Sink<P_IN> bufferSink;
92 
93         /**
94          * A function that advances one element of the spliterator, pushing
95          * it to bufferSink.  Returns whether any elements were processed.
96          * Used during partial traversal.
97          */
98         BooleanSupplier pusher;
99 
100         /** Next element to consume from the buffer, used during partial traversal */
101         long nextToConsume;
102 
103         /** Buffer into which elements are pushed.  Used during partial traversal. */
104         T_BUFFER buffer;
105 
106         /**
107          * True if full traversal has occurred (with possible cancelation).
108          * If doing a partial traversal, there may be still elements in buffer.
109          */
110         boolean finished;
111 
112         /**
113          * Construct an AbstractWrappingSpliterator from a
114          * {@code Supplier<Spliterator>}.
115          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)116         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
117                                     Supplier<Spliterator<P_IN>> spliteratorSupplier,
118                                     boolean parallel) {
119             this.ph = ph;
120             this.spliteratorSupplier = spliteratorSupplier;
121             this.spliterator = null;
122             this.isParallel = parallel;
123         }
124 
125         /**
126          * Construct an AbstractWrappingSpliterator from a
127          * {@code Spliterator}.
128          */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)129         AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
130                                     Spliterator<P_IN> spliterator,
131                                     boolean parallel) {
132             this.ph = ph;
133             this.spliteratorSupplier = null;
134             this.spliterator = spliterator;
135             this.isParallel = parallel;
136         }
137 
138         /**
139          * Called before advancing to set up spliterator, if needed.
140          */
init()141         final void init() {
142             if (spliterator == null) {
143                 spliterator = spliteratorSupplier.get();
144                 spliteratorSupplier = null;
145             }
146         }
147 
148         /**
149          * Get an element from the source, pushing it into the sink chain,
150          * setting up the buffer if needed
151          * @return whether there are elements to consume from the buffer
152          */
doAdvance()153         final boolean doAdvance() {
154             if (buffer == null) {
155                 if (finished)
156                     return false;
157 
158                 init();
159                 initPartialTraversalState();
160                 nextToConsume = 0;
161                 bufferSink.begin(spliterator.getExactSizeIfKnown());
162                 return fillBuffer();
163             }
164             else {
165                 ++nextToConsume;
166                 boolean hasNext = nextToConsume < buffer.count();
167                 if (!hasNext) {
168                     nextToConsume = 0;
169                     buffer.clear();
170                     hasNext = fillBuffer();
171                 }
172                 return hasNext;
173             }
174         }
175 
176         /**
177          * Invokes the shape-specific constructor with the provided arguments
178          * and returns the result.
179          */
180         abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);
181 
182         /**
183          * Initializes buffer, sink chain, and pusher for a shape-specific
184          * implementation.
185          */
186         abstract void initPartialTraversalState();
187 
188         @Override
189         public Spliterator<P_OUT> trySplit() {
190             if (isParallel && !finished) {
191                 init();
192 
193                 Spliterator<P_IN> split = spliterator.trySplit();
194                 return (split == null) ? null : wrap(split);
195             }
196             else
197                 return null;
198         }
199 
200         /**
201          * If the buffer is empty, push elements into the sink chain until
202          * the source is empty or cancellation is requested.
203          * @return whether there are elements to consume from the buffer
204          */
205         private boolean fillBuffer() {
206             while (buffer.count() == 0) {
207                 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
208                     if (finished)
209                         return false;
210                     else {
211                         bufferSink.end(); // might trigger more elements
212                         finished = true;
213                     }
214                 }
215             }
216             return true;
217         }
218 
219         @Override
220         public final long estimateSize() {
221             init();
222             // Use the estimate of the wrapped spliterator
223             // Note this may not be accurate if there are filter/flatMap
224             // operations filtering or adding elements to the stream
225             return spliterator.estimateSize();
226         }
227 
228         @Override
229         public final long getExactSizeIfKnown() {
230             init();
231             return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
232                    ? spliterator.getExactSizeIfKnown()
233                    : -1;
234         }
235 
236         @Override
237         public final int characteristics() {
238             init();
239 
240             // Get the characteristics from the pipeline
241             int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
242 
243             // Mask off the size and uniform characteristics and replace with
244             // those of the spliterator
245             // Note that a non-uniform spliterator can change from something
246             // with an exact size to an estimate for a sub-split, for example
247             // with HashSet where the size is known at the top level spliterator
248             // but for sub-splits only an estimate is known
249             if ((c & Spliterator.SIZED) != 0) {
250                 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
251                 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
252             }
253 
254             return c;
255         }
256 
257         @Override
258         public Comparator<? super P_OUT> getComparator() {
259             if (!hasCharacteristics(SORTED))
260                 throw new IllegalStateException();
261             return null;
262         }
263 
264         @Override
265         public final String toString() {
266             return String.format("%s[%s]", getClass().getName(), spliterator);
267         }
268     }
269 
270     static final class WrappingSpliterator<P_IN, P_OUT>
271             extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
272 
273         WrappingSpliterator(PipelineHelper<P_OUT> ph,
274                             Supplier<Spliterator<P_IN>> supplier,
275                             boolean parallel) {
276             super(ph, supplier, parallel);
277         }
278 
279         WrappingSpliterator(PipelineHelper<P_OUT> ph,
280                             Spliterator<P_IN> spliterator,
281                             boolean parallel) {
282             super(ph, spliterator, parallel);
283         }
284 
285         @Override
286         WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
287             return new WrappingSpliterator<>(ph, s, isParallel);
288         }
289 
290         @Override
291         void initPartialTraversalState() {
292             SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
293             buffer = b;
294             bufferSink = ph.wrapSink(b::accept);
295             pusher = () -> spliterator.tryAdvance(bufferSink);
296         }
297 
298         @Override
tryAdvance(Consumer<? super P_OUT> consumer)299         public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
300             Objects.requireNonNull(consumer);
301             boolean hasNext = doAdvance();
302             if (hasNext)
303                 consumer.accept(buffer.get(nextToConsume));
304             return hasNext;
305         }
306 
307         @Override
forEachRemaining(Consumer<? super P_OUT> consumer)308         public void forEachRemaining(Consumer<? super P_OUT> consumer) {
309             if (buffer == null && !finished) {
310                 Objects.requireNonNull(consumer);
311                 init();
312 
313                 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
314                 finished = true;
315             }
316             else {
317                 do { } while (tryAdvance(consumer));
318             }
319         }
320     }
321 
322     static final class IntWrappingSpliterator<P_IN>
323             extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
324             implements Spliterator.OfInt {
325 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)326         IntWrappingSpliterator(PipelineHelper<Integer> ph,
327                                Supplier<Spliterator<P_IN>> supplier,
328                                boolean parallel) {
329             super(ph, supplier, parallel);
330         }
331 
IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)332         IntWrappingSpliterator(PipelineHelper<Integer> ph,
333                                Spliterator<P_IN> spliterator,
334                                boolean parallel) {
335             super(ph, spliterator, parallel);
336         }
337 
338         @Override
wrap(Spliterator<P_IN> s)339         AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
340             return new IntWrappingSpliterator<>(ph, s, isParallel);
341         }
342 
343         @Override
initPartialTraversalState()344         void initPartialTraversalState() {
345             SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
346             buffer = b;
347             bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
348             pusher = () -> spliterator.tryAdvance(bufferSink);
349         }
350 
351         @Override
trySplit()352         public Spliterator.OfInt trySplit() {
353             return (Spliterator.OfInt) super.trySplit();
354         }
355 
356         @Override
tryAdvance(IntConsumer consumer)357         public boolean tryAdvance(IntConsumer consumer) {
358             Objects.requireNonNull(consumer);
359             boolean hasNext = doAdvance();
360             if (hasNext)
361                 consumer.accept(buffer.get(nextToConsume));
362             return hasNext;
363         }
364 
365         @Override
forEachRemaining(IntConsumer consumer)366         public void forEachRemaining(IntConsumer consumer) {
367             if (buffer == null && !finished) {
368                 Objects.requireNonNull(consumer);
369                 init();
370 
371                 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
372                 finished = true;
373             }
374             else {
375                 do { } while (tryAdvance(consumer));
376             }
377         }
378     }
379 
380     static final class LongWrappingSpliterator<P_IN>
381             extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
382             implements Spliterator.OfLong {
383 
LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)384         LongWrappingSpliterator(PipelineHelper<Long> ph,
385                                 Supplier<Spliterator<P_IN>> supplier,
386                                 boolean parallel) {
387             super(ph, supplier, parallel);
388         }
389 
LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)390         LongWrappingSpliterator(PipelineHelper<Long> ph,
391                                 Spliterator<P_IN> spliterator,
392                                 boolean parallel) {
393             super(ph, spliterator, parallel);
394         }
395 
396         @Override
wrap(Spliterator<P_IN> s)397         AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
398             return new LongWrappingSpliterator<>(ph, s, isParallel);
399         }
400 
401         @Override
initPartialTraversalState()402         void initPartialTraversalState() {
403             SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
404             buffer = b;
405             bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
406             pusher = () -> spliterator.tryAdvance(bufferSink);
407         }
408 
409         @Override
trySplit()410         public Spliterator.OfLong trySplit() {
411             return (Spliterator.OfLong) super.trySplit();
412         }
413 
414         @Override
tryAdvance(LongConsumer consumer)415         public boolean tryAdvance(LongConsumer consumer) {
416             Objects.requireNonNull(consumer);
417             boolean hasNext = doAdvance();
418             if (hasNext)
419                 consumer.accept(buffer.get(nextToConsume));
420             return hasNext;
421         }
422 
423         @Override
forEachRemaining(LongConsumer consumer)424         public void forEachRemaining(LongConsumer consumer) {
425             if (buffer == null && !finished) {
426                 Objects.requireNonNull(consumer);
427                 init();
428 
429                 ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
430                 finished = true;
431             }
432             else {
433                 do { } while (tryAdvance(consumer));
434             }
435         }
436     }
437 
438     static final class DoubleWrappingSpliterator<P_IN>
439             extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
440             implements Spliterator.OfDouble {
441 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)442         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
443                                   Supplier<Spliterator<P_IN>> supplier,
444                                   boolean parallel) {
445             super(ph, supplier, parallel);
446         }
447 
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)448         DoubleWrappingSpliterator(PipelineHelper<Double> ph,
449                                   Spliterator<P_IN> spliterator,
450                                   boolean parallel) {
451             super(ph, spliterator, parallel);
452         }
453 
454         @Override
wrap(Spliterator<P_IN> s)455         AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
456             return new DoubleWrappingSpliterator<>(ph, s, isParallel);
457         }
458 
459         @Override
initPartialTraversalState()460         void initPartialTraversalState() {
461             SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
462             buffer = b;
463             bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
464             pusher = () -> spliterator.tryAdvance(bufferSink);
465         }
466 
467         @Override
trySplit()468         public Spliterator.OfDouble trySplit() {
469             return (Spliterator.OfDouble) super.trySplit();
470         }
471 
472         @Override
tryAdvance(DoubleConsumer consumer)473         public boolean tryAdvance(DoubleConsumer consumer) {
474             Objects.requireNonNull(consumer);
475             boolean hasNext = doAdvance();
476             if (hasNext)
477                 consumer.accept(buffer.get(nextToConsume));
478             return hasNext;
479         }
480 
481         @Override
forEachRemaining(DoubleConsumer consumer)482         public void forEachRemaining(DoubleConsumer consumer) {
483             if (buffer == null && !finished) {
484                 Objects.requireNonNull(consumer);
485                 init();
486 
487                 ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
488                 finished = true;
489             }
490             else {
491                 do { } while (tryAdvance(consumer));
492             }
493         }
494     }
495 
496     /**
497      * Spliterator implementation that delegates to an underlying spliterator,
498      * acquiring the spliterator from a {@code Supplier<Spliterator>} on the
499      * first call to any spliterator method.
500      * @param <T>
501      */
502     static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>>
503             implements Spliterator<T> {
504         private final Supplier<? extends T_SPLITR> supplier;
505 
506         private T_SPLITR s;
507 
DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)508         DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) {
509             this.supplier = supplier;
510         }
511 
get()512         T_SPLITR get() {
513             if (s == null) {
514                 s = supplier.get();
515             }
516             return s;
517         }
518 
519         @Override
520         @SuppressWarnings("unchecked")
trySplit()521         public T_SPLITR trySplit() {
522             return (T_SPLITR) get().trySplit();
523         }
524 
525         @Override
tryAdvance(Consumer<? super T> consumer)526         public boolean tryAdvance(Consumer<? super T> consumer) {
527             return get().tryAdvance(consumer);
528         }
529 
530         @Override
forEachRemaining(Consumer<? super T> consumer)531         public void forEachRemaining(Consumer<? super T> consumer) {
532             get().forEachRemaining(consumer);
533         }
534 
535         @Override
estimateSize()536         public long estimateSize() {
537             return get().estimateSize();
538         }
539 
540         @Override
characteristics()541         public int characteristics() {
542             return get().characteristics();
543         }
544 
545         @Override
getComparator()546         public Comparator<? super T> getComparator() {
547             return get().getComparator();
548         }
549 
550         @Override
getExactSizeIfKnown()551         public long getExactSizeIfKnown() {
552             return get().getExactSizeIfKnown();
553         }
554 
555         @Override
toString()556         public String toString() {
557             return getClass().getName() + "[" + get() + "]";
558         }
559 
560         static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
561             extends DelegatingSpliterator<T, T_SPLITR>
562             implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(Supplier<? extends T_SPLITR> supplier)563             OfPrimitive(Supplier<? extends T_SPLITR> supplier) {
564                 super(supplier);
565             }
566 
567             @Override
tryAdvance(T_CONS consumer)568             public boolean tryAdvance(T_CONS consumer) {
569                 return get().tryAdvance(consumer);
570             }
571 
572             @Override
forEachRemaining(T_CONS consumer)573             public void forEachRemaining(T_CONS consumer) {
574                 get().forEachRemaining(consumer);
575             }
576         }
577 
578         static final class OfInt
579                 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt>
580                 implements Spliterator.OfInt {
581 
OfInt(Supplier<Spliterator.OfInt> supplier)582             OfInt(Supplier<Spliterator.OfInt> supplier) {
583                 super(supplier);
584             }
585         }
586 
587         static final class OfLong
588                 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong>
589                 implements Spliterator.OfLong {
590 
OfLong(Supplier<Spliterator.OfLong> supplier)591             OfLong(Supplier<Spliterator.OfLong> supplier) {
592                 super(supplier);
593             }
594         }
595 
596         static final class OfDouble
597                 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble>
598                 implements Spliterator.OfDouble {
599 
OfDouble(Supplier<Spliterator.OfDouble> supplier)600             OfDouble(Supplier<Spliterator.OfDouble> supplier) {
601                 super(supplier);
602             }
603         }
604     }
605 
606     /**
607      * A slice Spliterator from a source Spliterator that reports
608      * {@code SUBSIZED}.
609      *
610      */
611     static abstract class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
612         // The start index of the slice
613         final long sliceOrigin;
614         // One past the last index of the slice
615         final long sliceFence;
616 
617         // The spliterator to slice
618         T_SPLITR s;
619         // current (absolute) index, modified on advance/split
620         long index;
621         // one past last (absolute) index or sliceFence, which ever is smaller
622         long fence;
623 
SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)624         SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) {
625             assert s.hasCharacteristics(Spliterator.SUBSIZED);
626             this.s = s;
627             this.sliceOrigin = sliceOrigin;
628             this.sliceFence = sliceFence;
629             this.index = origin;
630             this.fence = fence;
631         }
632 
makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)633         protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence);
634 
trySplit()635         public T_SPLITR trySplit() {
636             if (sliceOrigin >= fence)
637                 return null;
638 
639             if (index >= fence)
640                 return null;
641 
642             // Keep splitting until the left and right splits intersect with the slice
643             // thereby ensuring the size estimate decreases.
644             // This also avoids creating empty spliterators which can result in
645             // existing and additionally created F/J tasks that perform
646             // redundant work on no elements.
647             while (true) {
648                 @SuppressWarnings("unchecked")
649                 T_SPLITR leftSplit = (T_SPLITR) s.trySplit();
650                 if (leftSplit == null)
651                     return null;
652 
653                 long leftSplitFenceUnbounded = index + leftSplit.estimateSize();
654                 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence);
655                 if (sliceOrigin >= leftSplitFence) {
656                     // The left split does not intersect with, and is to the left of, the slice
657                     // The right split does intersect
658                     // Discard the left split and split further with the right split
659                     index = leftSplitFence;
660                 }
661                 else if (leftSplitFence >= sliceFence) {
662                     // The right split does not intersect with, and is to the right of, the slice
663                     // The left split does intersect
664                     // Discard the right split and split further with the left split
665                     s = leftSplit;
666                     fence = leftSplitFence;
667                 }
668                 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) {
669                     // The left split is contained within the slice, return the underlying left split
670                     // Right split is contained within or intersects with the slice
671                     index = leftSplitFence;
672                     return leftSplit;
673                 } else {
674                     // The left split intersects with the slice
675                     // Right split is contained within or intersects with the slice
676                     return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence);
677                 }
678             }
679         }
680 
estimateSize()681         public long estimateSize() {
682             return (sliceOrigin < fence)
683                    ? fence - Math.max(sliceOrigin, index) : 0;
684         }
685 
characteristics()686         public int characteristics() {
687             return s.characteristics();
688         }
689 
690         static final class OfRef<T>
691                 extends SliceSpliterator<T, Spliterator<T>>
692                 implements Spliterator<T> {
693 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)694             OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) {
695                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
696             }
697 
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)698             private OfRef(Spliterator<T> s,
699                           long sliceOrigin, long sliceFence, long origin, long fence) {
700                 super(s, sliceOrigin, sliceFence, origin, fence);
701             }
702 
703             @Override
makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)704             protected Spliterator<T> makeSpliterator(Spliterator<T> s,
705                                                      long sliceOrigin, long sliceFence,
706                                                      long origin, long fence) {
707                 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence);
708             }
709 
710             @Override
tryAdvance(Consumer<? super T> action)711             public boolean tryAdvance(Consumer<? super T> action) {
712                 Objects.requireNonNull(action);
713 
714                 if (sliceOrigin >= fence)
715                     return false;
716 
717                 while (sliceOrigin > index) {
718                     s.tryAdvance(e -> {});
719                     index++;
720                 }
721 
722                 if (index >= fence)
723                     return false;
724 
725                 index++;
726                 return s.tryAdvance(action);
727             }
728 
729             @Override
forEachRemaining(Consumer<? super T> action)730             public void forEachRemaining(Consumer<? super T> action) {
731                 Objects.requireNonNull(action);
732 
733                 if (sliceOrigin >= fence)
734                     return;
735 
736                 if (index >= fence)
737                     return;
738 
739                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
740                     // The spliterator is contained within the slice
741                     s.forEachRemaining(action);
742                     index = fence;
743                 } else {
744                     // The spliterator intersects with the slice
745                     while (sliceOrigin > index) {
746                         s.tryAdvance(e -> {});
747                         index++;
748                     }
749                     // Traverse elements up to the fence
750                     for (;index < fence; index++) {
751                         s.tryAdvance(action);
752                     }
753                 }
754             }
755         }
756 
757         static abstract class OfPrimitive<T,
758                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>,
759                 T_CONS>
760                 extends SliceSpliterator<T, T_SPLITR>
761                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
762 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)763             OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) {
764                 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
765             }
766 
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)767             private OfPrimitive(T_SPLITR s,
768                                 long sliceOrigin, long sliceFence, long origin, long fence) {
769                 super(s, sliceOrigin, sliceFence, origin, fence);
770             }
771 
772             @Override
tryAdvance(T_CONS action)773             public boolean tryAdvance(T_CONS action) {
774                 Objects.requireNonNull(action);
775 
776                 if (sliceOrigin >= fence)
777                     return false;
778 
779                 while (sliceOrigin > index) {
780                     s.tryAdvance(emptyConsumer());
781                     index++;
782                 }
783 
784                 if (index >= fence)
785                     return false;
786 
787                 index++;
788                 return s.tryAdvance(action);
789             }
790 
791             @Override
forEachRemaining(T_CONS action)792             public void forEachRemaining(T_CONS action) {
793                 Objects.requireNonNull(action);
794 
795                 if (sliceOrigin >= fence)
796                     return;
797 
798                 if (index >= fence)
799                     return;
800 
801                 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
802                     // The spliterator is contained within the slice
803                     s.forEachRemaining(action);
804                     index = fence;
805                 } else {
806                     // The spliterator intersects with the slice
807                     while (sliceOrigin > index) {
808                         s.tryAdvance(emptyConsumer());
809                         index++;
810                     }
811                     // Traverse elements up to the fence
812                     for (;index < fence; index++) {
813                         s.tryAdvance(action);
814                     }
815                 }
816             }
817 
emptyConsumer()818             protected abstract T_CONS emptyConsumer();
819         }
820 
821         static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer>
822                 implements Spliterator.OfInt {
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)823             OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) {
824                 super(s, sliceOrigin, sliceFence);
825             }
826 
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)827             OfInt(Spliterator.OfInt s,
828                   long sliceOrigin, long sliceFence, long origin, long fence) {
829                 super(s, sliceOrigin, sliceFence, origin, fence);
830             }
831 
832             @Override
makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)833             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s,
834                                                         long sliceOrigin, long sliceFence,
835                                                         long origin, long fence) {
836                 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence);
837             }
838 
839             @Override
emptyConsumer()840             protected IntConsumer emptyConsumer() {
841                 return e -> {};
842             }
843         }
844 
845         static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer>
846                 implements Spliterator.OfLong {
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)847             OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) {
848                 super(s, sliceOrigin, sliceFence);
849             }
850 
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)851             OfLong(Spliterator.OfLong s,
852                    long sliceOrigin, long sliceFence, long origin, long fence) {
853                 super(s, sliceOrigin, sliceFence, origin, fence);
854             }
855 
856             @Override
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)857             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s,
858                                                          long sliceOrigin, long sliceFence,
859                                                          long origin, long fence) {
860                 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence);
861             }
862 
863             @Override
emptyConsumer()864             protected LongConsumer emptyConsumer() {
865                 return e -> {};
866             }
867         }
868 
869         static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer>
870                 implements Spliterator.OfDouble {
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)871             OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) {
872                 super(s, sliceOrigin, sliceFence);
873             }
874 
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)875             OfDouble(Spliterator.OfDouble s,
876                      long sliceOrigin, long sliceFence, long origin, long fence) {
877                 super(s, sliceOrigin, sliceFence, origin, fence);
878             }
879 
880             @Override
makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)881             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s,
882                                                            long sliceOrigin, long sliceFence,
883                                                            long origin, long fence) {
884                 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence);
885             }
886 
887             @Override
emptyConsumer()888             protected DoubleConsumer emptyConsumer() {
889                 return e -> {};
890             }
891         }
892     }
893 
894     /**
895      * A slice Spliterator that does not preserve order, if any, of a source
896      * Spliterator.
897      *
898      * Note: The source spliterator may report {@code ORDERED} since that
899      * spliterator be the result of a previous pipeline stage that was
900      * collected to a {@code Node}. It is the order of the pipeline stage
901      * that governs whether this slice spliterator is to be used or not.
902      */
903     static abstract class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
904         static final int CHUNK_SIZE = 1 << 7;
905 
906         // The spliterator to slice
907         protected final T_SPLITR s;
908         protected final boolean unlimited;
909         protected final int chunkSize;
910         private final long skipThreshold;
911         private final AtomicLong permits;
912 
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)913         UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
914             this.s = s;
915             this.unlimited = limit < 0;
916             this.skipThreshold = limit >= 0 ? limit : 0;
917             this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
918                                                         ((skip + limit) / AbstractTask.getLeafTarget()) + 1) : CHUNK_SIZE;
919             this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
920         }
921 
UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)922         UnorderedSliceSpliterator(T_SPLITR s,
923                                   UnorderedSliceSpliterator<T, T_SPLITR> parent) {
924             this.s = s;
925             this.unlimited = parent.unlimited;
926             this.permits = parent.permits;
927             this.skipThreshold = parent.skipThreshold;
928             this.chunkSize = parent.chunkSize;
929         }
930 
931         /**
932          * Acquire permission to skip or process elements.  The caller must
933          * first acquire the elements, then consult this method for guidance
934          * as to what to do with the data.
935          *
936          * <p>We use an {@code AtomicLong} to atomically maintain a counter,
937          * which is initialized as skip+limit if we are limiting, or skip only
938          * if we are not limiting.  The user should consult the method
939          * {@code checkPermits()} before acquiring data elements.
940          *
941          * @param numElements the number of elements the caller has in hand
942          * @return the number of elements that should be processed; any
943          * remaining elements should be discarded.
944          */
acquirePermits(long numElements)945         protected final long acquirePermits(long numElements) {
946             long remainingPermits;
947             long grabbing;
948             // permits never increase, and don't decrease below zero
949             assert numElements > 0;
950             do {
951                 remainingPermits = permits.get();
952                 if (remainingPermits == 0)
953                     return unlimited ? numElements : 0;
954                 grabbing = Math.min(remainingPermits, numElements);
955             } while (grabbing > 0 &&
956                      !permits.compareAndSet(remainingPermits, remainingPermits - grabbing));
957 
958             if (unlimited)
959                 return Math.max(numElements - grabbing, 0);
960             else if (remainingPermits > skipThreshold)
961                 return Math.max(grabbing - (remainingPermits - skipThreshold), 0);
962             else
963                 return grabbing;
964         }
965 
966         enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED }
967 
968         /** Call to check if permits might be available before acquiring data */
permitStatus()969         protected final PermitStatus permitStatus() {
970             if (permits.get() > 0)
971                 return PermitStatus.MAYBE_MORE;
972             else
973                 return unlimited ?  PermitStatus.UNLIMITED : PermitStatus.NO_MORE;
974         }
975 
trySplit()976         public final T_SPLITR trySplit() {
977             // Stop splitting when there are no more limit permits
978             if (permits.get() == 0)
979                 return null;
980             @SuppressWarnings("unchecked")
981             T_SPLITR split = (T_SPLITR) s.trySplit();
982             return split == null ? null : makeSpliterator(split);
983         }
984 
makeSpliterator(T_SPLITR s)985         protected abstract T_SPLITR makeSpliterator(T_SPLITR s);
986 
estimateSize()987         public final long estimateSize() {
988             return s.estimateSize();
989         }
990 
characteristics()991         public final int characteristics() {
992             return s.characteristics() &
993                    ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED);
994         }
995 
996         static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>>
997                 implements Spliterator<T>, Consumer<T> {
998             T tmpSlot;
999 
OfRef(Spliterator<T> s, long skip, long limit)1000             OfRef(Spliterator<T> s, long skip, long limit) {
1001                 super(s, skip, limit);
1002             }
1003 
OfRef(Spliterator<T> s, OfRef<T> parent)1004             OfRef(Spliterator<T> s, OfRef<T> parent) {
1005                 super(s, parent);
1006             }
1007 
1008             @Override
accept(T t)1009             public final void accept(T t) {
1010                 tmpSlot = t;
1011             }
1012 
1013             @Override
tryAdvance(Consumer<? super T> action)1014             public boolean tryAdvance(Consumer<? super T> action) {
1015                 Objects.requireNonNull(action);
1016 
1017                 while (permitStatus() != PermitStatus.NO_MORE) {
1018                     if (!s.tryAdvance(this))
1019                         return false;
1020                     else if (acquirePermits(1) == 1) {
1021                         action.accept(tmpSlot);
1022                         tmpSlot = null;
1023                         return true;
1024                     }
1025                 }
1026                 return false;
1027             }
1028 
1029             @Override
forEachRemaining(Consumer<? super T> action)1030             public void forEachRemaining(Consumer<? super T> action) {
1031                 Objects.requireNonNull(action);
1032 
1033                 ArrayBuffer.OfRef<T> sb = null;
1034                 PermitStatus permitStatus;
1035                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1036                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1037                         // Optimistically traverse elements up to a threshold of chunkSize
1038                         if (sb == null)
1039                             sb = new ArrayBuffer.OfRef<>(chunkSize);
1040                         else
1041                             sb.reset();
1042                         long permitsRequested = 0;
1043                         do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
1044                         if (permitsRequested == 0)
1045                             return;
1046                         sb.forEach(action, acquirePermits(permitsRequested));
1047                     }
1048                     else {
1049                         // Must be UNLIMITED; let 'er rip
1050                         s.forEachRemaining(action);
1051                         return;
1052                     }
1053                 }
1054             }
1055 
1056             @Override
makeSpliterator(Spliterator<T> s)1057             protected Spliterator<T> makeSpliterator(Spliterator<T> s) {
1058                 return new UnorderedSliceSpliterator.OfRef<>(s, this);
1059             }
1060         }
1061 
1062         /**
1063          * Concrete sub-types must also be an instance of type {@code T_CONS}.
1064          *
1065          * @param <T_BUFF> the type of the spined buffer. Must also be a type of
1066          *        {@code T_CONS}.
1067          */
1068         static abstract class OfPrimitive<
1069                 T,
1070                 T_CONS,
1071                 T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>,
1072                 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
1073                 extends UnorderedSliceSpliterator<T, T_SPLITR>
1074                 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(T_SPLITR s, long skip, long limit)1075             OfPrimitive(T_SPLITR s, long skip, long limit) {
1076                 super(s, skip, limit);
1077             }
1078 
OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1079             OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) {
1080                 super(s, parent);
1081             }
1082 
1083             @Override
tryAdvance(T_CONS action)1084             public boolean tryAdvance(T_CONS action) {
1085                 Objects.requireNonNull(action);
1086                 @SuppressWarnings("unchecked")
1087                 T_CONS consumer = (T_CONS) this;
1088 
1089                 while (permitStatus() != PermitStatus.NO_MORE) {
1090                     if (!s.tryAdvance(consumer))
1091                         return false;
1092                     else if (acquirePermits(1) == 1) {
1093                         acceptConsumed(action);
1094                         return true;
1095                     }
1096                 }
1097                 return false;
1098             }
1099 
acceptConsumed(T_CONS action)1100             protected abstract void acceptConsumed(T_CONS action);
1101 
1102             @Override
forEachRemaining(T_CONS action)1103             public void forEachRemaining(T_CONS action) {
1104                 Objects.requireNonNull(action);
1105 
1106                 T_BUFF sb = null;
1107                 PermitStatus permitStatus;
1108                 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1109                     if (permitStatus == PermitStatus.MAYBE_MORE) {
1110                         // Optimistically traverse elements up to a threshold of chunkSize
1111                         if (sb == null)
1112                             sb = bufferCreate(chunkSize);
1113                         else
1114                             sb.reset();
1115                         @SuppressWarnings("unchecked")
1116                         T_CONS sbc = (T_CONS) sb;
1117                         long permitsRequested = 0;
1118                         do { } while (s.tryAdvance(sbc) && ++permitsRequested < chunkSize);
1119                         if (permitsRequested == 0)
1120                             return;
1121                         sb.forEach(action, acquirePermits(permitsRequested));
1122                     }
1123                     else {
1124                         // Must be UNLIMITED; let 'er rip
1125                         s.forEachRemaining(action);
1126                         return;
1127                     }
1128                 }
1129             }
1130 
bufferCreate(int initialCapacity)1131             protected abstract T_BUFF bufferCreate(int initialCapacity);
1132         }
1133 
1134         static final class OfInt
1135                 extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt>
1136                 implements Spliterator.OfInt, IntConsumer {
1137 
1138             int tmpValue;
1139 
OfInt(Spliterator.OfInt s, long skip, long limit)1140             OfInt(Spliterator.OfInt s, long skip, long limit) {
1141                 super(s, skip, limit);
1142             }
1143 
OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1144             OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) {
1145                 super(s, parent);
1146             }
1147 
1148             @Override
accept(int value)1149             public void accept(int value) {
1150                 tmpValue = value;
1151             }
1152 
1153             @Override
acceptConsumed(IntConsumer action)1154             protected void acceptConsumed(IntConsumer action) {
1155                 action.accept(tmpValue);
1156             }
1157 
1158             @Override
bufferCreate(int initialCapacity)1159             protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) {
1160                 return new ArrayBuffer.OfInt(initialCapacity);
1161             }
1162 
1163             @Override
makeSpliterator(Spliterator.OfInt s)1164             protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
1165                 return new UnorderedSliceSpliterator.OfInt(s, this);
1166             }
1167         }
1168 
1169         static final class OfLong
1170                 extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong>
1171                 implements Spliterator.OfLong, LongConsumer {
1172 
1173             long tmpValue;
1174 
OfLong(Spliterator.OfLong s, long skip, long limit)1175             OfLong(Spliterator.OfLong s, long skip, long limit) {
1176                 super(s, skip, limit);
1177             }
1178 
OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1179             OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) {
1180                 super(s, parent);
1181             }
1182 
1183             @Override
accept(long value)1184             public void accept(long value) {
1185                 tmpValue = value;
1186             }
1187 
1188             @Override
acceptConsumed(LongConsumer action)1189             protected void acceptConsumed(LongConsumer action) {
1190                 action.accept(tmpValue);
1191             }
1192 
1193             @Override
bufferCreate(int initialCapacity)1194             protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) {
1195                 return new ArrayBuffer.OfLong(initialCapacity);
1196             }
1197 
1198             @Override
makeSpliterator(Spliterator.OfLong s)1199             protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
1200                 return new UnorderedSliceSpliterator.OfLong(s, this);
1201             }
1202         }
1203 
1204         static final class OfDouble
1205                 extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble>
1206                 implements Spliterator.OfDouble, DoubleConsumer {
1207 
1208             double tmpValue;
1209 
OfDouble(Spliterator.OfDouble s, long skip, long limit)1210             OfDouble(Spliterator.OfDouble s, long skip, long limit) {
1211                 super(s, skip, limit);
1212             }
1213 
OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1214             OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) {
1215                 super(s, parent);
1216             }
1217 
1218             @Override
accept(double value)1219             public void accept(double value) {
1220                 tmpValue = value;
1221             }
1222 
1223             @Override
acceptConsumed(DoubleConsumer action)1224             protected void acceptConsumed(DoubleConsumer action) {
1225                 action.accept(tmpValue);
1226             }
1227 
1228             @Override
bufferCreate(int initialCapacity)1229             protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) {
1230                 return new ArrayBuffer.OfDouble(initialCapacity);
1231             }
1232 
1233             @Override
makeSpliterator(Spliterator.OfDouble s)1234             protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
1235                 return new UnorderedSliceSpliterator.OfDouble(s, this);
1236             }
1237         }
1238     }
1239 
1240     /**
1241      * A wrapping spliterator that only reports distinct elements of the
1242      * underlying spliterator. Does not preserve size and encounter order.
1243      */
1244     static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
1245 
1246         // The value to represent null in the ConcurrentHashMap
1247         private static final Object NULL_VALUE = new Object();
1248 
1249         // The underlying spliterator
1250         private final Spliterator<T> s;
1251 
1252         // ConcurrentHashMap holding distinct elements as keys
1253         private final ConcurrentHashMap<T, Boolean> seen;
1254 
1255         // Temporary element, only used with tryAdvance
1256         private T tmpSlot;
1257 
DistinctSpliterator(Spliterator<T> s)1258         DistinctSpliterator(Spliterator<T> s) {
1259             this(s, new ConcurrentHashMap<>());
1260         }
1261 
DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen)1262         private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
1263             this.s = s;
1264             this.seen = seen;
1265         }
1266 
1267         @Override
accept(T t)1268         public void accept(T t) {
1269             this.tmpSlot = t;
1270         }
1271 
1272         @SuppressWarnings("unchecked")
mapNull(T t)1273         private T mapNull(T t) {
1274             return t != null ? t : (T) NULL_VALUE;
1275         }
1276 
1277         @Override
tryAdvance(Consumer<? super T> action)1278         public boolean tryAdvance(Consumer<? super T> action) {
1279             while (s.tryAdvance(this)) {
1280                 if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
1281                     action.accept(tmpSlot);
1282                     tmpSlot = null;
1283                     return true;
1284                 }
1285             }
1286             return false;
1287         }
1288 
1289         @Override
forEachRemaining(Consumer<? super T> action)1290         public void forEachRemaining(Consumer<? super T> action) {
1291             s.forEachRemaining(t -> {
1292                 if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
1293                     action.accept(t);
1294                 }
1295             });
1296         }
1297 
1298         @Override
trySplit()1299         public Spliterator<T> trySplit() {
1300             Spliterator<T> split = s.trySplit();
1301             return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
1302         }
1303 
1304         @Override
estimateSize()1305         public long estimateSize() {
1306             return s.estimateSize();
1307         }
1308 
1309         @Override
characteristics()1310         public int characteristics() {
1311             return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
1312                                             Spliterator.SORTED | Spliterator.ORDERED))
1313                    | Spliterator.DISTINCT;
1314         }
1315 
1316         @Override
getComparator()1317         public Comparator<? super T> getComparator() {
1318             return s.getComparator();
1319         }
1320     }
1321 
1322     /**
1323      * A Spliterator that infinitely supplies elements in no particular order.
1324      *
1325      * <p>Splitting divides the estimated size in two and stops when the
1326      * estimate size is 0.
1327      *
1328      * <p>The {@code forEachRemaining} method if invoked will never terminate.
1329      * The {@code tryAdvance} method always returns true.
1330      *
1331      */
1332     static abstract class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
1333         long estimate;
1334 
InfiniteSupplyingSpliterator(long estimate)1335         protected InfiniteSupplyingSpliterator(long estimate) {
1336             this.estimate = estimate;
1337         }
1338 
1339         @Override
estimateSize()1340         public long estimateSize() {
1341             return estimate;
1342         }
1343 
1344         @Override
characteristics()1345         public int characteristics() {
1346             return IMMUTABLE;
1347         }
1348 
1349         static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
1350             final Supplier<T> s;
1351 
OfRef(long size, Supplier<T> s)1352             OfRef(long size, Supplier<T> s) {
1353                 super(size);
1354                 this.s = s;
1355             }
1356 
1357             @Override
tryAdvance(Consumer<? super T> action)1358             public boolean tryAdvance(Consumer<? super T> action) {
1359                 Objects.requireNonNull(action);
1360 
1361                 action.accept(s.get());
1362                 return true;
1363             }
1364 
1365             @Override
trySplit()1366             public Spliterator<T> trySplit() {
1367                 if (estimate == 0)
1368                     return null;
1369                 return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
1370             }
1371         }
1372 
1373         static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
1374                 implements Spliterator.OfInt {
1375             final IntSupplier s;
1376 
OfInt(long size, IntSupplier s)1377             OfInt(long size, IntSupplier s) {
1378                 super(size);
1379                 this.s = s;
1380             }
1381 
1382             @Override
tryAdvance(IntConsumer action)1383             public boolean tryAdvance(IntConsumer action) {
1384                 Objects.requireNonNull(action);
1385 
1386                 action.accept(s.getAsInt());
1387                 return true;
1388             }
1389 
1390             @Override
trySplit()1391             public Spliterator.OfInt trySplit() {
1392                 if (estimate == 0)
1393                     return null;
1394                 return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s);
1395             }
1396         }
1397 
1398         static final class OfLong extends InfiniteSupplyingSpliterator<Long>
1399                 implements Spliterator.OfLong {
1400             final LongSupplier s;
1401 
OfLong(long size, LongSupplier s)1402             OfLong(long size, LongSupplier s) {
1403                 super(size);
1404                 this.s = s;
1405             }
1406 
1407             @Override
tryAdvance(LongConsumer action)1408             public boolean tryAdvance(LongConsumer action) {
1409                 Objects.requireNonNull(action);
1410 
1411                 action.accept(s.getAsLong());
1412                 return true;
1413             }
1414 
1415             @Override
trySplit()1416             public Spliterator.OfLong trySplit() {
1417                 if (estimate == 0)
1418                     return null;
1419                 return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s);
1420             }
1421         }
1422 
1423         static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
1424                 implements Spliterator.OfDouble {
1425             final DoubleSupplier s;
1426 
OfDouble(long size, DoubleSupplier s)1427             OfDouble(long size, DoubleSupplier s) {
1428                 super(size);
1429                 this.s = s;
1430             }
1431 
1432             @Override
tryAdvance(DoubleConsumer action)1433             public boolean tryAdvance(DoubleConsumer action) {
1434                 Objects.requireNonNull(action);
1435 
1436                 action.accept(s.getAsDouble());
1437                 return true;
1438             }
1439 
1440             @Override
trySplit()1441             public Spliterator.OfDouble trySplit() {
1442                 if (estimate == 0)
1443                     return null;
1444                 return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s);
1445             }
1446         }
1447     }
1448 
1449     // @@@ Consolidate with Node.Builder
1450     static abstract class ArrayBuffer {
1451         int index;
1452 
reset()1453         void reset() {
1454             index = 0;
1455         }
1456 
1457         static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
1458             final Object[] array;
1459 
OfRef(int size)1460             OfRef(int size) {
1461                 this.array = new Object[size];
1462             }
1463 
1464             @Override
accept(T t)1465             public void accept(T t) {
1466                 array[index++] = t;
1467             }
1468 
forEach(Consumer<? super T> action, long fence)1469             public void forEach(Consumer<? super T> action, long fence) {
1470                 for (int i = 0; i < fence; i++) {
1471                     @SuppressWarnings("unchecked")
1472                     T t = (T) array[i];
1473                     action.accept(t);
1474                 }
1475             }
1476         }
1477 
1478         static abstract class OfPrimitive<T_CONS> extends ArrayBuffer {
1479             int index;
1480 
1481             @Override
reset()1482             void reset() {
1483                 index = 0;
1484             }
1485 
forEach(T_CONS action, long fence)1486             abstract void forEach(T_CONS action, long fence);
1487         }
1488 
1489         static final class OfInt extends OfPrimitive<IntConsumer>
1490                 implements IntConsumer {
1491             final int[] array;
1492 
OfInt(int size)1493             OfInt(int size) {
1494                 this.array = new int[size];
1495             }
1496 
1497             @Override
accept(int t)1498             public void accept(int t) {
1499                 array[index++] = t;
1500             }
1501 
1502             @Override
forEach(IntConsumer action, long fence)1503             public void forEach(IntConsumer action, long fence) {
1504                 for (int i = 0; i < fence; i++) {
1505                     action.accept(array[i]);
1506                 }
1507             }
1508         }
1509 
1510         static final class OfLong extends OfPrimitive<LongConsumer>
1511                 implements LongConsumer {
1512             final long[] array;
1513 
OfLong(int size)1514             OfLong(int size) {
1515                 this.array = new long[size];
1516             }
1517 
1518             @Override
accept(long t)1519             public void accept(long t) {
1520                 array[index++] = t;
1521             }
1522 
1523             @Override
forEach(LongConsumer action, long fence)1524             public void forEach(LongConsumer action, long fence) {
1525                 for (int i = 0; i < fence; i++) {
1526                     action.accept(array[i]);
1527                 }
1528             }
1529         }
1530 
1531         static final class OfDouble extends OfPrimitive<DoubleConsumer>
1532                 implements DoubleConsumer {
1533             final double[] array;
1534 
OfDouble(int size)1535             OfDouble(int size) {
1536                 this.array = new double[size];
1537             }
1538 
1539             @Override
accept(double t)1540             public void accept(double t) {
1541                 array[index++] = t;
1542             }
1543 
1544             @Override
forEach(DoubleConsumer action, long fence)1545             void forEach(DoubleConsumer action, long fence) {
1546                 for (int i = 0; i < fence; i++) {
1547                     action.accept(array[i]);
1548                 }
1549             }
1550         }
1551     }
1552 }
1553 
1554