1 /*
2  * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.LongSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalLong;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntFunction;
37 import java.util.function.LongBinaryOperator;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongFunction;
40 import java.util.function.LongPredicate;
41 import java.util.function.LongToDoubleFunction;
42 import java.util.function.LongToIntFunction;
43 import java.util.function.LongUnaryOperator;
44 import java.util.function.ObjLongConsumer;
45 import java.util.function.Supplier;
46 
/**
 * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code long}.
 *
 * @param <E_IN> type of elements in the upstream source
 * @since 1.8
 */
54 abstract class LongPipeline<E_IN>
55         extends AbstractPipeline<E_IN, Long, LongStream>
56         implements LongStream {
57 
58     /**
59      * Constructor for the head of a stream pipeline.
60      *
61      * @param source {@code Supplier<Spliterator>} describing the stream source
62      * @param sourceFlags the source flags for the stream source, described in
63      *        {@link StreamOpFlag}
64      * @param parallel {@code true} if the pipeline is parallel
65      */
LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags, boolean parallel)66     LongPipeline(Supplier<? extends Spliterator<Long>> source,
67                  int sourceFlags, boolean parallel) {
68         super(source, sourceFlags, parallel);
69     }
70 
71     /**
72      * Constructor for the head of a stream pipeline.
73      *
74      * @param source {@code Spliterator} describing the stream source
75      * @param sourceFlags the source flags for the stream source, described in
76      *        {@link StreamOpFlag}
77      * @param parallel {@code true} if the pipeline is parallel
78      */
LongPipeline(Spliterator<Long> source, int sourceFlags, boolean parallel)79     LongPipeline(Spliterator<Long> source,
80                  int sourceFlags, boolean parallel) {
81         super(source, sourceFlags, parallel);
82     }
83 
84     /**
85      * Constructor for appending an intermediate operation onto an existing pipeline.
86      *
87      * @param upstream the upstream element source.
88      * @param opFlags the operation flags
89      */
LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)90     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
91         super(upstream, opFlags);
92     }
93 
94     /**
95      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
96      * by casting.
97      */
adapt(Sink<Long> sink)98     private static LongConsumer adapt(Sink<Long> sink) {
99         if (sink instanceof LongConsumer) {
100             return (LongConsumer) sink;
101         } else {
102             if (Tripwire.ENABLED)
103                 Tripwire.trip(AbstractPipeline.class,
104                               "using LongStream.adapt(Sink<Long> s)");
105             return sink::accept;
106         }
107     }
108 
109     /**
110      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
111      *
112      * @implNote
113      * The implementation attempts to cast to a Spliterator.OfLong, and throws
114      * an exception if this cast is not possible.
115      */
adapt(Spliterator<Long> s)116     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
117         if (s instanceof Spliterator.OfLong) {
118             return (Spliterator.OfLong) s;
119         } else {
120             if (Tripwire.ENABLED)
121                 Tripwire.trip(AbstractPipeline.class,
122                               "using LongStream.adapt(Spliterator<Long> s)");
123             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
124         }
125     }
126 
127 
128     // Shape-specific methods
129 
130     @Override
getOutputShape()131     final StreamShape getOutputShape() {
132         return StreamShape.LONG_VALUE;
133     }
134 
135     @Override
evaluateToNode(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Long[]> generator)136     final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
137                                            Spliterator<P_IN> spliterator,
138                                            boolean flattenTree,
139                                            IntFunction<Long[]> generator) {
140         return Nodes.collectLong(helper, spliterator, flattenTree);
141     }
142 
143     @Override
wrap(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)144     final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
145                                         Supplier<Spliterator<P_IN>> supplier,
146                                         boolean isParallel) {
147         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
148     }
149 
150     @Override
151     @SuppressWarnings("unchecked")
lazySpliterator(Supplier<? extends Spliterator<Long>> supplier)152     final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
153         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
154     }
155 
156     @Override
forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink)157     final boolean forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
158         Spliterator.OfLong spl = adapt(spliterator);
159         LongConsumer adaptedSink =  adapt(sink);
160         boolean cancelled;
161         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
162         return cancelled;
163     }
164 
165     @Override
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator)166     final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
167         return Nodes.longBuilder(exactSizeIfKnown);
168     }
169 
mapToObj(LongFunction<? extends U> mapper, int opFlags)170     private <U> Stream<U> mapToObj(LongFunction<? extends U> mapper, int opFlags) {
171         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE, opFlags) {
172             @Override
173             Sink<Long> opWrapSink(int flags, Sink<U> sink) {
174                 return new Sink.ChainedLong<U>(sink) {
175                     @Override
176                     public void accept(long t) {
177                         downstream.accept(mapper.apply(t));
178                     }
179                 };
180             }
181         };
182     }
183 
184     // LongStream
185 
186     @Override
187     public final PrimitiveIterator.OfLong iterator() {
188         return Spliterators.iterator(spliterator());
189     }
190 
191     @Override
192     public final Spliterator.OfLong spliterator() {
193         return adapt(super.spliterator());
194     }
195 
196     // Stateless intermediate ops from LongStream
197 
198     @Override
199     public final DoubleStream asDoubleStream() {
200         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_DISTINCT) {
201             @Override
202             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
203                 return new Sink.ChainedLong<Double>(sink) {
204                     @Override
205                     public void accept(long t) {
206                         downstream.accept((double) t);
207                     }
208                 };
209             }
210         };
211     }
212 
213     @Override
214     public final Stream<Long> boxed() {
215         return mapToObj(Long::valueOf, 0);
216     }
217 
218     @Override
219     public final LongStream map(LongUnaryOperator mapper) {
220         Objects.requireNonNull(mapper);
221         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
222                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
223             @Override
224             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
225                 return new Sink.ChainedLong<Long>(sink) {
226                     @Override
227                     public void accept(long t) {
228                         downstream.accept(mapper.applyAsLong(t));
229                     }
230                 };
231             }
232         };
233     }
234 
235     @Override
236     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
237         Objects.requireNonNull(mapper);
238         return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT);
239     }
240 
241     @Override
242     public final IntStream mapToInt(LongToIntFunction mapper) {
243         Objects.requireNonNull(mapper);
244         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
245                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
246             @Override
247             Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
248                 return new Sink.ChainedLong<Integer>(sink) {
249                     @Override
250                     public void accept(long t) {
251                         downstream.accept(mapper.applyAsInt(t));
252                     }
253                 };
254             }
255         };
256     }
257 
258     @Override
259     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
260         Objects.requireNonNull(mapper);
261         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
262                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
263             @Override
264             Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
265                 return new Sink.ChainedLong<Double>(sink) {
266                     @Override
267                     public void accept(long t) {
268                         downstream.accept(mapper.applyAsDouble(t));
269                     }
270                 };
271             }
272         };
273     }
274 
275     @Override
276     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
277         Objects.requireNonNull(mapper);
278         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
279                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
280             @Override
281             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
282                 return new Sink.ChainedLong<Long>(sink) {
283                     // true if cancellationRequested() has been called
284                     boolean cancellationRequestedCalled;
285 
286                     // cache the consumer to avoid creation on every accepted element
287                     LongConsumer downstreamAsLong = downstream::accept;
288 
289                     @Override
290                     public void begin(long size) {
291                         downstream.begin(-1);
292                     }
293 
294                     @Override
295                     public void accept(long t) {
296                         try (LongStream result = mapper.apply(t)) {
297                             if (result != null) {
298                                 if (!cancellationRequestedCalled) {
299                                     result.sequential().forEach(downstreamAsLong);
300                                 }
301                                 else {
302                                     var s = result.sequential().spliterator();
303                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
304                                 }
305                             }
306                         }
307                     }
308 
309                     @Override
310                     public boolean cancellationRequested() {
311                         // If this method is called then an operation within the stream
312                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
313                         // Note that we cannot differentiate between an upstream or
314                         // downstream operation
315                         cancellationRequestedCalled = true;
316                         return downstream.cancellationRequested();
317                     }
318                 };
319             }
320         };
321     }
322 
323     @Override
324     public LongStream unordered() {
325         if (!isOrdered())
326             return this;
327         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
328             @Override
329             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
330                 return sink;
331             }
332         };
333     }
334 
335     @Override
336     public final LongStream filter(LongPredicate predicate) {
337         Objects.requireNonNull(predicate);
338         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
339                                      StreamOpFlag.NOT_SIZED) {
340             @Override
341             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
342                 return new Sink.ChainedLong<Long>(sink) {
343                     @Override
344                     public void begin(long size) {
345                         downstream.begin(-1);
346                     }
347 
348                     @Override
349                     public void accept(long t) {
350                         if (predicate.test(t))
351                             downstream.accept(t);
352                     }
353                 };
354             }
355         };
356     }
357 
358     @Override
359     public final LongStream peek(LongConsumer action) {
360         Objects.requireNonNull(action);
361         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
362                                      0) {
363             @Override
364             Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
365                 return new Sink.ChainedLong<Long>(sink) {
366                     @Override
367                     public void accept(long t) {
368                         action.accept(t);
369                         downstream.accept(t);
370                     }
371                 };
372             }
373         };
374     }
375 
376     // Stateful intermediate ops from LongStream
377 
378     @Override
379     public final LongStream limit(long maxSize) {
380         if (maxSize < 0)
381             throw new IllegalArgumentException(Long.toString(maxSize));
382         return SliceOps.makeLong(this, 0, maxSize);
383     }
384 
385     @Override
386     public final LongStream skip(long n) {
387         if (n < 0)
388             throw new IllegalArgumentException(Long.toString(n));
389         if (n == 0)
390             return this;
391         else
392             return SliceOps.makeLong(this, n, -1);
393     }
394 
395     @Override
396     public final LongStream takeWhile(LongPredicate predicate) {
397         return WhileOps.makeTakeWhileLong(this, predicate);
398     }
399 
400     @Override
401     public final LongStream dropWhile(LongPredicate predicate) {
402         return WhileOps.makeDropWhileLong(this, predicate);
403     }
404 
405     @Override
406     public final LongStream sorted() {
407         return SortedOps.makeLong(this);
408     }
409 
410     @Override
411     public final LongStream distinct() {
412         // While functional and quick to implement, this approach is not very efficient.
413         // An efficient version requires a long-specific map/set implementation.
414         return boxed().distinct().mapToLong(i -> (long) i);
415     }
416 
417     // Terminal ops from LongStream
418 
419     @Override
420     public void forEach(LongConsumer action) {
421         evaluate(ForEachOps.makeLong(action, false));
422     }
423 
424     @Override
425     public void forEachOrdered(LongConsumer action) {
426         evaluate(ForEachOps.makeLong(action, true));
427     }
428 
429     @Override
430     public final long sum() {
431         // use better algorithm to compensate for intermediate overflow?
432         return reduce(0, Long::sum);
433     }
434 
435     @Override
436     public final OptionalLong min() {
437         return reduce(Math::min);
438     }
439 
440     @Override
441     public final OptionalLong max() {
442         return reduce(Math::max);
443     }
444 
445     @Override
446     public final OptionalDouble average() {
447         long[] avg = collect(() -> new long[2],
448                              (ll, i) -> {
449                                  ll[0]++;
450                                  ll[1] += i;
451                              },
452                              (ll, rr) -> {
453                                  ll[0] += rr[0];
454                                  ll[1] += rr[1];
455                              });
456         return avg[0] > 0
457                ? OptionalDouble.of((double) avg[1] / avg[0])
458                : OptionalDouble.empty();
459     }
460 
461     @Override
462     public final long count() {
463         return evaluate(ReduceOps.makeLongCounting());
464     }
465 
466     @Override
467     public final LongSummaryStatistics summaryStatistics() {
468         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
469                        LongSummaryStatistics::combine);
470     }
471 
472     @Override
473     public final long reduce(long identity, LongBinaryOperator op) {
474         return evaluate(ReduceOps.makeLong(identity, op));
475     }
476 
477     @Override
478     public final OptionalLong reduce(LongBinaryOperator op) {
479         return evaluate(ReduceOps.makeLong(op));
480     }
481 
482     @Override
483     public final <R> R collect(Supplier<R> supplier,
484                                ObjLongConsumer<R> accumulator,
485                                BiConsumer<R, R> combiner) {
486         Objects.requireNonNull(combiner);
487         BinaryOperator<R> operator = (left, right) -> {
488             combiner.accept(left, right);
489             return left;
490         };
491         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
492     }
493 
494     @Override
495     public final boolean anyMatch(LongPredicate predicate) {
496         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
497     }
498 
499     @Override
500     public final boolean allMatch(LongPredicate predicate) {
501         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
502     }
503 
504     @Override
505     public final boolean noneMatch(LongPredicate predicate) {
506         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
507     }
508 
509     @Override
510     public final OptionalLong findFirst() {
511         return evaluate(FindOps.makeLong(true));
512     }
513 
514     @Override
515     public final OptionalLong findAny() {
516         return evaluate(FindOps.makeLong(false));
517     }
518 
519     @Override
520     public final long[] toArray() {
521         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
522                 .asPrimitiveArray();
523     }
524 
525 
526     //
527 
528     /**
529      * Source stage of a LongPipeline.
530      *
531      * @param <E_IN> type of elements in the upstream source
532      * @since 1.8
533      */
534     static class Head<E_IN> extends LongPipeline<E_IN> {
535         /**
536          * Constructor for the source stage of a LongStream.
537          *
538          * @param source {@code Supplier<Spliterator>} describing the stream
539          *               source
540          * @param sourceFlags the source flags for the stream source, described
541          *                    in {@link StreamOpFlag}
542          * @param parallel {@code true} if the pipeline is parallel
543          */
544         Head(Supplier<? extends Spliterator<Long>> source,
545              int sourceFlags, boolean parallel) {
546             super(source, sourceFlags, parallel);
547         }
548 
549         /**
550          * Constructor for the source stage of a LongStream.
551          *
552          * @param source {@code Spliterator} describing the stream source
553          * @param sourceFlags the source flags for the stream source, described
554          *                    in {@link StreamOpFlag}
555          * @param parallel {@code true} if the pipeline is parallel
556          */
557         Head(Spliterator<Long> source,
558              int sourceFlags, boolean parallel) {
559             super(source, sourceFlags, parallel);
560         }
561 
562         @Override
563         final boolean opIsStateful() {
564             throw new UnsupportedOperationException();
565         }
566 
567         @Override
568         final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
569             throw new UnsupportedOperationException();
570         }
571 
572         // Optimized sequential terminal operations for the head of the pipeline
573 
574         @Override
575         public void forEach(LongConsumer action) {
576             if (!isParallel()) {
577                 adapt(sourceStageSpliterator()).forEachRemaining(action);
578             } else {
579                 super.forEach(action);
580             }
581         }
582 
583         @Override
584         public void forEachOrdered(LongConsumer action) {
585             if (!isParallel()) {
586                 adapt(sourceStageSpliterator()).forEachRemaining(action);
587             } else {
588                 super.forEachOrdered(action);
589             }
590         }
591     }
592 
593     /** Base class for a stateless intermediate stage of a LongStream.
594      *
595      * @param <E_IN> type of elements in the upstream source
596      * @since 1.8
597      */
598     abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
599         /**
600          * Construct a new LongStream by appending a stateless intermediate
601          * operation to an existing stream.
602          * @param upstream The upstream pipeline stage
603          * @param inputShape The stream shape for the upstream pipeline stage
604          * @param opFlags Operation flags for the new stage
605          */
606         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
607                     StreamShape inputShape,
608                     int opFlags) {
609             super(upstream, opFlags);
610             assert upstream.getOutputShape() == inputShape;
611         }
612 
613         @Override
614         final boolean opIsStateful() {
615             return false;
616         }
617     }
618 
619     /**
620      * Base class for a stateful intermediate stage of a LongStream.
621      *
622      * @param <E_IN> type of elements in the upstream source
623      * @since 1.8
624      */
625     abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
626         /**
627          * Construct a new LongStream by appending a stateful intermediate
628          * operation to an existing stream.
629          * @param upstream The upstream pipeline stage
630          * @param inputShape The stream shape for the upstream pipeline stage
631          * @param opFlags Operation flags for the new stage
632          */
633         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
634                    StreamShape inputShape,
635                    int opFlags) {
636             super(upstream, opFlags);
637             assert upstream.getOutputShape() == inputShape;
638         }
639 
640         @Override
641         final boolean opIsStateful() {
642             return true;
643         }
644 
645         @Override
646         abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
647                                                       Spliterator<P_IN> spliterator,
648                                                       IntFunction<Long[]> generator);
649     }
650 }
651