1 /*
2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package java.util.stream;
24 
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.EnumMap;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Objects;
38 import java.util.Set;
39 import java.util.Spliterator;
40 import java.util.function.BiConsumer;
41 import java.util.function.Consumer;
42 import java.util.function.Function;
43 
44 import org.testng.annotations.Test;
45 
46 /**
47  * Base class for streams test cases.  Provides 'exercise' methods for taking
48  * lambdas that construct and modify streams, and evaluates them in different
49  * ways and asserts that they produce equivalent results.
50  */
51 @Test
52 public abstract class OpTestCase extends LoggingTestCase {
53 
54     private final Map<StreamShape, Set<? extends BaseStreamTestScenario>> testScenarios;
55 
OpTestCase()56     protected OpTestCase() {
57         testScenarios = new EnumMap<>(StreamShape.class);
58         testScenarios.put(StreamShape.REFERENCE, Collections.unmodifiableSet(EnumSet.allOf(StreamTestScenario.class)));
59         testScenarios.put(StreamShape.INT_VALUE, Collections.unmodifiableSet(EnumSet.allOf(IntStreamTestScenario.class)));
60         testScenarios.put(StreamShape.LONG_VALUE, Collections.unmodifiableSet(EnumSet.allOf(LongStreamTestScenario.class)));
61         testScenarios.put(StreamShape.DOUBLE_VALUE, Collections.unmodifiableSet(EnumSet.allOf(DoubleStreamTestScenario.class)));
62     }
63 
64     @SuppressWarnings("rawtypes")
getStreamFlags(BaseStream s)65     public static int getStreamFlags(BaseStream s) {
66         return ((AbstractPipeline) s).getStreamFlags();
67     }
68 
69     /**
70      * An asserter for results produced when exercising of stream or terminal
71      * tests.
72      *
73      * @param <R> the type of result to assert on
74      */
75     public interface ResultAsserter<R> {
76         /**
77          * Assert a result produced when exercising of stream or terminal
78          * test.
79          *
80          * @param actual the actual result
81          * @param expected the expected result
82          * @param isOrdered true if the pipeline is ordered
83          * @param isParallel true if the pipeline is parallel
84          */
assertResult(R actual, R expected, boolean isOrdered, boolean isParallel)85         void assertResult(R actual, R expected, boolean isOrdered, boolean isParallel);
86     }
87 
88     // Exercise stream operations
89 
90     public interface BaseStreamTestScenario {
getShape()91         StreamShape getShape();
92 
isParallel()93         boolean isParallel();
94 
isOrdered()95         boolean isOrdered();
96 
97         <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m)98         void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
99     }
100 
101     protected <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m)102     Collection<U> exerciseOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
103         return withData(data).stream(m).exercise();
104     }
105 
106     // Run multiple versions of exercise(), returning the result of the first, and asserting that others return the same result
107     // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
108     @SafeVarargs
109     protected final<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
exerciseOpsMulti(TestData<T, S_IN> data, Function<S_IN, S_OUT>... ms)110     Collection<U> exerciseOpsMulti(TestData<T, S_IN> data,
111                                    Function<S_IN, S_OUT>... ms) {
112         Collection<U> result = null;
113         for (Function<S_IN, S_OUT> m : ms) {
114             if (result == null)
115                 result = withData(data).stream(m).exercise();
116             else {
117                 Collection<U> r2 = withData(data).stream(m).exercise();
118                 assertEquals(result, r2);
119             }
120         }
121         return result;
122     }
123 
124     // Run multiple versions of exercise() for an Integer stream, returning the result of the first, and asserting that others return the same result
125     // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
126     // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
127     protected final
exerciseOpsInt(TestData.OfRef<Integer> data, Function<Stream<Integer>, Stream<Integer>> mRef, Function<IntStream, IntStream> mInt, Function<LongStream, LongStream> mLong, Function<DoubleStream, DoubleStream> mDouble)128     Collection<Integer> exerciseOpsInt(TestData.OfRef<Integer> data,
129                                        Function<Stream<Integer>, Stream<Integer>> mRef,
130                                        Function<IntStream, IntStream> mInt,
131                                        Function<LongStream, LongStream> mLong,
132                                        Function<DoubleStream, DoubleStream> mDouble) {
133         @SuppressWarnings({ "rawtypes", "unchecked" })
134         Function<Stream<Integer>, Stream<Integer>>[] ms = new Function[4];
135         ms[0] = mRef;
136         ms[1] = s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e);
137         ms[2] = s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e);
138         ms[3] = s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e);
139         return exerciseOpsMulti(data, ms);
140     }
141 
142     // Run multiple versions of exercise() with multiple terminal operations for all kinds of stream, , and asserting against the expected result
143     // If the first version is s -> s.foo(), can be used with s -> s.mapToInt(i -> i).foo().mapToObj(i -> i) to test all shape variants
144     protected final<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
exerciseTerminalOpsMulti(TestData<T, S_IN> data, R expected, Map<String, Function<S_IN, S_OUT>> streams, Map<String, Function<S_OUT, R>> terminals)145     void exerciseTerminalOpsMulti(TestData<T, S_IN> data,
146                                   R expected,
147                                   Map<String, Function<S_IN, S_OUT>> streams,
148                                   Map<String, Function<S_OUT, R>> terminals) {
149         for (Map.Entry<String, Function<S_IN, S_OUT>> se : streams.entrySet()) {
150             setContext("Intermediate stream", se.getKey());
151             for (Map.Entry<String, Function<S_OUT, R>> te : terminals.entrySet()) {
152                 setContext("Terminal stream", te.getKey());
153                 withData(data)
154                         .terminal(se.getValue(), te.getValue())
155                         .expectedResult(expected)
156                         .exercise();
157 
158             }
159         }
160     }
161 
162     // Run multiple versions of exercise() with multiple terminal operation for all kinds of stream, and asserting against the expected result
163     // Automates the conversion between Stream<Integer> and {Int,Long,Double}Stream and back, so client sites look like you are passing the same
164     // lambda four times, but in fact they are four different lambdas since they are transforming four different kinds of streams
165     protected final
exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data, Collection<Integer> expected, String desc, Function<Stream<Integer>, Stream<Integer>> mRef, Function<IntStream, IntStream> mInt, Function<LongStream, LongStream> mLong, Function<DoubleStream, DoubleStream> mDouble, Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals)166     void exerciseTerminalOpsInt(TestData<Integer, Stream<Integer>> data,
167                                 Collection<Integer> expected,
168                                 String desc,
169                                 Function<Stream<Integer>, Stream<Integer>> mRef,
170                                 Function<IntStream, IntStream> mInt,
171                                 Function<LongStream, LongStream> mLong,
172                                 Function<DoubleStream, DoubleStream> mDouble,
173                                 Map<String, Function<Stream<Integer>, Collection<Integer>>> terminals) {
174 
175         Map<String, Function<Stream<Integer>, Stream<Integer>>> m = new HashMap<>();
176         m.put("Ref " + desc, mRef);
177         m.put("Int " + desc, s -> mInt.apply(s.mapToInt(e -> e)).mapToObj(e -> e));
178         m.put("Long " + desc, s -> mLong.apply(s.mapToLong(e -> e)).mapToObj(e -> (int) e));
179         m.put("Double " + desc, s -> mDouble.apply(s.mapToDouble(e -> e)).mapToObj(e -> (int) e));
180 
181         exerciseTerminalOpsMulti(data, expected, m, terminals);
182     }
183 
184 
185     protected <T, U, S_OUT extends BaseStream<U, S_OUT>>
exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m)186     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m) {
187         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
188         return withData(data1).stream(m).exercise();
189     }
190 
191     protected <T, U, S_OUT extends BaseStream<U, S_OUT>, I extends Iterable<U>>
exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected)192     Collection<U> exerciseOps(Collection<T> data, Function<Stream<T>, S_OUT> m, I expected) {
193         TestData.OfRef<T> data1 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
194         return withData(data1).stream(m).expectedResult(expected).exercise();
195     }
196 
197     @SuppressWarnings("unchecked")
198     protected <U, S_OUT extends BaseStream<U, S_OUT>>
exerciseOps(int[] data, Function<IntStream, S_OUT> m)199     Collection<U> exerciseOps(int[] data, Function<IntStream, S_OUT> m) {
200         return withData(TestData.Factory.ofArray("int array", data)).stream(m).exercise();
201     }
202 
exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected)203     protected Collection<Integer> exerciseOps(int[] data, Function<IntStream, IntStream> m, int[] expected) {
204         TestData.OfInt data1 = TestData.Factory.ofArray("int array", data);
205         return withData(data1).stream(m).expectedResult(expected).exercise();
206     }
207 
withData(TestData<T, S_IN> data)208     protected <T, S_IN extends BaseStream<T, S_IN>> DataStreamBuilder<T, S_IN> withData(TestData<T, S_IN> data) {
209         Objects.requireNonNull(data);
210         return new DataStreamBuilder<>(data);
211     }
212 
213     @SuppressWarnings({"rawtypes", "unchecked"})
214     public class DataStreamBuilder<T, S_IN extends BaseStream<T, S_IN>> {
215         final TestData<T, S_IN> data;
216 
DataStreamBuilder(TestData<T, S_IN> data)217         private DataStreamBuilder(TestData<T, S_IN> data) {
218             this.data = Objects.requireNonNull(data);
219         }
220 
221         public <U, S_OUT extends BaseStream<U, S_OUT>>
ops(IntermediateTestOp... ops)222         ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> ops(IntermediateTestOp... ops) {
223             return new ExerciseDataStreamBuilder<>(data, (S_IN s) -> (S_OUT) chain(s, ops));
224         }
225 
226         public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
stream(Function<S_IN, S_OUT> m)227         stream(Function<S_IN, S_OUT> m) {
228             return new ExerciseDataStreamBuilder<>(data, m);
229         }
230 
231         public <U, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT>
stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp)232         stream(Function<S_IN, S_OUT> m, IntermediateTestOp<U, U> additionalOp) {
233             return new ExerciseDataStreamBuilder<>(data, s -> (S_OUT) chain(m.apply(s), additionalOp));
234         }
235 
236         public <R> ExerciseDataTerminalBuilder<T, T, R, S_IN, S_IN>
terminal(Function<S_IN, R> terminalF)237         terminal(Function<S_IN, R> terminalF) {
238             return new ExerciseDataTerminalBuilder<>(data, s -> s, terminalF);
239         }
240 
241         public <U, R, S_OUT extends BaseStream<U, S_OUT>> ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT>
terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF)242         terminal(Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
243             return new ExerciseDataTerminalBuilder<>(data, streamF, terminalF);
244         }
245     }
246 
247     @SuppressWarnings({"rawtypes", "unchecked"})
248     public class ExerciseDataStreamBuilder<T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
249         final TestData<T, S_IN> data;
250         final Function<S_IN, S_OUT> m;
251         final StreamShape shape;
252 
253         Set<BaseStreamTestScenario> testSet = new HashSet<>();
254 
255         Collection<U> refResult;
256 
257         Consumer<TestData<T, S_IN>> before = LambdaTestHelpers.bEmpty;
258 
259         Consumer<TestData<T, S_IN>> after = LambdaTestHelpers.bEmpty;
260 
261         ResultAsserter<Iterable<U>> resultAsserter = (act, exp, ord, par) -> {
262             if (par & !ord) {
263                 LambdaTestHelpers.assertContentsUnordered(act, exp);
264             }
265             else {
266                 LambdaTestHelpers.assertContentsEqual(act, exp);
267             }
268         };
269 
ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m)270         private ExerciseDataStreamBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> m) {
271             this.data = data;
272 
273             this.m = Objects.requireNonNull(m);
274 
275             this.shape = ((AbstractPipeline<?, U, ?>) m.apply(data.stream())).getOutputShape();
276 
277             // Have to initiate from the output shape of the last stream
278             // This means the stream mapper is required first rather than last
279             testSet.addAll(testScenarios.get(shape));
280         }
281 
282         //
283 
expectedResult(I expectedResult)284         public <I extends Iterable<U>> ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(I expectedResult) {
285             List<U> l = new ArrayList<>();
286             expectedResult.forEach(l::add);
287             refResult = l;
288             return this;
289         }
290 
expectedResult(int[] expectedResult)291         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(int[] expectedResult) {
292             List l = new ArrayList();
293             for (int anExpectedResult : expectedResult) {
294                 l.add(anExpectedResult);
295             }
296             refResult = l;
297             return this;
298         }
299 
expectedResult(long[] expectedResult)300         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(long[] expectedResult) {
301             List l = new ArrayList();
302             for (long anExpectedResult : expectedResult) {
303                 l.add(anExpectedResult);
304             }
305             refResult = l;
306             return this;
307         }
308 
expectedResult(double[] expectedResult)309         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> expectedResult(double[] expectedResult) {
310             List l = new ArrayList();
311             for (double anExpectedResult : expectedResult) {
312                 l.add(anExpectedResult);
313             }
314             refResult = l;
315             return this;
316         }
317 
before(Consumer<TestData<T, S_IN>> before)318         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> before(Consumer<TestData<T, S_IN>> before) {
319             this.before = Objects.requireNonNull(before);
320             return this;
321         }
322 
after(Consumer<TestData<T, S_IN>> after)323         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> after(Consumer<TestData<T, S_IN>> after) {
324             this.after = Objects.requireNonNull(after);
325             return this;
326         }
327 
without(BaseStreamTestScenario... tests)328         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(BaseStreamTestScenario... tests) {
329             return without(Arrays.asList(tests));
330         }
331 
without(Collection<? extends BaseStreamTestScenario> tests)332         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> without(Collection<? extends BaseStreamTestScenario> tests) {
333             for (BaseStreamTestScenario ts : tests) {
334                 if (ts.getShape() == shape) {
335                     testSet.remove(ts);
336                 }
337             }
338 
339             if (testSet.isEmpty()) {
340                 throw new IllegalStateException("Test scenario set is empty");
341             }
342 
343             return this;
344         }
345 
with(BaseStreamTestScenario... tests)346         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(BaseStreamTestScenario... tests) {
347             return with(Arrays.asList(tests));
348         }
349 
with(Collection<? extends BaseStreamTestScenario> tests)350         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> with(Collection<? extends BaseStreamTestScenario> tests) {
351             testSet = new HashSet<>();
352 
353             for (BaseStreamTestScenario ts : tests) {
354                 if (ts.getShape() == shape) {
355                     testSet.add(ts);
356                 }
357             }
358 
359             if (testSet.isEmpty()) {
360                 throw new IllegalStateException("Test scenario set is empty");
361             }
362 
363             return this;
364         }
365 
resultAsserter(ResultAsserter<Iterable<U>> resultAsserter)366         public ExerciseDataStreamBuilder<T, U, S_IN, S_OUT> resultAsserter(ResultAsserter<Iterable<U>> resultAsserter) {
367             this.resultAsserter = resultAsserter;
368             return this;
369         }
370 
371         // Build method
372 
exercise()373         public Collection<U> exercise() {
374             final boolean isStreamOrdered;
375             if (refResult == null) {
376                 // Induce the reference result
377                 before.accept(data);
378                 S_OUT sOut = m.apply(data.stream());
379                 isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
380                 Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
381                 refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
382                 after.accept(data);
383             }
384             else {
385                 S_OUT sOut = m.apply(data.stream());
386                 isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
387             }
388 
389             List<Error> errors = new ArrayList<>();
390             for (BaseStreamTestScenario test : testSet) {
391                 try {
392                     before.accept(data);
393 
394                     List<U> result = new ArrayList<>();
395                     test.run(data, LambdaTestHelpers.<U>toBoxingConsumer(result::add), m);
396 
397                     Runnable asserter = () -> resultAsserter.assertResult(result, refResult, isStreamOrdered && test.isOrdered(), test.isParallel());
398 
399                     if (refResult.size() > 1000) {
400                         LambdaTestHelpers.launderAssertion(
401                                 asserter,
402                                 () -> String.format("%n%s: [actual size=%d] != [expected size=%d]", test, result.size(), refResult.size()));
403                     }
404                     else {
405                         LambdaTestHelpers.launderAssertion(
406                                 asserter,
407                                 () -> String.format("%n%s: [actual] %s != [expected] %s", test, result, refResult));
408                     }
409 
410                     after.accept(data);
411                 } catch (Throwable t) {
412                     errors.add(new Error(String.format("%s: %s", test, t), t));
413                 }
414             }
415 
416             if (!errors.isEmpty()) {
417                 StringBuilder sb = new StringBuilder();
418                 int i = 1;
419                 for (Error t : errors) {
420                     sb.append(i++).append(": ");
421                     if (t instanceof AssertionError) {
422                         sb.append(t).append("\n");
423                     }
424                     else {
425                         StringWriter sw = new StringWriter();
426                         PrintWriter pw = new PrintWriter(sw);
427 
428                         t.getCause().printStackTrace(pw);
429                         pw.flush();
430                         sb.append(t).append("\n").append(sw);
431                     }
432                 }
433                 sb.append("--");
434 
435                 fail(String.format("%d failure(s) for test data: %s\n%s", i - 1, data.toString(), sb));
436             }
437 
438             return refResult;
439         }
440     }
441 
442     // Exercise terminal operations
443 
444     interface BaseTerminalTestScenario<U, R, S_OUT extends BaseStream<U, S_OUT>> {
requiresSingleStageSource()445         boolean requiresSingleStageSource();
446 
requiresParallelSource()447         boolean requiresParallelSource();
448 
run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape)449         default R run(Function<S_OUT, R> terminalF, S_OUT source, StreamShape shape) {
450             return terminalF.apply(source);
451         }
452     }
453 
454     @SuppressWarnings({"rawtypes", "unchecked"})
455     enum TerminalTestScenario implements BaseTerminalTestScenario {
456         SINGLE_SEQUENTIAL(true, false),
457 
SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false)458         SINGLE_SEQUENTIAL_SHORT_CIRCUIT(true, false) {
459             @Override
460             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
461                 source = (BaseStream) chain(source, new ShortCircuitOp(shape));
462                 return terminalF.apply(source);
463             }
464         },
465 
466         SINGLE_PARALLEL(true, true),
467 
468         ALL_SEQUENTIAL(false, false),
469 
ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false)470         ALL_SEQUENTIAL_SHORT_CIRCUIT(false, false) {
471             @Override
472             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
473                 source = (BaseStream) chain(source, new ShortCircuitOp(shape));
474                 return terminalF.apply(source);
475             }
476         },
477 
478         ALL_PARALLEL(false, true),
479 
ALL_PARALLEL_SEQUENTIAL(false, false)480         ALL_PARALLEL_SEQUENTIAL(false, false) {
481             @Override
482             public Object run(Function terminalF, BaseStream source, StreamShape shape) {
483                 return terminalF.apply(source.sequential());
484             }
485         },
486         ;
487 
488         private final boolean requiresSingleStageSource;
489         private final boolean isParallel;
490 
TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel)491         TerminalTestScenario(boolean requiresSingleStageSource, boolean isParallel) {
492             this.requiresSingleStageSource = requiresSingleStageSource;
493             this.isParallel = isParallel;
494         }
495 
496         @Override
requiresSingleStageSource()497         public boolean requiresSingleStageSource() {
498             return requiresSingleStageSource;
499         }
500 
501         @Override
requiresParallelSource()502         public boolean requiresParallelSource() {
503             return isParallel;
504         }
505 
506     }
507 
508     @SuppressWarnings({"rawtypes", "unchecked"})
509     public class ExerciseDataTerminalBuilder<T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> {
510         final TestData<T, S_IN> data;
511         final Function<S_IN, S_OUT> streamF;
512         final Function<S_OUT, R> terminalF;
513 
514         R refResult;
515 
516         ResultAsserter<R> resultAsserter = (act, exp, ord, par) -> LambdaTestHelpers.assertContentsEqual(act, exp);
517 
ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF)518         private ExerciseDataTerminalBuilder(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF) {
519             this.data = data;
520             this.streamF = Objects.requireNonNull(streamF);
521             this.terminalF = Objects.requireNonNull(terminalF);
522         }
523 
524         //
525 
expectedResult(R expectedResult)526         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> expectedResult(R expectedResult) {
527             this.refResult = expectedResult;
528             return this;
529         }
530 
equalator(BiConsumer<R, R> equalityAsserter)531         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> equalator(BiConsumer<R, R> equalityAsserter) {
532             resultAsserter = (act, exp, ord, par) -> equalityAsserter.accept(act, exp);
533             return this;
534         }
535 
resultAsserter(ResultAsserter<R> resultAsserter)536         public ExerciseDataTerminalBuilder<T, U, R, S_IN, S_OUT> resultAsserter(ResultAsserter<R> resultAsserter) {
537             this.resultAsserter = resultAsserter;
538             return this;
539         }
540 
541         // Build method
542 
exercise()543         public R exercise() {
544             S_OUT out = streamF.apply(data.stream()).sequential();
545             AbstractPipeline ap = (AbstractPipeline) out;
546             boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
547             StreamShape shape = ap.getOutputShape();
548 
549             EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
550             // Sequentially collect the output that will be input to the terminal op
551             Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
552             if (refResult == null) {
553                 // Induce the reference result
554                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
555                                                       StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
556                                                       false);
557 
558                 refResult = (R) TerminalTestScenario.SINGLE_SEQUENTIAL.run(terminalF, source, shape);
559                 tests.remove(TerminalTestScenario.SINGLE_SEQUENTIAL);
560             }
561 
562             for (BaseTerminalTestScenario test : tests) {
563                 S_OUT source;
564                 if (test.requiresSingleStageSource()) {
565                     source = (S_OUT) createPipeline(shape, node.spliterator(),
566                                                     StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SIZED,
567                                                     test.requiresParallelSource());
568                 }
569                 else {
570                     source = streamF.apply(test.requiresParallelSource()
571                                            ? data.parallelStream() : data.stream());
572                 }
573 
574                 R result = (R) test.run(terminalF, source, shape);
575 
576                 LambdaTestHelpers.launderAssertion(
577                         () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
578                         () -> String.format("%s: %s != %s", test, refResult, result));
579             }
580 
581             return refResult;
582         }
583 
createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel)584         AbstractPipeline createPipeline(StreamShape shape, Spliterator s, int flags, boolean parallel) {
585             switch (shape) {
586                 case REFERENCE:    return new ReferencePipeline.Head<>(s, flags, parallel);
587                 case INT_VALUE:    return new IntPipeline.Head(s, flags, parallel);
588                 case LONG_VALUE:   return new LongPipeline.Head(s, flags, parallel);
589                 case DOUBLE_VALUE: return new DoublePipeline.Head(s, flags, parallel);
590                 default: throw new IllegalStateException("Unknown shape: " + shape);
591             }
592         }
593     }
594 
exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected)595     protected <T, R> R exerciseTerminalOps(Collection<T> data, Function<Stream<T>, R> m, R expected) {
596         TestData.OfRef<T> data1
597                 = TestData.Factory.ofCollection("Collection of type " + data.getClass().getName(), data);
598         return withData(data1).terminal(m).expectedResult(expected).exercise();
599     }
600 
601     protected <T, R, S_IN extends BaseStream<T, S_IN>> R
exerciseTerminalOps(TestData<T, S_IN> data, Function<S_IN, R> terminalF)602     exerciseTerminalOps(TestData<T, S_IN> data,
603                         Function<S_IN, R> terminalF) {
604         return withData(data).terminal(terminalF).exercise();
605     }
606 
607     protected <T, U, R, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>> R
exerciseTerminalOps(TestData<T, S_IN> data, Function<S_IN, S_OUT> streamF, Function<S_OUT, R> terminalF)608     exerciseTerminalOps(TestData<T, S_IN> data,
609                         Function<S_IN, S_OUT> streamF,
610                         Function<S_OUT, R> terminalF) {
611         return withData(data).terminal(streamF, terminalF).exercise();
612     }
613 
614     //
615 
616     @SuppressWarnings({"rawtypes", "unchecked"})
chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op)617     private static <T> AbstractPipeline<?, T, ?> chain(AbstractPipeline upstream, IntermediateTestOp<?, T> op) {
618         return (AbstractPipeline<?, T, ?>) IntermediateTestOp.chain(upstream, op);
619     }
620 
621     @SuppressWarnings({"rawtypes", "unchecked"})
chain(AbstractPipeline pipe, IntermediateTestOp... ops)622     private static AbstractPipeline<?, ?, ?> chain(AbstractPipeline pipe, IntermediateTestOp... ops) {
623         for (IntermediateTestOp op : ops)
624             pipe = chain(pipe, op);
625         return pipe;
626     }
627 
628     @SuppressWarnings("rawtypes")
chain(BaseStream pipe, IntermediateTestOp<?, T> op)629     private static <T> AbstractPipeline<?, T, ?> chain(BaseStream pipe, IntermediateTestOp<?, T> op) {
630         return chain((AbstractPipeline) pipe, op);
631     }
632 
633     @SuppressWarnings("rawtypes")
chain(BaseStream pipe, IntermediateTestOp... ops)634     public static AbstractPipeline<?, ?, ?> chain(BaseStream pipe, IntermediateTestOp... ops) {
635         return chain((AbstractPipeline) pipe, ops);
636     }
637 
638     // Test data
639 
640     static class ShortCircuitOp<T> implements StatelessTestOp<T,T> {
641         private final StreamShape shape;
642 
ShortCircuitOp(StreamShape shape)643         ShortCircuitOp(StreamShape shape) {
644             this.shape = shape;
645         }
646 
647         @Override
opWrapSink(int flags, boolean parallel, Sink<T> sink)648         public Sink<T> opWrapSink(int flags, boolean parallel, Sink<T> sink) {
649             return sink;
650         }
651 
652         @Override
opGetFlags()653         public int opGetFlags() {
654             return StreamOpFlag.IS_SHORT_CIRCUIT;
655         }
656 
657         @Override
outputShape()658         public StreamShape outputShape() {
659             return shape;
660         }
661 
662         @Override
inputShape()663         public StreamShape inputShape() {
664             return shape;
665         }
666     }
667 }
668