1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.Spliterator;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.CountedCompleter;
31 import java.util.function.Consumer;
32 import java.util.function.DoubleConsumer;
33 import java.util.function.IntConsumer;
34 import java.util.function.IntFunction;
35 import java.util.function.LongConsumer;
36 
37 /**
38  * Factory for creating instances of {@code TerminalOp} that perform an
 * action for every element of a stream.  Supported variants include unordered
 * traversal (elements are provided to the {@code Consumer} as soon as they are
 * available) and ordered traversal (elements are provided to the
 * {@code Consumer} in encounter order).
43  *
 * <p>Elements are provided to the {@code Consumer} on whatever thread and
 * in whatever order they become available.  For ordered traversals, it is
 * guaranteed that processing an element <em>happens-before</em> processing
 * subsequent elements in the encounter order.
48  *
49  * <p>Exceptions occurring as a result of sending an element to the
50  * {@code Consumer} will be relayed to the caller and traversal will be
51  * prematurely terminated.
52  *
53  * @since 1.8
54  */
55 final class ForEachOps {
56 
ForEachOps()57     private ForEachOps() { }
58 
59     /**
60      * Constructs a {@code TerminalOp} that perform an action for every element
61      * of a stream.
62      *
63      * @param action the {@code Consumer} that receives all elements of a
64      *        stream
65      * @param ordered whether an ordered traversal is requested
66      * @param <T> the type of the stream elements
67      * @return the {@code TerminalOp} instance
68      */
makeRef(Consumer<? super T> action, boolean ordered)69     public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
70                                                   boolean ordered) {
71         Objects.requireNonNull(action);
72         return new ForEachOp.OfRef<>(action, ordered);
73     }
74 
75     /**
76      * Constructs a {@code TerminalOp} that perform an action for every element
77      * of an {@code IntStream}.
78      *
79      * @param action the {@code IntConsumer} that receives all elements of a
80      *        stream
81      * @param ordered whether an ordered traversal is requested
82      * @return the {@code TerminalOp} instance
83      */
makeInt(IntConsumer action, boolean ordered)84     public static TerminalOp<Integer, Void> makeInt(IntConsumer action,
85                                                     boolean ordered) {
86         Objects.requireNonNull(action);
87         return new ForEachOp.OfInt(action, ordered);
88     }
89 
90     /**
91      * Constructs a {@code TerminalOp} that perform an action for every element
92      * of a {@code LongStream}.
93      *
94      * @param action the {@code LongConsumer} that receives all elements of a
95      *        stream
96      * @param ordered whether an ordered traversal is requested
97      * @return the {@code TerminalOp} instance
98      */
makeLong(LongConsumer action, boolean ordered)99     public static TerminalOp<Long, Void> makeLong(LongConsumer action,
100                                                   boolean ordered) {
101         Objects.requireNonNull(action);
102         return new ForEachOp.OfLong(action, ordered);
103     }
104 
105     /**
106      * Constructs a {@code TerminalOp} that perform an action for every element
107      * of a {@code DoubleStream}.
108      *
109      * @param action the {@code DoubleConsumer} that receives all elements of
110      *        a stream
111      * @param ordered whether an ordered traversal is requested
112      * @return the {@code TerminalOp} instance
113      */
makeDouble(DoubleConsumer action, boolean ordered)114     public static TerminalOp<Double, Void> makeDouble(DoubleConsumer action,
115                                                       boolean ordered) {
116         Objects.requireNonNull(action);
117         return new ForEachOp.OfDouble(action, ordered);
118     }
119 
120     /**
121      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
122      * output to itself as a {@code TerminalSink}.  Elements will be sent in
123      * whatever thread they become available.  If the traversal is unordered,
124      * they will be sent independent of the stream's encounter order.
125      *
126      * <p>This terminal operation is stateless.  For parallel evaluation, each
127      * leaf instance of a {@code ForEachTask} will send elements to the same
128      * {@code TerminalSink} reference that is an instance of this class.
129      *
130      * @param <T> the output type of the stream pipeline
131      */
132     abstract static class ForEachOp<T>
133             implements TerminalOp<T, Void>, TerminalSink<T, Void> {
134         private final boolean ordered;
135 
ForEachOp(boolean ordered)136         protected ForEachOp(boolean ordered) {
137             this.ordered = ordered;
138         }
139 
140         // TerminalOp
141 
142         @Override
getOpFlags()143         public int getOpFlags() {
144             return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
145         }
146 
147         @Override
evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator)148         public <S> Void evaluateSequential(PipelineHelper<T> helper,
149                                            Spliterator<S> spliterator) {
150             return helper.wrapAndCopyInto(this, spliterator).get();
151         }
152 
153         @Override
evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator)154         public <S> Void evaluateParallel(PipelineHelper<T> helper,
155                                          Spliterator<S> spliterator) {
156             if (ordered)
157                 new ForEachOrderedTask<>(helper, spliterator, this).invoke();
158             else
159                 new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
160             return null;
161         }
162 
163         // TerminalSink
164 
165         @Override
get()166         public Void get() {
167             return null;
168         }
169 
170         // Implementations
171 
172         /** Implementation class for reference streams */
173         static final class OfRef<T> extends ForEachOp<T> {
174             final Consumer<? super T> consumer;
175 
OfRef(Consumer<? super T> consumer, boolean ordered)176             OfRef(Consumer<? super T> consumer, boolean ordered) {
177                 super(ordered);
178                 this.consumer = consumer;
179             }
180 
181             @Override
accept(T t)182             public void accept(T t) {
183                 consumer.accept(t);
184             }
185         }
186 
187         /** Implementation class for {@code IntStream} */
188         static final class OfInt extends ForEachOp<Integer>
189                 implements Sink.OfInt {
190             final IntConsumer consumer;
191 
OfInt(IntConsumer consumer, boolean ordered)192             OfInt(IntConsumer consumer, boolean ordered) {
193                 super(ordered);
194                 this.consumer = consumer;
195             }
196 
197             @Override
inputShape()198             public StreamShape inputShape() {
199                 return StreamShape.INT_VALUE;
200             }
201 
202             @Override
accept(int t)203             public void accept(int t) {
204                 consumer.accept(t);
205             }
206         }
207 
208         /** Implementation class for {@code LongStream} */
209         static final class OfLong extends ForEachOp<Long>
210                 implements Sink.OfLong {
211             final LongConsumer consumer;
212 
OfLong(LongConsumer consumer, boolean ordered)213             OfLong(LongConsumer consumer, boolean ordered) {
214                 super(ordered);
215                 this.consumer = consumer;
216             }
217 
218             @Override
inputShape()219             public StreamShape inputShape() {
220                 return StreamShape.LONG_VALUE;
221             }
222 
223             @Override
accept(long t)224             public void accept(long t) {
225                 consumer.accept(t);
226             }
227         }
228 
229         /** Implementation class for {@code DoubleStream} */
230         static final class OfDouble extends ForEachOp<Double>
231                 implements Sink.OfDouble {
232             final DoubleConsumer consumer;
233 
OfDouble(DoubleConsumer consumer, boolean ordered)234             OfDouble(DoubleConsumer consumer, boolean ordered) {
235                 super(ordered);
236                 this.consumer = consumer;
237             }
238 
239             @Override
inputShape()240             public StreamShape inputShape() {
241                 return StreamShape.DOUBLE_VALUE;
242             }
243 
244             @Override
accept(double t)245             public void accept(double t) {
246                 consumer.accept(t);
247             }
248         }
249     }
250 
251     /** A {@code ForkJoinTask} for performing a parallel for-each operation */
252     @SuppressWarnings("serial")
253     static final class ForEachTask<S, T> extends CountedCompleter<Void> {
254         private Spliterator<S> spliterator;
255         private final Sink<S> sink;
256         private final PipelineHelper<T> helper;
257         private long targetSize;
258 
ForEachTask(PipelineHelper<T> helper, Spliterator<S> spliterator, Sink<S> sink)259         ForEachTask(PipelineHelper<T> helper,
260                     Spliterator<S> spliterator,
261                     Sink<S> sink) {
262             super(null);
263             this.sink = sink;
264             this.helper = helper;
265             this.spliterator = spliterator;
266             this.targetSize = 0L;
267         }
268 
ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator)269         ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
270             super(parent);
271             this.spliterator = spliterator;
272             this.sink = parent.sink;
273             this.targetSize = parent.targetSize;
274             this.helper = parent.helper;
275         }
276 
277         // Similar to AbstractTask but doesn't need to track child tasks
compute()278         public void compute() {
279             Spliterator<S> rightSplit = spliterator, leftSplit;
280             long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
281             if ((sizeThreshold = targetSize) == 0L)
282                 targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
283             boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
284             boolean forkRight = false;
285             Sink<S> taskSink = sink;
286             ForEachTask<S, T> task = this;
287             while (!isShortCircuit || !taskSink.cancellationRequested()) {
288                 if (sizeEstimate <= sizeThreshold ||
289                     (leftSplit = rightSplit.trySplit()) == null) {
290                     task.helper.copyInto(taskSink, rightSplit);
291                     break;
292                 }
293                 ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
294                 task.addToPendingCount(1);
295                 ForEachTask<S, T> taskToFork;
296                 if (forkRight) {
297                     forkRight = false;
298                     rightSplit = leftSplit;
299                     taskToFork = task;
300                     task = leftTask;
301                 }
302                 else {
303                     forkRight = true;
304                     taskToFork = leftTask;
305                 }
306                 taskToFork.fork();
307                 sizeEstimate = rightSplit.estimateSize();
308             }
309             task.spliterator = null;
310             task.propagateCompletion();
311         }
312     }
313 
    /**
     * A {@code ForkJoinTask} for performing a parallel for-each operation
     * which visits the elements in encounter order.
     *
     * @param <S> the element type of the source spliterator
     * @param <T> the element type delivered to the action sink
     */
    @SuppressWarnings("serial")
    static final class ForEachOrderedTask<S, T> extends CountedCompleter<Void> {
        /*
         * Our goal is to ensure that the elements associated with a task are
         * processed according to an in-order traversal of the computation tree.
         * We use completion counts for representing these dependencies, so that
         * a task does not complete until all the tasks preceding it in this
         * order complete.  We use the "completion map" to associate the next
         * task in this order for any left child.  We increase the pending count
         * of any node on the right side of such a mapping by one to indicate
         * its dependency, and when a node on the left side of such a mapping
         * completes, it decrements the pending count of its corresponding right
         * side.  As the computation tree is expanded by splitting, we must
         * atomically update the mappings to maintain the invariant that the
         * completion map maps left children to the next node in the in-order
         * traversal.
         *
         * Take, for example, the following computation tree of tasks:
         *
         *       a
         *      / \
         *     b   c
         *    / \ / \
         *   d  e f  g
         *
         * The completion map will contain (not necessarily all at the same
         * time) the following associations:
         *
         *   d -> e
         *   b -> f
         *   f -> g
         *
         * Tasks e, f, g will have their pending counts increased by 1.
         *
         * The following relationships hold:
         *
         *   - completion of d "happens-before" e;
         *   - completion of d and e "happens-before" b;
         *   - completion of b "happens-before" f; and
         *   - completion of f "happens-before" g
         *
         * Thus overall the "happens-before" relationship holds for the
         * reporting of elements, covered by tasks d, e, f and g, as specified
         * by the forEachOrdered operation.
         */

        /** Pipeline helper; children reuse the parent's helper. */
        private final PipelineHelper<T> helper;
        /** Part of the source owned by this task; cleared once buffered or dumped. */
        private Spliterator<S> spliterator;
        /** Split threshold, computed once by the root and copied to children. */
        private final long targetSize;
        /**
         * Completion map shared by every task of the tree: maps a task to the
         * task whose {@code tryComplete} is invoked when the key completes.
         */
        private final ConcurrentHashMap<ForEachOrderedTask<S, T>, ForEachOrderedTask<S, T>> completionMap;
        /** Sink that receives the elements; children reuse the parent's sink. */
        private final Sink<T> action;
        /**
         * Task that precedes this one in the in-order traversal: {@code null}
         * for the root, the parent's left-predecessor for a left child, and
         * the left sibling for a right child.
         */
        private final ForEachOrderedTask<S, T> leftPredecessor;
        /** Elements buffered by doCompute when the task could not complete yet. */
        private Node<T> node;

        /**
         * Creates the root task, computing the target size and allocating
         * the completion map shared by the whole tree.
         *
         * @param helper the pipeline helper
         * @param spliterator the source to traverse
         * @param action the sink that receives the elements
         */
        protected ForEachOrderedTask(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator,
                                     Sink<T> action) {
            super(null);
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = AbstractTask.suggestTargetSize(spliterator.estimateSize());
            // Size map to avoid concurrent re-sizes
            this.completionMap = new ConcurrentHashMap<>(Math.max(16, AbstractTask.getLeafTarget() << 1));
            this.action = action;
            this.leftPredecessor = null;
        }

        /**
         * Creates a child task sharing the parent's helper, target size,
         * completion map and action.
         *
         * @param parent the task this one completes into
         * @param spliterator the part of the source assigned to the child
         * @param leftPredecessor the task preceding the child in the in-order
         *        traversal, or {@code null} if there is none
         */
        ForEachOrderedTask(ForEachOrderedTask<S, T> parent,
                           Spliterator<S> spliterator,
                           ForEachOrderedTask<S, T> leftPredecessor) {
            super(parent);
            this.helper = parent.helper;
            this.spliterator = spliterator;
            this.targetSize = parent.targetSize;
            this.completionMap = parent.completionMap;
            this.action = parent.action;
            this.leftPredecessor = leftPredecessor;
        }

        /** Delegates to {@link #doCompute(ForEachOrderedTask)} with this task. */
        @Override
        public final void compute() {
            doCompute(this);
        }

        /**
         * Splits the task's spliterator while its size estimate exceeds the
         * target size and a split is possible, registering the ordering
         * dependencies of each pair of children before forking one of them.
         * The {@code task} variable is rebound to the child that is not
         * forked on every iteration.  Once splitting stops, the remaining
         * elements are buffered if the task still has a pending dependency,
         * and {@code tryComplete} is invoked on it.
         *
         * @param task the task to start computing from
         */
        private static <S, T> void doCompute(ForEachOrderedTask<S, T> task) {
            Spliterator<S> rightSplit = task.spliterator, leftSplit;
            long sizeThreshold = task.targetSize;
            boolean forkRight = false;
            while (rightSplit.estimateSize() > sizeThreshold &&
                   (leftSplit = rightSplit.trySplit()) != null) {
                ForEachOrderedTask<S, T> leftChild =
                    new ForEachOrderedTask<>(task, leftSplit, task.leftPredecessor);
                ForEachOrderedTask<S, T> rightChild =
                    new ForEachOrderedTask<>(task, rightSplit, leftChild);

                // Fork the parent task
                // Completion of the left and right children "happens-before"
                // completion of the parent
                task.addToPendingCount(1);
                // Completion of the left child "happens-before" completion of
                // the right child
                rightChild.addToPendingCount(1);
                task.completionMap.put(leftChild, rightChild);

                // If task is not on the left spine
                if (task.leftPredecessor != null) {
                    /*
                     * Completion of left-predecessor, or left subtree,
                     * "happens-before" completion of left-most leaf node of
                     * right subtree.
                     * The left child's pending count needs to be updated before
                     * it is associated in the completion map, otherwise the
                     * left child can complete prematurely and violate the
                     * "happens-before" constraint.
                     */
                    leftChild.addToPendingCount(1);
                    // Update association of left-predecessor to left-most
                    // leaf node of right subtree
                    if (task.completionMap.replace(task.leftPredecessor, task, leftChild)) {
                        // If replaced, adjust the pending count of the parent
                        // to complete when its children complete
                        task.addToPendingCount(-1);
                    } else {
                        // Left-predecessor has already completed, parent's
                        // pending count is adjusted by left-predecessor;
                        // left child is ready to complete
                        leftChild.addToPendingCount(-1);
                    }
                }

                // Alternate which child is forked; this thread continues
                // with the other one.
                ForEachOrderedTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
            }

            /*
             * Task's pending count is either 0 or 1.  If 1 then the completion
             * map will contain a value that is task, and two calls to
             * tryComplete are required for completion, one below and one
             * triggered by the completion of task's left-predecessor in
             * onCompletion.  Therefore there is no data race within the if
             * block.
             */
            if (task.getPendingCount() > 0) {
                // Cannot complete just yet so buffer elements into a Node
                // for use when completion occurs
                @SuppressWarnings("unchecked")
                IntFunction<T[]> generator = size -> (T[]) new Object[size];
                Node.Builder<T> nb = task.helper.makeNodeBuilder(
                        task.helper.exactOutputSizeIfKnown(rightSplit),
                        generator);
                task.node = task.helper.wrapAndCopyInto(nb, rightSplit).build();
                task.spliterator = null;
            }
            task.tryComplete();
        }

        /**
         * Delivers this task's elements to the action sink: the buffered
         * node if there is one, otherwise the output of the pipeline applied
         * to the task's spliterator if it is still set.  The task mapped to
         * this one in the completion map, if any, is then removed from the
         * map and its {@code tryComplete} is invoked.
         *
         * @param caller the task that triggered this completion (unused)
         */
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (node != null) {
                // Dump buffered elements from this leaf into the sink
                node.forEach(action);
                node = null;
            }
            else if (spliterator != null) {
                // Dump elements output from this leaf's pipeline into the sink
                helper.wrapAndCopyInto(action, spliterator);
                spliterator = null;
            }

            // The completion of this task *and* the dumping of elements
            // "happens-before" completion of the associated left-most leaf task
            // of right subtree (if any, which can be this task's right sibling)
            //
            ForEachOrderedTask<S, T> leftDescendant = completionMap.remove(this);
            if (leftDescendant != null)
                leftDescendant.tryComplete();
        }
    }
507 }
508