1 /*
2  * Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.DoubleSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.PrimitiveIterator;
31 import java.util.Spliterator;
32 import java.util.Spliterators;
33 import java.util.function.BiConsumer;
34 import java.util.function.BinaryOperator;
35 import java.util.function.DoubleBinaryOperator;
36 import java.util.function.DoubleConsumer;
37 import java.util.function.DoubleFunction;
38 import java.util.function.DoublePredicate;
39 import java.util.function.DoubleToIntFunction;
40 import java.util.function.DoubleToLongFunction;
41 import java.util.function.DoubleUnaryOperator;
42 import java.util.function.IntFunction;
43 import java.util.function.ObjDoubleConsumer;
44 import java.util.function.Supplier;
45 
46 /**
47  * Abstract base class for an intermediate pipeline stage or pipeline source
 * stage whose elements are of type {@code double}.
49  *
50  * @param <E_IN> type of elements in the upstream source
51  *
52  * @since 1.8
53  */
54 abstract class DoublePipeline<E_IN>
55         extends AbstractPipeline<E_IN, Double, DoubleStream>
56         implements DoubleStream {
57 
58     /**
59      * Constructor for the head of a stream pipeline.
60      *
61      * @param source {@code Supplier<Spliterator>} describing the stream source
62      * @param sourceFlags the source flags for the stream source, described in
63      * {@link StreamOpFlag}
64      */
DoublePipeline(Supplier<? extends Spliterator<Double>> source, int sourceFlags, boolean parallel)65     DoublePipeline(Supplier<? extends Spliterator<Double>> source,
66                    int sourceFlags, boolean parallel) {
67         super(source, sourceFlags, parallel);
68     }
69 
70     /**
71      * Constructor for the head of a stream pipeline.
72      *
73      * @param source {@code Spliterator} describing the stream source
74      * @param sourceFlags the source flags for the stream source, described in
75      * {@link StreamOpFlag}
76      */
DoublePipeline(Spliterator<Double> source, int sourceFlags, boolean parallel)77     DoublePipeline(Spliterator<Double> source,
78                    int sourceFlags, boolean parallel) {
79         super(source, sourceFlags, parallel);
80     }
81 
82     /**
83      * Constructor for appending an intermediate operation onto an existing
84      * pipeline.
85      *
86      * @param upstream the upstream element source.
87      * @param opFlags the operation flags
88      */
DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)89     DoublePipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
90         super(upstream, opFlags);
91     }
92 
93     /**
94      * Adapt a {@code Sink<Double> to a {@code DoubleConsumer}, ideally simply
95      * by casting.
96      */
adapt(Sink<Double> sink)97     private static DoubleConsumer adapt(Sink<Double> sink) {
98         if (sink instanceof DoubleConsumer) {
99             return (DoubleConsumer) sink;
100         } else {
101             if (Tripwire.ENABLED)
102                 Tripwire.trip(AbstractPipeline.class,
103                               "using DoubleStream.adapt(Sink<Double> s)");
104             return sink::accept;
105         }
106     }
107 
108     /**
109      * Adapt a {@code Spliterator<Double>} to a {@code Spliterator.OfDouble}.
110      *
111      * @implNote
112      * The implementation attempts to cast to a Spliterator.OfDouble, and throws
113      * an exception if this cast is not possible.
114      */
adapt(Spliterator<Double> s)115     private static Spliterator.OfDouble adapt(Spliterator<Double> s) {
116         if (s instanceof Spliterator.OfDouble) {
117             return (Spliterator.OfDouble) s;
118         } else {
119             if (Tripwire.ENABLED)
120                 Tripwire.trip(AbstractPipeline.class,
121                               "using DoubleStream.adapt(Spliterator<Double> s)");
122             throw new UnsupportedOperationException("DoubleStream.adapt(Spliterator<Double> s)");
123         }
124     }
125 
126 
127     // Shape-specific methods
128 
129     @Override
getOutputShape()130     final StreamShape getOutputShape() {
131         return StreamShape.DOUBLE_VALUE;
132     }
133 
134     @Override
evaluateToNode(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Double[]> generator)135     final <P_IN> Node<Double> evaluateToNode(PipelineHelper<Double> helper,
136                                              Spliterator<P_IN> spliterator,
137                                              boolean flattenTree,
138                                              IntFunction<Double[]> generator) {
139         return Nodes.collectDouble(helper, spliterator, flattenTree);
140     }
141 
142     @Override
wrap(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)143     final <P_IN> Spliterator<Double> wrap(PipelineHelper<Double> ph,
144                                           Supplier<Spliterator<P_IN>> supplier,
145                                           boolean isParallel) {
146         return new StreamSpliterators.DoubleWrappingSpliterator<>(ph, supplier, isParallel);
147     }
148 
149     @Override
150     @SuppressWarnings("unchecked")
lazySpliterator(Supplier<? extends Spliterator<Double>> supplier)151     final Spliterator.OfDouble lazySpliterator(Supplier<? extends Spliterator<Double>> supplier) {
152         return new StreamSpliterators.DelegatingSpliterator.OfDouble((Supplier<Spliterator.OfDouble>) supplier);
153     }
154 
155     @Override
forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink)156     final boolean forEachWithCancel(Spliterator<Double> spliterator, Sink<Double> sink) {
157         Spliterator.OfDouble spl = adapt(spliterator);
158         DoubleConsumer adaptedSink = adapt(sink);
159         boolean cancelled;
160         do { } while (!(cancelled = sink.cancellationRequested()) && spl.tryAdvance(adaptedSink));
161         return cancelled;
162     }
163 
164     @Override
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator)165     final  Node.Builder<Double> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Double[]> generator) {
166         return Nodes.doubleBuilder(exactSizeIfKnown);
167     }
168 
mapToObj(DoubleFunction<? extends U> mapper, int opFlags)169     private <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper, int opFlags) {
170         return new ReferencePipeline.StatelessOp<Double, U>(this, StreamShape.DOUBLE_VALUE, opFlags) {
171             @Override
172             Sink<Double> opWrapSink(int flags, Sink<U> sink) {
173                 return new Sink.ChainedDouble<U>(sink) {
174                     @Override
175                     public void accept(double t) {
176                         downstream.accept(mapper.apply(t));
177                     }
178                 };
179             }
180         };
181     }
182 
183     // DoubleStream
184 
185     @Override
186     public final PrimitiveIterator.OfDouble iterator() {
187         return Spliterators.iterator(spliterator());
188     }
189 
190     @Override
191     public final Spliterator.OfDouble spliterator() {
192         return adapt(super.spliterator());
193     }
194 
195     // Stateless intermediate ops from DoubleStream
196 
197     @Override
198     public final Stream<Double> boxed() {
199         return mapToObj(Double::valueOf, 0);
200     }
201 
202     @Override
203     public final DoubleStream map(DoubleUnaryOperator mapper) {
204         Objects.requireNonNull(mapper);
205         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
206                                        StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
207             @Override
208             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
209                 return new Sink.ChainedDouble<Double>(sink) {
210                     @Override
211                     public void accept(double t) {
212                         downstream.accept(mapper.applyAsDouble(t));
213                     }
214                 };
215             }
216         };
217     }
218 
219     @Override
220     public final <U> Stream<U> mapToObj(DoubleFunction<? extends U> mapper) {
221         Objects.requireNonNull(mapper);
222         return mapToObj(mapper, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT);
223     }
224 
225     @Override
226     public final IntStream mapToInt(DoubleToIntFunction mapper) {
227         Objects.requireNonNull(mapper);
228         return new IntPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
229                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
230             @Override
231             Sink<Double> opWrapSink(int flags, Sink<Integer> sink) {
232                 return new Sink.ChainedDouble<Integer>(sink) {
233                     @Override
234                     public void accept(double t) {
235                         downstream.accept(mapper.applyAsInt(t));
236                     }
237                 };
238             }
239         };
240     }
241 
242     @Override
243     public final LongStream mapToLong(DoubleToLongFunction mapper) {
244         Objects.requireNonNull(mapper);
245         return new LongPipeline.StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
246                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
247             @Override
248             Sink<Double> opWrapSink(int flags, Sink<Long> sink) {
249                 return new Sink.ChainedDouble<Long>(sink) {
250                     @Override
251                     public void accept(double t) {
252                         downstream.accept(mapper.applyAsLong(t));
253                     }
254                 };
255             }
256         };
257     }
258 
259     @Override
260     public final DoubleStream flatMap(DoubleFunction<? extends DoubleStream> mapper) {
261         Objects.requireNonNull(mapper);
262         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
263                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
264             @Override
265             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
266                 return new Sink.ChainedDouble<Double>(sink) {
267                     // true if cancellationRequested() has been called
268                     boolean cancellationRequestedCalled;
269 
270                     // cache the consumer to avoid creation on every accepted element
271                     DoubleConsumer downstreamAsDouble = downstream::accept;
272 
273                     @Override
274                     public void begin(long size) {
275                         downstream.begin(-1);
276                     }
277 
278                     @Override
279                     public void accept(double t) {
280                         try (DoubleStream result = mapper.apply(t)) {
281                             if (result != null) {
282                                 if (!cancellationRequestedCalled) {
283                                     result.sequential().forEach(downstreamAsDouble);
284                                 }
285                                 else {
286                                     var s = result.sequential().spliterator();
287                                     do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
288                                 }
289                             }
290                         }
291                     }
292 
293                     @Override
294                     public boolean cancellationRequested() {
295                         // If this method is called then an operation within the stream
296                         // pipeline is short-circuiting (see AbstractPipeline.copyInto).
297                         // Note that we cannot differentiate between an upstream or
298                         // downstream operation
299                         cancellationRequestedCalled = true;
300                         return downstream.cancellationRequested();
301                     }
302                 };
303             }
304         };
305     }
306 
307     @Override
308     public DoubleStream unordered() {
309         if (!isOrdered())
310             return this;
311         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE, StreamOpFlag.NOT_ORDERED) {
312             @Override
313             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
314                 return sink;
315             }
316         };
317     }
318 
319     @Override
320     public final DoubleStream filter(DoublePredicate predicate) {
321         Objects.requireNonNull(predicate);
322         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
323                                        StreamOpFlag.NOT_SIZED) {
324             @Override
325             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
326                 return new Sink.ChainedDouble<Double>(sink) {
327                     @Override
328                     public void begin(long size) {
329                         downstream.begin(-1);
330                     }
331 
332                     @Override
333                     public void accept(double t) {
334                         if (predicate.test(t))
335                             downstream.accept(t);
336                     }
337                 };
338             }
339         };
340     }
341 
342     @Override
343     public final DoubleStream peek(DoubleConsumer action) {
344         Objects.requireNonNull(action);
345         return new StatelessOp<Double>(this, StreamShape.DOUBLE_VALUE,
346                                        0) {
347             @Override
348             Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
349                 return new Sink.ChainedDouble<Double>(sink) {
350                     @Override
351                     public void accept(double t) {
352                         action.accept(t);
353                         downstream.accept(t);
354                     }
355                 };
356             }
357         };
358     }
359 
360     // Stateful intermediate ops from DoubleStream
361 
362     @Override
363     public final DoubleStream limit(long maxSize) {
364         if (maxSize < 0)
365             throw new IllegalArgumentException(Long.toString(maxSize));
366         return SliceOps.makeDouble(this, (long) 0, maxSize);
367     }
368 
369     @Override
370     public final DoubleStream skip(long n) {
371         if (n < 0)
372             throw new IllegalArgumentException(Long.toString(n));
373         if (n == 0)
374             return this;
375         else {
376             long limit = -1;
377             return SliceOps.makeDouble(this, n, limit);
378         }
379     }
380 
381     @Override
382     public final DoubleStream takeWhile(DoublePredicate predicate) {
383         return WhileOps.makeTakeWhileDouble(this, predicate);
384     }
385 
386     @Override
387     public final DoubleStream dropWhile(DoublePredicate predicate) {
388         return WhileOps.makeDropWhileDouble(this, predicate);
389     }
390 
391     @Override
392     public final DoubleStream sorted() {
393         return SortedOps.makeDouble(this);
394     }
395 
396     @Override
397     public final DoubleStream distinct() {
398         // While functional and quick to implement, this approach is not very efficient.
399         // An efficient version requires a double-specific map/set implementation.
400         return boxed().distinct().mapToDouble(i -> (double) i);
401     }
402 
403     // Terminal ops from DoubleStream
404 
405     @Override
406     public void forEach(DoubleConsumer consumer) {
407         evaluate(ForEachOps.makeDouble(consumer, false));
408     }
409 
410     @Override
411     public void forEachOrdered(DoubleConsumer consumer) {
412         evaluate(ForEachOps.makeDouble(consumer, true));
413     }
414 
415     @Override
416     public final double sum() {
417         /*
418          * In the arrays allocated for the collect operation, index 0
419          * holds the high-order bits of the running sum, index 1 holds
420          * the low-order bits of the sum computed via compensated
421          * summation, and index 2 holds the simple sum used to compute
422          * the proper result if the stream contains infinite values of
423          * the same sign.
424          */
425         double[] summation = collect(() -> new double[3],
426                                (ll, d) -> {
427                                    Collectors.sumWithCompensation(ll, d);
428                                    ll[2] += d;
429                                },
430                                (ll, rr) -> {
431                                    Collectors.sumWithCompensation(ll, rr[0]);
432                                    Collectors.sumWithCompensation(ll, rr[1]);
433                                    ll[2] += rr[2];
434                                });
435 
436         return Collectors.computeFinalSum(summation);
437     }
438 
439     @Override
440     public final OptionalDouble min() {
441         return reduce(Math::min);
442     }
443 
444     @Override
445     public final OptionalDouble max() {
446         return reduce(Math::max);
447     }
448 
449     /**
450      * {@inheritDoc}
451      *
452      * @implNote The {@code double} format can represent all
453      * consecutive integers in the range -2<sup>53</sup> to
454      * 2<sup>53</sup>. If the pipeline has more than 2<sup>53</sup>
455      * values, the divisor in the average computation will saturate at
456      * 2<sup>53</sup>, leading to additional numerical errors.
457      */
458     @Override
459     public final OptionalDouble average() {
460         /*
461          * In the arrays allocated for the collect operation, index 0
462          * holds the high-order bits of the running sum, index 1 holds
463          * the low-order bits of the sum computed via compensated
464          * summation, index 2 holds the number of values seen, index 3
465          * holds the simple sum.
466          */
467         double[] avg = collect(() -> new double[4],
468                                (ll, d) -> {
469                                    ll[2]++;
470                                    Collectors.sumWithCompensation(ll, d);
471                                    ll[3] += d;
472                                },
473                                (ll, rr) -> {
474                                    Collectors.sumWithCompensation(ll, rr[0]);
475                                    Collectors.sumWithCompensation(ll, rr[1]);
476                                    ll[2] += rr[2];
477                                    ll[3] += rr[3];
478                                });
479         return avg[2] > 0
480             ? OptionalDouble.of(Collectors.computeFinalSum(avg) / avg[2])
481             : OptionalDouble.empty();
482     }
483 
484     @Override
485     public final long count() {
486         return evaluate(ReduceOps.makeDoubleCounting());
487     }
488 
489     @Override
490     public final DoubleSummaryStatistics summaryStatistics() {
491         return collect(DoubleSummaryStatistics::new, DoubleSummaryStatistics::accept,
492                        DoubleSummaryStatistics::combine);
493     }
494 
495     @Override
496     public final double reduce(double identity, DoubleBinaryOperator op) {
497         return evaluate(ReduceOps.makeDouble(identity, op));
498     }
499 
500     @Override
501     public final OptionalDouble reduce(DoubleBinaryOperator op) {
502         return evaluate(ReduceOps.makeDouble(op));
503     }
504 
505     @Override
506     public final <R> R collect(Supplier<R> supplier,
507                                ObjDoubleConsumer<R> accumulator,
508                                BiConsumer<R, R> combiner) {
509         Objects.requireNonNull(combiner);
510         BinaryOperator<R> operator = (left, right) -> {
511             combiner.accept(left, right);
512             return left;
513         };
514         return evaluate(ReduceOps.makeDouble(supplier, accumulator, operator));
515     }
516 
517     @Override
518     public final boolean anyMatch(DoublePredicate predicate) {
519         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ANY));
520     }
521 
522     @Override
523     public final boolean allMatch(DoublePredicate predicate) {
524         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.ALL));
525     }
526 
527     @Override
528     public final boolean noneMatch(DoublePredicate predicate) {
529         return evaluate(MatchOps.makeDouble(predicate, MatchOps.MatchKind.NONE));
530     }
531 
532     @Override
533     public final OptionalDouble findFirst() {
534         return evaluate(FindOps.makeDouble(true));
535     }
536 
537     @Override
538     public final OptionalDouble findAny() {
539         return evaluate(FindOps.makeDouble(false));
540     }
541 
542     @Override
543     public final double[] toArray() {
544         return Nodes.flattenDouble((Node.OfDouble) evaluateToArrayNode(Double[]::new))
545                         .asPrimitiveArray();
546     }
547 
548     //
549 
550     /**
551      * Source stage of a DoubleStream
552      *
553      * @param <E_IN> type of elements in the upstream source
554      */
555     static class Head<E_IN> extends DoublePipeline<E_IN> {
556         /**
557          * Constructor for the source stage of a DoubleStream.
558          *
559          * @param source {@code Supplier<Spliterator>} describing the stream
560          *               source
561          * @param sourceFlags the source flags for the stream source, described
562          *                    in {@link StreamOpFlag}
563          * @param parallel {@code true} if the pipeline is parallel
564          */
565         Head(Supplier<? extends Spliterator<Double>> source,
566              int sourceFlags, boolean parallel) {
567             super(source, sourceFlags, parallel);
568         }
569 
570         /**
571          * Constructor for the source stage of a DoubleStream.
572          *
573          * @param source {@code Spliterator} describing the stream source
574          * @param sourceFlags the source flags for the stream source, described
575          *                    in {@link StreamOpFlag}
576          * @param parallel {@code true} if the pipeline is parallel
577          */
578         Head(Spliterator<Double> source,
579              int sourceFlags, boolean parallel) {
580             super(source, sourceFlags, parallel);
581         }
582 
583         @Override
584         final boolean opIsStateful() {
585             throw new UnsupportedOperationException();
586         }
587 
588         @Override
589         final Sink<E_IN> opWrapSink(int flags, Sink<Double> sink) {
590             throw new UnsupportedOperationException();
591         }
592 
593         // Optimized sequential terminal operations for the head of the pipeline
594 
595         @Override
596         public void forEach(DoubleConsumer consumer) {
597             if (!isParallel()) {
598                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
599             }
600             else {
601                 super.forEach(consumer);
602             }
603         }
604 
605         @Override
606         public void forEachOrdered(DoubleConsumer consumer) {
607             if (!isParallel()) {
608                 adapt(sourceStageSpliterator()).forEachRemaining(consumer);
609             }
610             else {
611                 super.forEachOrdered(consumer);
612             }
613         }
614 
615     }
616 
617     /**
618      * Base class for a stateless intermediate stage of a DoubleStream.
619      *
620      * @param <E_IN> type of elements in the upstream source
621      * @since 1.8
622      */
623     abstract static class StatelessOp<E_IN> extends DoublePipeline<E_IN> {
624         /**
625          * Construct a new DoubleStream by appending a stateless intermediate
626          * operation to an existing stream.
627          *
628          * @param upstream the upstream pipeline stage
629          * @param inputShape the stream shape for the upstream pipeline stage
630          * @param opFlags operation flags for the new stage
631          */
632         StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
633                     StreamShape inputShape,
634                     int opFlags) {
635             super(upstream, opFlags);
636             assert upstream.getOutputShape() == inputShape;
637         }
638 
639         @Override
640         final boolean opIsStateful() {
641             return false;
642         }
643     }
644 
645     /**
646      * Base class for a stateful intermediate stage of a DoubleStream.
647      *
648      * @param <E_IN> type of elements in the upstream source
649      * @since 1.8
650      */
651     abstract static class StatefulOp<E_IN> extends DoublePipeline<E_IN> {
652         /**
653          * Construct a new DoubleStream by appending a stateful intermediate
654          * operation to an existing stream.
655          *
656          * @param upstream the upstream pipeline stage
657          * @param inputShape the stream shape for the upstream pipeline stage
658          * @param opFlags operation flags for the new stage
659          */
660         StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
661                    StreamShape inputShape,
662                    int opFlags) {
663             super(upstream, opFlags);
664             assert upstream.getOutputShape() == inputShape;
665         }
666 
667         @Override
668         final boolean opIsStateful() {
669             return true;
670         }
671 
672         @Override
673         abstract <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
674                                                         Spliterator<P_IN> spliterator,
675                                                         IntFunction<Double[]> generator);
676     }
677 }
678