1 /*
2  * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.CountedCompleter;
29 import java.util.function.IntFunction;
30 
31 /**
 * Factory for instances of short-circuiting stateful intermediate operations
 * that produce subsequences of their input stream.
34  *
35  * @since 1.8
36  */
37 final class SliceOps {
38 
39     // No instances
SliceOps()40     private SliceOps() { }
41 
42     /**
43      * Calculates the sliced size given the current size, number of elements
44      * skip, and the number of elements to limit.
45      *
46      * @param size the current size
47      * @param skip the number of elements to skip, assumed to be >= 0
48      * @param limit the number of elements to limit, assumed to be >= 0, with
49      *        a value of {@code Long.MAX_VALUE} if there is no limit
50      * @return the sliced size
51      */
calcSize(long size, long skip, long limit)52     private static long calcSize(long size, long skip, long limit) {
53         return size >= 0 ? Math.max(-1, Math.min(size - skip, limit)) : -1;
54     }
55 
56     /**
57      * Calculates the slice fence, which is one past the index of the slice
58      * range
59      * @param skip the number of elements to skip, assumed to be >= 0
60      * @param limit the number of elements to limit, assumed to be >= 0, with
61      *        a value of {@code Long.MAX_VALUE} if there is no limit
62      * @return the slice fence.
63      */
calcSliceFence(long skip, long limit)64     private static long calcSliceFence(long skip, long limit) {
65         long sliceFence = limit >= 0 ? skip + limit : Long.MAX_VALUE;
66         // Check for overflow
67         return (sliceFence >= 0) ? sliceFence : Long.MAX_VALUE;
68     }
69 
70     /**
71      * Creates a slice spliterator given a stream shape governing the
72      * spliterator type.  Requires that the underlying Spliterator
73      * be SUBSIZED.
74      */
75     @SuppressWarnings("unchecked")
sliceSpliterator(StreamShape shape, Spliterator<P_IN> s, long skip, long limit)76     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
77                                                              Spliterator<P_IN> s,
78                                                              long skip, long limit) {
79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
80         long sliceFence = calcSliceFence(skip, limit);
81         switch (shape) {
82             case REFERENCE:
83                 return new StreamSpliterators
84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
85             case INT_VALUE:
86                 return (Spliterator<P_IN>) new StreamSpliterators
87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
88             case LONG_VALUE:
89                 return (Spliterator<P_IN>) new StreamSpliterators
90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
91             case DOUBLE_VALUE:
92                 return (Spliterator<P_IN>) new StreamSpliterators
93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
94             default:
95                 throw new IllegalStateException("Unknown shape " + shape);
96         }
97     }
98 
99     /**
100      * Appends a "slice" operation to the provided stream.  The slice operation
101      * may be may be skip-only, limit-only, or skip-and-limit.
102      *
103      * @param <T> the type of both input and output elements
104      * @param upstream a reference stream with element type T
105      * @param skip the number of elements to skip.  Must be >= 0.
106      * @param limit the maximum size of the resulting stream, or -1 if no limit
107      *        is to be imposed
108      */
makeRef(AbstractPipeline<?, T, ?> upstream, long skip, long limit)109     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
110                                         long skip, long limit) {
111         if (skip < 0)
112             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
113 
114         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
115                                                       flags(limit)) {
116             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
117                                                          long skip, long limit, long sizeIfKnown) {
118                 if (skip <= sizeIfKnown) {
119                     // Use just the limit if the number of elements
120                     // to skip is <= the known pipeline size
121                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
122                     skip = 0;
123                 }
124                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
125             }
126 
127             @Override
128             <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
129                 long size = helper.exactOutputSizeIfKnown(spliterator);
130                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
131                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
132                             helper.wrapSpliterator(spliterator),
133                             skip,
134                             calcSliceFence(skip, limit));
135                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
136                     return unorderedSkipLimitSpliterator(
137                             helper.wrapSpliterator(spliterator),
138                             skip, limit, size);
139                 }
140                 else {
141                     // @@@ OOMEs will occur for LongStream.range(0, Long.MAX_VALUE).filter(i -> true).limit(n)
142                     //     when n * parallelismLevel is sufficiently large.
143                     //     Need to adjust the target size of splitting for the
144                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
145                     //     This will limit the size of the buffers created at the leaf nodes
146                     //     cancellation will be more aggressive cancelling later tasks
147                     //     if the target slice size has been reached from a given task,
148                     //     cancellation should also clear local results if any
149                     return new SliceTask<>(this, helper, spliterator, Nodes.castingArray(), skip, limit).
150                             invoke().spliterator();
151                 }
152             }
153 
154             @Override
155             <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
156                                               Spliterator<P_IN> spliterator,
157                                               IntFunction<T[]> generator) {
158                 long size = helper.exactOutputSizeIfKnown(spliterator);
159                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
160                     // Because the pipeline is SIZED the slice spliterator
161                     // can be created from the source, this requires matching
162                     // to shape of the source, and is potentially more efficient
163                     // than creating the slice spliterator from the pipeline
164                     // wrapping spliterator
165                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
166                     return Nodes.collect(helper, s, true, generator);
167                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
168                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
169                             helper.wrapSpliterator(spliterator),
170                             skip, limit, size);
171                     // Collect using this pipeline, which is empty and therefore
172                     // can be used with the pipeline wrapping spliterator
173                     // Note that we cannot create a slice spliterator from
174                     // the source spliterator if the pipeline is not SIZED
175                     return Nodes.collect(this, s, true, generator);
176                 }
177                 else {
178                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
179                             invoke();
180                 }
181             }
182 
183             @Override
184             Sink<T> opWrapSink(int flags, Sink<T> sink) {
185                 return new Sink.ChainedReference<T, T>(sink) {
186                     long n = skip;
187                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
188 
189                     @Override
190                     public void begin(long size) {
191                         downstream.begin(calcSize(size, skip, m));
192                     }
193 
194                     @Override
195                     public void accept(T t) {
196                         if (n == 0) {
197                             if (m > 0) {
198                                 m--;
199                                 downstream.accept(t);
200                             }
201                         }
202                         else {
203                             n--;
204                         }
205                     }
206 
207                     @Override
208                     public boolean cancellationRequested() {
209                         return m == 0 || downstream.cancellationRequested();
210                     }
211                 };
212             }
213         };
214     }
215 
216     /**
217      * Appends a "slice" operation to the provided IntStream.  The slice
218      * operation may be may be skip-only, limit-only, or skip-and-limit.
219      *
220      * @param upstream An IntStream
221      * @param skip The number of elements to skip.  Must be >= 0.
222      * @param limit The maximum size of the resulting stream, or -1 if no limit
223      *        is to be imposed
224      */
225     public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
226                                     long skip, long limit) {
227         if (skip < 0)
228             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
229 
230         return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
231                                                    flags(limit)) {
232             Spliterator.OfInt unorderedSkipLimitSpliterator(
233                     Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {
234                 if (skip <= sizeIfKnown) {
235                     // Use just the limit if the number of elements
236                     // to skip is <= the known pipeline size
237                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
238                     skip = 0;
239                 }
240                 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
241             }
242 
243             @Override
244             <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
245                                                                Spliterator<P_IN> spliterator) {
246                 long size = helper.exactOutputSizeIfKnown(spliterator);
247                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
248                     return new StreamSpliterators.SliceSpliterator.OfInt(
249                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
250                             skip,
251                             calcSliceFence(skip, limit));
252                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
253                     return unorderedSkipLimitSpliterator(
254                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
255                             skip, limit, size);
256                 }
257                 else {
258                     return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
259                             invoke().spliterator();
260                 }
261             }
262 
263             @Override
264             <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
265                                                     Spliterator<P_IN> spliterator,
266                                                     IntFunction<Integer[]> generator) {
267                 long size = helper.exactOutputSizeIfKnown(spliterator);
268                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
269                     // Because the pipeline is SIZED the slice spliterator
270                     // can be created from the source, this requires matching
271                     // to shape of the source, and is potentially more efficient
272                     // than creating the slice spliterator from the pipeline
273                     // wrapping spliterator
274                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
275                     return Nodes.collectInt(helper, s, true);
276                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
277                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
278                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
279                             skip, limit, size);
280                     // Collect using this pipeline, which is empty and therefore
281                     // can be used with the pipeline wrapping spliterator
282                     // Note that we cannot create a slice spliterator from
283                     // the source spliterator if the pipeline is not SIZED
284                     return Nodes.collectInt(this, s, true);
285                 }
286                 else {
287                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
288                             invoke();
289                 }
290             }
291 
292             @Override
293             Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
294                 return new Sink.ChainedInt<Integer>(sink) {
295                     long n = skip;
296                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
297 
298                     @Override
299                     public void begin(long size) {
300                         downstream.begin(calcSize(size, skip, m));
301                     }
302 
303                     @Override
304                     public void accept(int t) {
305                         if (n == 0) {
306                             if (m > 0) {
307                                 m--;
308                                 downstream.accept(t);
309                             }
310                         }
311                         else {
312                             n--;
313                         }
314                     }
315 
316                     @Override
317                     public boolean cancellationRequested() {
318                         return m == 0 || downstream.cancellationRequested();
319                     }
320                 };
321             }
322         };
323     }
324 
325     /**
326      * Appends a "slice" operation to the provided LongStream.  The slice
327      * operation may be may be skip-only, limit-only, or skip-and-limit.
328      *
329      * @param upstream A LongStream
330      * @param skip The number of elements to skip.  Must be >= 0.
331      * @param limit The maximum size of the resulting stream, or -1 if no limit
332      *        is to be imposed
333      */
334     public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
335                                       long skip, long limit) {
336         if (skip < 0)
337             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
338 
339         return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
340                                                  flags(limit)) {
341             Spliterator.OfLong unorderedSkipLimitSpliterator(
342                     Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {
343                 if (skip <= sizeIfKnown) {
344                     // Use just the limit if the number of elements
345                     // to skip is <= the known pipeline size
346                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
347                     skip = 0;
348                 }
349                 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
350             }
351 
352             @Override
353             <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
354                                                             Spliterator<P_IN> spliterator) {
355                 long size = helper.exactOutputSizeIfKnown(spliterator);
356                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
357                     return new StreamSpliterators.SliceSpliterator.OfLong(
358                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
359                             skip,
360                             calcSliceFence(skip, limit));
361                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
362                     return unorderedSkipLimitSpliterator(
363                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
364                             skip, limit, size);
365                 }
366                 else {
367                     return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
368                             invoke().spliterator();
369                 }
370             }
371 
372             @Override
373             <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
374                                                  Spliterator<P_IN> spliterator,
375                                                  IntFunction<Long[]> generator) {
376                 long size = helper.exactOutputSizeIfKnown(spliterator);
377                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
378                     // Because the pipeline is SIZED the slice spliterator
379                     // can be created from the source, this requires matching
380                     // to shape of the source, and is potentially more efficient
381                     // than creating the slice spliterator from the pipeline
382                     // wrapping spliterator
383                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
384                     return Nodes.collectLong(helper, s, true);
385                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
386                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
387                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
388                             skip, limit, size);
389                     // Collect using this pipeline, which is empty and therefore
390                     // can be used with the pipeline wrapping spliterator
391                     // Note that we cannot create a slice spliterator from
392                     // the source spliterator if the pipeline is not SIZED
393                     return Nodes.collectLong(this, s, true);
394                 }
395                 else {
396                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
397                             invoke();
398                 }
399             }
400 
401             @Override
402             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
403                 return new Sink.ChainedLong<Long>(sink) {
404                     long n = skip;
405                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
406 
407                     @Override
408                     public void begin(long size) {
409                         downstream.begin(calcSize(size, skip, m));
410                     }
411 
412                     @Override
413                     public void accept(long t) {
414                         if (n == 0) {
415                             if (m > 0) {
416                                 m--;
417                                 downstream.accept(t);
418                             }
419                         }
420                         else {
421                             n--;
422                         }
423                     }
424 
425                     @Override
426                     public boolean cancellationRequested() {
427                         return m == 0 || downstream.cancellationRequested();
428                     }
429                 };
430             }
431         };
432     }
433 
434     /**
435      * Appends a "slice" operation to the provided DoubleStream.  The slice
436      * operation may be may be skip-only, limit-only, or skip-and-limit.
437      *
438      * @param upstream A DoubleStream
439      * @param skip The number of elements to skip.  Must be >= 0.
440      * @param limit The maximum size of the resulting stream, or -1 if no limit
441      *        is to be imposed
442      */
443     public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
444                                           long skip, long limit) {
445         if (skip < 0)
446             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
447 
448         return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
449                                                      flags(limit)) {
450             Spliterator.OfDouble unorderedSkipLimitSpliterator(
451                     Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {
452                 if (skip <= sizeIfKnown) {
453                     // Use just the limit if the number of elements
454                     // to skip is <= the known pipeline size
455                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
456                     skip = 0;
457                 }
458                 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
459             }
460 
461             @Override
462             <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
463                                                               Spliterator<P_IN> spliterator) {
464                 long size = helper.exactOutputSizeIfKnown(spliterator);
465                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
466                     return new StreamSpliterators.SliceSpliterator.OfDouble(
467                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
468                             skip,
469                             calcSliceFence(skip, limit));
470                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
471                     return unorderedSkipLimitSpliterator(
472                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
473                             skip, limit, size);
474                 }
475                 else {
476                     return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
477                             invoke().spliterator();
478                 }
479             }
480 
481             @Override
482             <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
483                                                    Spliterator<P_IN> spliterator,
484                                                    IntFunction<Double[]> generator) {
485                 long size = helper.exactOutputSizeIfKnown(spliterator);
486                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
487                     // Because the pipeline is SIZED the slice spliterator
488                     // can be created from the source, this requires matching
489                     // to shape of the source, and is potentially more efficient
490                     // than creating the slice spliterator from the pipeline
491                     // wrapping spliterator
492                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
493                     return Nodes.collectDouble(helper, s, true);
494                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
495                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
496                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
497                             skip, limit, size);
498                     // Collect using this pipeline, which is empty and therefore
499                     // can be used with the pipeline wrapping spliterator
500                     // Note that we cannot create a slice spliterator from
501                     // the source spliterator if the pipeline is not SIZED
502                     return Nodes.collectDouble(this, s, true);
503                 }
504                 else {
505                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
506                             invoke();
507                 }
508             }
509 
510             @Override
511             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
512                 return new Sink.ChainedDouble<Double>(sink) {
513                     long n = skip;
514                     long m = limit >= 0 ? limit : Long.MAX_VALUE;
515 
516                     @Override
517                     public void begin(long size) {
518                         downstream.begin(calcSize(size, skip, m));
519                     }
520 
521                     @Override
522                     public void accept(double t) {
523                         if (n == 0) {
524                             if (m > 0) {
525                                 m--;
526                                 downstream.accept(t);
527                             }
528                         }
529                         else {
530                             n--;
531                         }
532                     }
533 
534                     @Override
535                     public boolean cancellationRequested() {
536                         return m == 0 || downstream.cancellationRequested();
537                     }
538                 };
539             }
540         };
541     }
542 
543     private static int flags(long limit) {
544         return StreamOpFlag.NOT_SIZED | ((limit != -1) ? StreamOpFlag.IS_SHORT_CIRCUIT : 0);
545     }
546 
547     /**
548      * {@code ForkJoinTask} implementing slice computation.
549      *
550      * @param <P_IN> Input element type to the stream pipeline
551      * @param <P_OUT> Output element type from the stream pipeline
552      */
553     @SuppressWarnings("serial")
554     private static final class SliceTask<P_IN, P_OUT>
555             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
        // Pipeline stage consulted for sink wrapping, node builders and the
        // output shape
        private final AbstractPipeline<P_OUT, P_OUT, ?> op;
        // Array factory passed to node builders and to truncation
        private final IntFunction<P_OUT[]> generator;
        // Number of leading elements to drop and maximum number to keep; the
        // code in this class treats a negative targetSize as "no limit"
        private final long targetOffset, targetSize;
        // Element count of this node's result: the built node's count for a
        // non-root leaf, the sum of both children's counts for an internal
        // node (reset to 0 when the task is canceled)
        private long thisNodeSize;

        // Set to true once a non-root leaf or an internal node has produced
        // its result; cancel() checks it before replacing the local result
        private volatile boolean completed;
