1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Spliterator;
28 import java.util.concurrent.CountedCompleter;
29 import java.util.concurrent.ForkJoinPool;
30 import java.util.concurrent.ForkJoinWorkerThread;
31 
32 /**
33  * Abstract base class for most fork-join tasks used to implement stream ops.
34  * Manages splitting logic, tracking of child tasks, and intermediate results.
35  * Each task is associated with a {@link Spliterator} that describes the portion
36  * of the input associated with the subtree rooted at this task.
37  * Tasks may be leaf nodes (which will traverse the elements of
38  * the {@code Spliterator}) or internal nodes (which split the
39  * {@code Spliterator} into multiple child tasks).
40  *
41  * @implNote
42  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
43  * where each task has a semaphore-like count of uncompleted children, and the
44  * task is implicitly completed and notified when its last child completes.
45  * Internal node tasks will likely override the {@code onCompletion} method from
46  * {@code CountedCompleter} to merge the results from child tasks into the
47  * current task's result.
48  *
49  * <p>Splitting and setting up the child task links is done by {@code compute()}
50  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
51  * guaranteed that the parent's child-related fields (including sibling links
52  * for the parent's children) will be set up for all children.
53  *
54  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
55  * to perform a reduction on that leaf node's chunk using the
56  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
57  * of the child tasks for internal nodes:
58  *
59  * <pre>{@code
60  *     protected S doLeaf() {
61  *         spliterator.forEach(...);
62  *         return localReductionResult;
63  *     }
64  *
65  *     public void onCompletion(CountedCompleter caller) {
66  *         if (!isLeaf()) {
67  *             ReduceTask<P_IN, P_OUT, T, R> child = children;
68  *             R result = child.getLocalResult();
69  *             child = child.nextSibling;
70  *             for (; child != null; child = child.nextSibling)
71  *                 result = combine(result, child.getLocalResult());
72  *             setLocalResult(result);
73  *         }
74  *     }
75  * }</pre>
76  *
77  * <p>Serialization is not supported as there is no intention to serialize
78  * tasks managed by stream ops.
79  *
80  * @param <P_IN> Type of elements input to the pipeline
81  * @param <P_OUT> Type of elements output from the pipeline
82  * @param <R> Type of intermediate result, which may be different from operation
83  *        result type
84  * @param <K> Type of parent, child and sibling tasks
85  * @since 1.8
86  */
87 @SuppressWarnings("serial")
88 abstract class AbstractTask<P_IN, P_OUT, R,
89                             K extends AbstractTask<P_IN, P_OUT, R, K>>
90         extends CountedCompleter<R> {
91 
92     private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
93 
94     /** The pipeline helper, common to all tasks in a computation */
95     protected final PipelineHelper<P_OUT> helper;
96 
97     /**
98      * The spliterator for the portion of the input associated with the subtree
99      * rooted at this task
100      */
101     protected Spliterator<P_IN> spliterator;
102 
103     /** Target leaf size, common to all tasks in a computation */
104     protected long targetSize; // may be lazily initialized
105 
106     /**
107      * The left child.
108      * null if no children
109      * if non-null rightChild is non-null
110      */
111     protected K leftChild;
112 
113     /**
114      * The right child.
115      * null if no children
116      * if non-null leftChild is non-null
117      */
118     protected K rightChild;
119 
120     /** The result of this node, if completed */
121     private R localResult;
122 
123     /**
124      * Constructor for root nodes.
125      *
126      * @param helper The {@code PipelineHelper} describing the stream pipeline
127      *               up to this operation
128      * @param spliterator The {@code Spliterator} describing the source for this
129      *                    pipeline
130      */
AbstractTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator)131     protected AbstractTask(PipelineHelper<P_OUT> helper,
132                            Spliterator<P_IN> spliterator) {
133         super(null);
134         this.helper = helper;
135         this.spliterator = spliterator;
136         this.targetSize = 0L;
137     }
138 
139     /**
140      * Constructor for non-root nodes.
141      *
142      * @param parent this node's parent task
143      * @param spliterator {@code Spliterator} describing the subtree rooted at
144      *        this node, obtained by splitting the parent {@code Spliterator}
145      */
AbstractTask(K parent, Spliterator<P_IN> spliterator)146     protected AbstractTask(K parent,
147                            Spliterator<P_IN> spliterator) {
148         super(parent);
149         this.spliterator = spliterator;
150         this.helper = parent.helper;
151         this.targetSize = parent.targetSize;
152     }
153 
154     /**
155      * Default target of leaf tasks for parallel decomposition.
156      * To allow load balancing, we over-partition, currently to approximately
157      * four tasks per processor, which enables others to help out
158      * if leaf tasks are uneven or some processors are otherwise busy.
159      */
getLeafTarget()160     public static int getLeafTarget() {
161         Thread t = Thread.currentThread();
162         if (t instanceof ForkJoinWorkerThread) {
163             return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2;
164         }
165         else {
166             return LEAF_TARGET;
167         }
168     }
169 
170     /**
171      * Constructs a new node of type T whose parent is the receiver; must call
172      * the AbstractTask(T, Spliterator) constructor with the receiver and the
173      * provided Spliterator.
174      *
175      * @param spliterator {@code Spliterator} describing the subtree rooted at
176      *        this node, obtained by splitting the parent {@code Spliterator}
177      * @return newly constructed child node
178      */
makeChild(Spliterator<P_IN> spliterator)179     protected abstract K makeChild(Spliterator<P_IN> spliterator);
180 
181     /**
182      * Computes the result associated with a leaf node.  Will be called by
183      * {@code compute()} and the result passed to @{code setLocalResult()}
184      *
185      * @return the computed result of a leaf node
186      */
doLeaf()187     protected abstract R doLeaf();
188 
189     /**
190      * Returns a suggested target leaf size based on the initial size estimate.
191      *
192      * @return suggested target leaf size
193      */
suggestTargetSize(long sizeEstimate)194     public static long suggestTargetSize(long sizeEstimate) {
195         long est = sizeEstimate / getLeafTarget();
196         return est > 0L ? est : 1L;
197     }
198 
199     /**
200      * Returns the targetSize, initializing it via the supplied
201      * size estimate if not already initialized.
202      */
getTargetSize(long sizeEstimate)203     protected final long getTargetSize(long sizeEstimate) {
204         long s;
205         return ((s = targetSize) != 0 ? s :
206                 (targetSize = suggestTargetSize(sizeEstimate)));
207     }
208 
209     /**
210      * Returns the local result, if any. Subclasses should use
211      * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
212      * results.  This returns the local result so that calls from within the
213      * fork-join framework will return the correct result.
214      *
215      * @return local result for this node previously stored with
216      * {@link #setLocalResult}
217      */
218     @Override
getRawResult()219     public R getRawResult() {
220         return localResult;
221     }
222 
223     /**
224      * Does nothing; instead, subclasses should use
225      * {@link #setLocalResult(Object)}} to manage results.
226      *
227      * @param result must be null, or an exception is thrown (this is a safety
228      *        tripwire to detect when {@code setRawResult()} is being used
229      *        instead of {@code setLocalResult()}
230      */
231     @Override
setRawResult(R result)232     protected void setRawResult(R result) {
233         if (result != null)
234             throw new IllegalStateException();
235     }
236 
237     /**
238      * Retrieves a result previously stored with {@link #setLocalResult}
239      *
240      * @return local result for this node previously stored with
241      * {@link #setLocalResult}
242      */
getLocalResult()243     protected R getLocalResult() {
244         return localResult;
245     }
246 
247     /**
248      * Associates the result with the task, can be retrieved with
249      * {@link #getLocalResult}
250      *
251      * @param localResult local result for this node
252      */
setLocalResult(R localResult)253     protected void setLocalResult(R localResult) {
254         this.localResult = localResult;
255     }
256 
257     /**
258      * Indicates whether this task is a leaf node.  (Only valid after
259      * {@link #compute} has been called on this node).  If the node is not a
260      * leaf node, then children will be non-null and numChildren will be
261      * positive.
262      *
263      * @return {@code true} if this task is a leaf node
264      */
isLeaf()265     protected boolean isLeaf() {
266         return leftChild == null;
267     }
268 
269     /**
270      * Indicates whether this task is the root node
271      *
272      * @return {@code true} if this task is the root node.
273      */
isRoot()274     protected boolean isRoot() {
275         return getParent() == null;
276     }
277 
278     /**
279      * Returns the parent of this task, or null if this task is the root
280      *
281      * @return the parent of this task, or null if this task is the root
282      */
283     @SuppressWarnings("unchecked")
getParent()284     protected K getParent() {
285         return (K) getCompleter();
286     }
287 
288     /**
289      * Decides whether or not to split a task further or compute it
290      * directly. If computing directly, calls {@code doLeaf} and pass
291      * the result to {@code setRawResult}. Otherwise splits off
292      * subtasks, forking one and continuing as the other.
293      *
294      * <p> The method is structured to conserve resources across a
295      * range of uses.  The loop continues with one of the child tasks
296      * when split, to avoid deep recursion. To cope with spliterators
297      * that may be systematically biased toward left-heavy or
298      * right-heavy splits, we alternate which child is forked versus
299      * continued in the loop.
300      */
301     @Override
compute()302     public void compute() {
303         Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
304         long sizeEstimate = rs.estimateSize();
305         long sizeThreshold = getTargetSize(sizeEstimate);
306         boolean forkRight = false;
307         @SuppressWarnings("unchecked") K task = (K) this;
308         while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
309             K leftChild, rightChild, taskToFork;
310             task.leftChild  = leftChild = task.makeChild(ls);
311             task.rightChild = rightChild = task.makeChild(rs);
312             task.setPendingCount(1);
313             if (forkRight) {
314                 forkRight = false;
315                 rs = ls;
316                 task = leftChild;
317                 taskToFork = rightChild;
318             }
319             else {
320                 forkRight = true;
321                 task = rightChild;
322                 taskToFork = leftChild;
323             }
324             taskToFork.fork();
325             sizeEstimate = rs.estimateSize();
326         }
327         task.setLocalResult(task.doLeaf());
328         task.tryComplete();
329     }
330 
331     /**
332      * {@inheritDoc}
333      *
334      * @implNote
335      * Clears spliterator and children fields.  Overriders MUST call
336      * {@code super.onCompletion} as the last thing they do if they want these
337      * cleared.
338      */
339     @Override
onCompletion(CountedCompleter<?> caller)340     public void onCompletion(CountedCompleter<?> caller) {
341         spliterator = null;
342         leftChild = rightChild = null;
343     }
344 
345     /**
346      * Returns whether this node is a "leftmost" node -- whether the path from
347      * the root to this node involves only traversing leftmost child links.  For
348      * a leaf node, this means it is the first leaf node in the encounter order.
349      *
350      * @return {@code true} if this node is a "leftmost" node
351      */
isLeftmostNode()352     protected boolean isLeftmostNode() {
353         @SuppressWarnings("unchecked")
354         K node = (K) this;
355         while (node != null) {
356             K parent = node.getParent();
357             if (parent != null && parent.leftChild != node)
358                 return false;
359             node = parent;
360         }
361         return true;
362     }
363 }
364