1 /*
2  * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package java.util.stream;
24 
25 import java.util.Collections;
26 import java.util.EnumSet;
27 import java.util.Iterator;
28 import java.util.Set;
29 import java.util.Spliterator;
30 import java.util.SpliteratorTestHelper;
31 import java.util.function.Consumer;
32 import java.util.function.Function;
33 
34 /**
35  * Test scenarios for reference streams.
36  *
37  * Each scenario is provided with a data source, a function that maps a fresh
38  * stream (as provided by the data source) to a new stream, and a sink to
39  * receive results.  Each scenario describes a different way of computing the
40  * stream contents.  The test driver will ensure that all scenarios produce
41  * the same output (modulo allowable differences in ordering).
42  */
43 @SuppressWarnings({"rawtypes", "unchecked"})
44 public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
45 
STREAM_FOR_EACH(false)46     STREAM_FOR_EACH(false) {
47         <T, U, S_IN extends BaseStream<T, S_IN>>
48         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
49             Stream<U> s = m.apply(source);
50             if (s.isParallel()) {
51                 s = s.sequential();
52             }
53             s.forEach(b);
54         }
55     },
56 
57     // Collec to list
STREAM_COLLECT(false)58     STREAM_COLLECT(false) {
59         <T, U, S_IN extends BaseStream<T, S_IN>>
60         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
61             for (U t : m.apply(source).collect(Collectors.toList())) {
62                 b.accept(t);
63             }
64         }
65     },
66 
67     // To array
STREAM_TO_ARRAY(false)68     STREAM_TO_ARRAY(false) {
69         <T, U, S_IN extends BaseStream<T, S_IN>>
70         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
71             for (Object t : m.apply(source).toArray()) {
72                 b.accept((U) t);
73             }
74         }
75     },
76 
77     // Wrap as stream, and iterate in pull mode
STREAM_ITERATOR(false)78     STREAM_ITERATOR(false) {
79         <T, U, S_IN extends BaseStream<T, S_IN>>
80         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
81             for (Iterator<U> seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
82                 b.accept(seqIter.next());
83         }
84     },
85 
86     // Wrap as stream, and spliterate then iterate in pull mode
STREAM_SPLITERATOR(false)87     STREAM_SPLITERATOR(false) {
88         <T, U, S_IN extends BaseStream<T, S_IN>>
89         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
90             for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
91             }
92         }
93     },
94 
95     // Wrap as stream, spliterate, then split a few times mixing advances with forEach
STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false)96     STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
97         <T, U, S_IN extends BaseStream<T, S_IN>>
98         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
99             SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
100         }
101     },
102 
103     // Wrap as stream, and spliterate then iterate in pull mode
STREAM_SPLITERATOR_FOREACH(false)104     STREAM_SPLITERATOR_FOREACH(false) {
105         <T, U, S_IN extends BaseStream<T, S_IN>>
106         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
107             m.apply(source).spliterator().forEachRemaining(b);
108         }
109     },
110 
111     // Wrap as parallel stream + sequential
PAR_STREAM_SEQUENTIAL_FOR_EACH(true)112     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
113         <T, U, S_IN extends BaseStream<T, S_IN>>
114         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
115             m.apply(source).sequential().forEach(b);
116         }
117     },
118 
119     // Wrap as parallel stream + forEachOrdered
PAR_STREAM_FOR_EACH_ORDERED(true)120     PAR_STREAM_FOR_EACH_ORDERED(true) {
121         <T, U, S_IN extends BaseStream<T, S_IN>>
122         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
123             // @@@ Want to explicitly select ordered equalator
124             m.apply(source).forEachOrdered(b);
125         }
126     },
127 
128     // Wrap as stream, and spliterate then iterate sequentially
PAR_STREAM_SPLITERATOR(true)129     PAR_STREAM_SPLITERATOR(true) {
130         <T, U, S_IN extends BaseStream<T, S_IN>>
131         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
132             for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
133             }
134         }
135     },
136 
137     // Wrap as stream, and spliterate then iterate sequentially
PAR_STREAM_SPLITERATOR_FOREACH(true)138     PAR_STREAM_SPLITERATOR_FOREACH(true) {
139         <T, U, S_IN extends BaseStream<T, S_IN>>
140         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
141             m.apply(source).spliterator().forEachRemaining(b);
142         }
143     },
144 
145     // Wrap as parallel stream + toArray
PAR_STREAM_TO_ARRAY(true)146     PAR_STREAM_TO_ARRAY(true) {
147         <T, U, S_IN extends BaseStream<T, S_IN>>
148         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
149             for (Object t : m.apply(source).toArray())
150                 b.accept((U) t);
151         }
152     },
153 
154     // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true)155     PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
156         <T, U, S_IN extends BaseStream<T, S_IN>>
157         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
158             Stream<U> s = m.apply(source);
159             Spliterator<U> sp = s.spliterator();
160             Stream<U> ss = StreamSupport.stream(() -> sp,
161                                                 StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
162                                                 | (sp.getExactSizeIfKnown() < 0 ? 0 : Spliterator.SIZED), true);
163             for (Object t : ss.toArray())
164                 b.accept((U) t);
165         }
166     },
167 
168     // Wrap as parallel stream + toArray and clear SIZED flag
PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true)169     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
170         <T, U, S_IN extends BaseStream<T, S_IN>>
171         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
172             S_IN pipe1 = (S_IN) OpTestCase.chain(source,
173                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
174             Stream<U> pipe2 = m.apply(pipe1);
175 
176             for (Object t : pipe2.toArray())
177                 b.accept((U) t);
178         }
179     },
180 
181     // Wrap as parallel + collect to list
PAR_STREAM_COLLECT_TO_LIST(true)182     PAR_STREAM_COLLECT_TO_LIST(true) {
183         <T, U, S_IN extends BaseStream<T, S_IN>>
184         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
185             for (U u : m.apply(source).collect(Collectors.toList()))
186                 b.accept(u);
187         }
188     },
189 
190     // Wrap sequential as parallel, + collect to list
STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true)191     STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
192         public <T, S_IN extends BaseStream<T, S_IN>>
193         S_IN getStream(TestData<T, S_IN> data) {
194             return data.stream().parallel();
195         }
196 
197         <T, U, S_IN extends BaseStream<T, S_IN>>
198         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
199             for (U u : m.apply(source).collect(Collectors.toList()))
200                 b.accept(u);
201         }
202     },
203 
204     // Wrap parallel as sequential,, + collect
PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true)205     PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
206         <T, U, S_IN extends BaseStream<T, S_IN>>
207         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
208             for (U u : m.apply(source).collect(Collectors.toList()))
209                 b.accept(u);
210         }
211     },
212 
213     // Wrap as parallel stream + forEach synchronizing
PAR_STREAM_FOR_EACH(true, false)214     PAR_STREAM_FOR_EACH(true, false) {
215         <T, U, S_IN extends BaseStream<T, S_IN>>
216         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
217             m.apply(source).forEach(e -> {
218                 synchronized (data) {
219                     b.accept(e);
220                 }
221             });
222         }
223     },
224 
225     // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false)226     PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
227         <T, U, S_IN extends BaseStream<T, S_IN>>
228         void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
229             S_IN pipe1 = (S_IN) OpTestCase.chain(source,
230                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
231             m.apply(pipe1).forEach(e -> {
232                 synchronized (data) {
233                     b.accept(e);
234                 }
235             });
236         }
237     },
238     ;
239 
240     // The set of scenarios that clean the SIZED flag
241     public static final Set<StreamTestScenario> CLEAR_SIZED_SCENARIOS = Collections.unmodifiableSet(
242             EnumSet.of(PAR_STREAM_TO_ARRAY_CLEAR_SIZED, PAR_STREAM_FOR_EACH_CLEAR_SIZED));
243 
244     private final boolean isParallel;
245 
246     private final boolean isOrdered;
247 
StreamTestScenario(boolean isParallel)248     StreamTestScenario(boolean isParallel) {
249         this(isParallel, true);
250     }
251 
StreamTestScenario(boolean isParallel, boolean isOrdered)252     StreamTestScenario(boolean isParallel, boolean isOrdered) {
253         this.isParallel = isParallel;
254         this.isOrdered = isOrdered;
255     }
256 
getShape()257     public StreamShape getShape() {
258         return StreamShape.REFERENCE;
259     }
260 
isParallel()261     public boolean isParallel() {
262         return isParallel;
263     }
264 
isOrdered()265     public boolean isOrdered() {
266         return isOrdered;
267     }
268 
269     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m)270     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
271         try (S_IN source = getStream(data)) {
272             run(data, source, b, (Function<S_IN, Stream<U>>) m);
273         }
274     }
275 
276     abstract <T, U, S_IN extends BaseStream<T, S_IN>>
run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m)277     void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m);
278 
279 }
280