562 
563         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
564                   PipelineHelper<P_OUT> helper,
565                   Spliterator<P_IN> spliterator,
566                   IntFunction<P_OUT[]> generator,
567                   long offset, long size) {
568             super(helper, spliterator);
569             this.op = op;
570             this.generator = generator;
571             this.targetOffset = offset;
572             this.targetSize = size;
573         }
574 
575         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
576             super(parent, spliterator);
577             this.op = parent.op;
578             this.generator = parent.generator;
579             this.targetOffset = parent.targetOffset;
580             this.targetSize = parent.targetSize;
581         }
582 
583         @Override
584         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
585             return new SliceTask<>(this, spliterator);
586         }
587 
588         @Override
589         protected final Node<P_OUT> getEmptyResult() {
590             return Nodes.emptyNode(op.getOutputShape());
591         }
592 
593         @Override
594         protected final Node<P_OUT> doLeaf() {
595             if (isRoot()) {
596                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
597                                    ? op.exactOutputSizeIfKnown(spliterator)
598                                    : -1;
599                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
600                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
601                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
602                 // There is no need to truncate since the op performs the
603                 // skipping and limiting of elements
604                 return nb.build();
605             }
606             else {
607                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(-1, generator);
608                 if (targetOffset == 0) { // limit only
609                     Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
610                     helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
611                 }
612                 else {
613                     helper.wrapAndCopyInto(nb, spliterator);
614                 }
615                 Node<P_OUT> node = nb.build();
616                 thisNodeSize = node.count();
617                 completed = true;
618                 spliterator = null;
619                 return node;
620             }
621         }
622 
623         @Override
624         public final void onCompletion(CountedCompleter<?> caller) {
625             if (!isLeaf()) {
626                 Node<P_OUT> result;
627                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
628                 if (canceled) {
629                     thisNodeSize = 0;
630                     result = getEmptyResult();
631                 }
632                 else if (thisNodeSize == 0)
633                     result = getEmptyResult();
634                 else if (leftChild.thisNodeSize == 0)
635                     result = rightChild.getLocalResult();
636                 else {
637                     result = Nodes.conc(op.getOutputShape(),
638                                         leftChild.getLocalResult(), rightChild.getLocalResult());
639                 }
640                 setLocalResult(isRoot() ? doTruncate(result) : result);
641                 completed = true;
642             }
643             if (targetSize >= 0
644                 && !isRoot()
645                 && isLeftCompleted(targetOffset + targetSize))
646                     cancelLaterNodes();
647 
648             super.onCompletion(caller);
649         }
650 
651         @Override
652         protected void cancel() {
653             super.cancel();
654             if (completed)
655                 setLocalResult(getEmptyResult());
656         }
657 
658         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
659             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
660             return input.truncate(targetOffset, to, generator);
661         }
662 
663         /**
664          * Determine if the number of completed elements in this node and nodes
665          * to the left of this node is greater than or equal to the target size.
666          *
667          * @param target the target size
668          * @return true if the number of elements is greater than or equal to
669          *         the target size, otherwise false.
670          */
671         private boolean isLeftCompleted(long target) {
672             long size = completed ? thisNodeSize : completedSize(target);
673             if (size >= target)
674                 return true;
675             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
676                  parent != null;
677                  node = parent, parent = parent.getParent()) {
678                 if (node == parent.rightChild) {
679                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
680                     if (left != null) {
681                         size += left.completedSize(target);
682                         if (size >= target)
683                             return true;
684                     }
685                 }
686             }
687             return size >= target;
688         }
689 
690         /**
691          * Compute the number of completed elements in this node.
692          * <p>
693          * Computation terminates if all nodes have been processed or the
694          * number of completed elements is greater than or equal to the target
695          * size.
696          *
697          * @param target the target size
698          * @return the number of completed elements
699          */
700         private long completedSize(long target) {
701             if (completed)
702                 return thisNodeSize;
703             else {
704                 SliceTask<P_IN, P_OUT> left = leftChild;
705                 SliceTask<P_IN, P_OUT> right = rightChild;
706                 if (left == null || right == null) {
707                     // must be completed
708                     return thisNodeSize;
709                 }
710                 else {
711                     long leftSize = left.completedSize(target);
712                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
713                 }
714             }
715         }
716     }
717 }
718