1 /*
2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.Objects;
28 import java.util.function.Consumer;
29 import java.util.function.DoubleConsumer;
30 import java.util.function.IntConsumer;
31 import java.util.function.LongConsumer;
32 
33 /**
34  * An extension of {@link Consumer} used to conduct values through the stages of
35  * a stream pipeline, with additional methods to manage size information,
36  * control flow, etc.  Before calling the {@code accept()} method on a
37  * {@code Sink} for the first time, you must first call the {@code begin()}
38  * method to inform it that data is coming (optionally informing the sink how
39  * much data is coming), and after all data has been sent, you must call the
40  * {@code end()} method.  After calling {@code end()}, you should not call
41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
42  * offers a mechanism by which the sink can cooperatively signal that it does
43  * not wish to receive any more data (the {@code cancellationRequested()}
44  * method), which a source can poll before sending more data to the
45  * {@code Sink}.
46  *
47  * <p>A sink may be in one of two states: an initial state and an active state.
48  * It starts out in the initial state; the {@code begin()} method transitions
49  * it to the active state, and the {@code end()} method transitions it back into
50  * the initial state, where it can be re-used.  Data-accepting methods (such as
51  * {@code accept()} are only valid in the active state.
52  *
53  * @apiNote
54  * A stream pipeline consists of a source, zero or more intermediate stages
55  * (such as filtering or mapping), and a terminal stage, such as reduction or
56  * for-each.  For concreteness, consider the pipeline:
57  *
58  * <pre>{@code
59  *     int longestStringLengthStartingWithA
60  *         = strings.stream()
61  *                  .filter(s -> s.startsWith("A"))
62  *                  .mapToInt(String::length)
63  *                  .max();
64  * }</pre>
65  *
66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
67  * filtering stage consumes strings and emits a subset of those strings; the
68  * mapping stage consumes strings and emits ints; the reduction stage consumes
69  * those ints and computes the maximal value.
70  *
71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
74  * not need a specialized interface for each primitive specialization.  (It
75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
76  * point to the pipeline is the {@code Sink} for the filtering stage, which
77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
78  * stage, which in turn sends integral values downstream into the {@code Sink}
79  * for the reduction stage. The {@code Sink} implementations associated with a
80  * given stage is expected to know the data type for the next stage, and call
81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
82  * each stage must implement the correct {@code accept} method corresponding to
83  * the data type it accepts.
84  *
85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
86  * {@code accept(Object)} to call the appropriate primitive specialization of
87  * {@code accept}, implement the appropriate primitive specialization of
88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
89  * {@code accept}.
90  *
91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
93  * represents the downstream {@code Sink}, and implement the methods
94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
95  * delegate to the downstream {@code Sink}.  Most implementations of
96  * intermediate operations will use these chaining wrappers.  For example, the
97  * mapping stage in the above example would look like:
98  *
99  * <pre>{@code
100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
101  *         public void accept(U u) {
102  *             downstream.accept(mapper.applyAsInt(u));
103  *         }
104  *     };
105  * }</pre>
106  *
107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
108  * to receive elements of type {@code U} as input, and pass the downstream sink
109  * to the constructor.  Because the next stage expects to receive integers, we
110  * must call the {@code accept(int)} method when emitting values to the downstream.
111  * The {@code accept()} method applies the mapping function from {@code U} to
112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
113  *
114  * @param <T> type of elements for value streams
115  * @since 1.8
116  */
117 interface Sink<T> extends Consumer<T> {
118     /**
119      * Resets the sink state to receive a fresh data set.  This must be called
120      * before sending any data to the sink.  After calling {@link #end()},
121      * you may call this method to reset the sink for another calculation.
122      * @param size The exact size of the data to be pushed downstream, if
123      * known or {@code -1} if unknown or infinite.
124      *
125      * <p>Prior to this call, the sink must be in the initial state, and after
126      * this call it is in the active state.
127      */
begin(long size)128     default void begin(long size) {}
129 
130     /**
131      * Indicates that all elements have been pushed.  If the {@code Sink} is
132      * stateful, it should send any stored state downstream at this time, and
133      * should clear any accumulated state (and associated resources).
134      *
135      * <p>Prior to this call, the sink must be in the active state, and after
136      * this call it is returned to the initial state.
137      */
end()138     default void end() {}
139 
140     /**
141      * Indicates that this {@code Sink} does not wish to receive any more data.
142      *
143      * @implSpec The default implementation always returns false.
144      *
145      * @return true if cancellation is requested
146      */
cancellationRequested()147     default boolean cancellationRequested() {
148         return false;
149     }
150 
151     /**
152      * Accepts an int value.
153      *
154      * @implSpec The default implementation throws IllegalStateException.
155      *
156      * @throws IllegalStateException if this sink does not accept int values
157      */
accept(int value)158     default void accept(int value) {
159         throw new IllegalStateException("called wrong accept method");
160     }
161 
162     /**
163      * Accepts a long value.
164      *
165      * @implSpec The default implementation throws IllegalStateException.
166      *
167      * @throws IllegalStateException if this sink does not accept long values
168      */
accept(long value)169     default void accept(long value) {
170         throw new IllegalStateException("called wrong accept method");
171     }
172 
173     /**
174      * Accepts a double value.
175      *
176      * @implSpec The default implementation throws IllegalStateException.
177      *
178      * @throws IllegalStateException if this sink does not accept double values
179      */
accept(double value)180     default void accept(double value) {
181         throw new IllegalStateException("called wrong accept method");
182     }
183 
184     /**
185      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
186      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
187      * {@code accept(int)}.
188      */
189     interface OfInt extends Sink<Integer>, IntConsumer {
190         @Override
accept(int value)191         void accept(int value);
192 
193         @Override
accept(Integer i)194         default void accept(Integer i) {
195             if (Tripwire.ENABLED)
196                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
197             accept(i.intValue());
198         }
199     }
200 
201     /**
202      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
203      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
204      * {@code accept(long)}.
205      */
206     interface OfLong extends Sink<Long>, LongConsumer {
207         @Override
accept(long value)208         void accept(long value);
209 
210         @Override
accept(Long i)211         default void accept(Long i) {
212             if (Tripwire.ENABLED)
213                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
214             accept(i.longValue());
215         }
216     }
217 
218     /**
219      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
220      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
221      * {@code accept(double)}.
222      */
223     interface OfDouble extends Sink<Double>, DoubleConsumer {
224         @Override
accept(double value)225         void accept(double value);
226 
227         @Override
accept(Double i)228         default void accept(Double i) {
229             if (Tripwire.ENABLED)
230                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
231             accept(i.doubleValue());
232         }
233     }
234 
235     /**
236      * Abstract {@code Sink} implementation for creating chains of
237      * sinks.  The {@code begin}, {@code end}, and
238      * {@code cancellationRequested} methods are wired to chain to the
239      * downstream {@code Sink}.  This implementation takes a downstream
240      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
241      * implementation of the {@code accept()} method must call the correct
242      * {@code accept()} method on the downstream {@code Sink}.
243      */
244     abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
245         protected final Sink<? super E_OUT> downstream;
246 
ChainedReference(Sink<? super E_OUT> downstream)247         public ChainedReference(Sink<? super E_OUT> downstream) {
248             this.downstream = Objects.requireNonNull(downstream);
249         }
250 
251         @Override
begin(long size)252         public void begin(long size) {
253             downstream.begin(size);
254         }
255 
256         @Override
end()257         public void end() {
258             downstream.end();
259         }
260 
261         @Override
cancellationRequested()262         public boolean cancellationRequested() {
263             return downstream.cancellationRequested();
264         }
265     }
266 
267     /**
268      * Abstract {@code Sink} implementation designed for creating chains of
269      * sinks.  The {@code begin}, {@code end}, and
270      * {@code cancellationRequested} methods are wired to chain to the
271      * downstream {@code Sink}.  This implementation takes a downstream
272      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
273      * The implementation of the {@code accept()} method must call the correct
274      * {@code accept()} method on the downstream {@code Sink}.
275      */
276     abstract static class ChainedInt<E_OUT> implements Sink.OfInt {
277         protected final Sink<? super E_OUT> downstream;
278 
ChainedInt(Sink<? super E_OUT> downstream)279         public ChainedInt(Sink<? super E_OUT> downstream) {
280             this.downstream = Objects.requireNonNull(downstream);
281         }
282 
283         @Override
begin(long size)284         public void begin(long size) {
285             downstream.begin(size);
286         }
287 
288         @Override
end()289         public void end() {
290             downstream.end();
291         }
292 
293         @Override
cancellationRequested()294         public boolean cancellationRequested() {
295             return downstream.cancellationRequested();
296         }
297     }
298 
299     /**
300      * Abstract {@code Sink} implementation designed for creating chains of
301      * sinks.  The {@code begin}, {@code end}, and
302      * {@code cancellationRequested} methods are wired to chain to the
303      * downstream {@code Sink}.  This implementation takes a downstream
304      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
305      * The implementation of the {@code accept()} method must call the correct
306      * {@code accept()} method on the downstream {@code Sink}.
307      */
308     abstract static class ChainedLong<E_OUT> implements Sink.OfLong {
309         protected final Sink<? super E_OUT> downstream;
310 
ChainedLong(Sink<? super E_OUT> downstream)311         public ChainedLong(Sink<? super E_OUT> downstream) {
312             this.downstream = Objects.requireNonNull(downstream);
313         }
314 
315         @Override
begin(long size)316         public void begin(long size) {
317             downstream.begin(size);
318         }
319 
320         @Override
end()321         public void end() {
322             downstream.end();
323         }
324 
325         @Override
cancellationRequested()326         public boolean cancellationRequested() {
327             return downstream.cancellationRequested();
328         }
329     }
330 
331     /**
332      * Abstract {@code Sink} implementation designed for creating chains of
333      * sinks.  The {@code begin}, {@code end}, and
334      * {@code cancellationRequested} methods are wired to chain to the
335      * downstream {@code Sink}.  This implementation takes a downstream
336      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
337      * The implementation of the {@code accept()} method must call the correct
338      * {@code accept()} method on the downstream {@code Sink}.
339      */
340     abstract static class ChainedDouble<E_OUT> implements Sink.OfDouble {
341         protected final Sink<? super E_OUT> downstream;
342 
ChainedDouble(Sink<? super E_OUT> downstream)343         public ChainedDouble(Sink<? super E_OUT> downstream) {
344             this.downstream = Objects.requireNonNull(downstream);
345         }
346 
347         @Override
begin(long size)348         public void begin(long size) {
349             downstream.begin(size);
350         }
351 
352         @Override
end()353         public void end() {
354             downstream.end();
355         }
356 
357         @Override
cancellationRequested()358         public boolean cancellationRequested() {
359             return downstream.cancellationRequested();
360         }
361     }
362 }
363