1 /*
2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package java.util.stream;
24 
25 import org.testng.Assert;
26 import org.testng.annotations.Test;
27 
28 import java.util.ArrayList;
29 import java.util.EnumSet;
30 import java.util.List;
31 import java.util.function.Supplier;
32 
33 import static java.util.stream.LambdaTestHelpers.countTo;
34 
35 @Test
36 public class FlagOpTest extends OpTestCase {
37 
38     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
testFlagsPassThrough(String name, TestData<Integer, Stream<Integer>> data)39     public void testFlagsPassThrough(String name, TestData<Integer, Stream<Integer>> data) {
40 
41         @SuppressWarnings({"unchecked", "rawtypes"})
42         TestFlagPassThroughOp<Integer>[] ops = new TestFlagPassThroughOp[3];
43         ops[0] = new TestFlagPassThroughOp<>();
44         ops[1] = new TestFlagPassThroughOp<>();
45         ops[2] = new TestFlagPassThroughOp<>();
46 
47         ops[0].set(null, ops[1]);
48         ops[1].set(ops[0], ops[2]);
49         ops[2].set(ops[1], null);
50 
51         withData(data).ops(ops).exercise();
52     }
53 
54     static class TestFlagPassThroughOp<T> extends FlagDeclaringOp<T> {
55         TestFlagPassThroughOp<T> upstream;
56         TestFlagPassThroughOp<T> downstream;
57 
TestFlagPassThroughOp()58         TestFlagPassThroughOp() {
59             super(0);
60         }
61 
set(TestFlagPassThroughOp<T> upstream, TestFlagPassThroughOp<T> downstream)62         void set(TestFlagPassThroughOp<T> upstream, TestFlagPassThroughOp<T> downstream)  {
63             this.upstream = upstream;
64             this.downstream = downstream;
65         }
66 
67         int wrapFlags;
68 
69         @Override
70         @SuppressWarnings({"unchecked", "rawtypes"})
opWrapSink(int flags, boolean parallel, Sink sink)71         public Sink<T> opWrapSink(int flags, boolean parallel, Sink sink) {
72             this.wrapFlags = flags;
73 
74             if (downstream != null) {
75                 assertTrue(flags == downstream.wrapFlags);
76             }
77 
78             return sink;
79         }
80     }
81 
testFlagsClearAllSet()82     public void testFlagsClearAllSet() {
83         int clearAllFlags = 0;
84         for (StreamOpFlag f : EnumSet.allOf(StreamOpFlag.class)) {
85             if (f.isStreamFlag()) {
86                 clearAllFlags |= f.clear();
87             }
88         }
89 
90         EnumSet<StreamOpFlag> known = EnumSet.noneOf(StreamOpFlag.class);
91         EnumSet<StreamOpFlag> notKnown = StreamOpFlagTestHelper.allStreamFlags();
92 
93         List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
94         ops.add(new FlagDeclaringOp<>(clearAllFlags));
95         for (StreamOpFlag f : StreamOpFlagTestHelper.allStreamFlags()) {
96             if (f.canSet(StreamOpFlag.Type.OP)) {
97                 ops.add(new TestFlagExpectedOp<>(f.set(),
98                                              known.clone(),
99                                              EnumSet.noneOf(StreamOpFlag.class),
100                                              notKnown.clone()));
101                 known.add(f);
102                 notKnown.remove(f);
103             }
104         }
105         ops.add(new TestFlagExpectedOp<>(0,
106                                          known.clone(),
107                                          EnumSet.noneOf(StreamOpFlag.class),
108                                          notKnown.clone()));
109 
110         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
111         @SuppressWarnings("rawtypes")
112         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
113 
114         withData(data).ops(opsArray).
115                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
116                 exercise();
117     }
118 
testFlagsSetAllClear()119     public void testFlagsSetAllClear() {
120         EnumSet<StreamOpFlag> known = StreamOpFlagTestHelper.allStreamFlags();
121         int setAllFlags = 0;
122         for (StreamOpFlag f : EnumSet.allOf(StreamOpFlag.class)) {
123             if (f.isStreamFlag()) {
124                 if (f.canSet(StreamOpFlag.Type.OP)) {
125                     setAllFlags |= f.set();
126                 } else {
127                     known.remove(f);
128                 }
129             }
130         }
131 
132         EnumSet<StreamOpFlag> notKnown = EnumSet.noneOf(StreamOpFlag.class);
133 
134         List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
135         ops.add(new FlagDeclaringOp<>(setAllFlags));
136         for (StreamOpFlag f : StreamOpFlagTestHelper.allStreamFlags()) {
137             ops.add(new TestFlagExpectedOp<>(f.clear(),
138                                              known.clone(),
139                                              EnumSet.noneOf(StreamOpFlag.class),
140                                              notKnown.clone()));
141             known.remove(f);
142             notKnown.add(f);
143         }
144         ops.add(new TestFlagExpectedOp<>(0,
145                                          known.clone(),
146                                          EnumSet.noneOf(StreamOpFlag.class),
147                                          notKnown.clone()));
148 
149         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
150         @SuppressWarnings("rawtypes")
151         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
152 
153 
154         withData(data).ops(opsArray).
155                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
156                 exercise();
157     }
158 
testFlagsParallelCollect()159     public void testFlagsParallelCollect() {
160         testFlagsSetSequence(CollectorOps::collector);
161     }
162 
testFlagsSetSequence(Supplier<StatefulTestOp<Integer>> cf)163     private void testFlagsSetSequence(Supplier<StatefulTestOp<Integer>> cf) {
164         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
165         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
166 
167         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
168         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
169             ops.add(cf.get());
170             ops.add(new TestFlagExpectedOp<>(f.set(),
171                                              known.clone(),
172                                              preserve.clone(),
173                                              EnumSet.noneOf(StreamOpFlag.class)));
174             known.add(f);
175             preserve.remove(f);
176         }
177         ops.add(cf.get());
178         ops.add(new TestFlagExpectedOp<>(0,
179                                          known.clone(),
180                                          preserve.clone(),
181                                          EnumSet.noneOf(StreamOpFlag.class)));
182 
183         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
184         @SuppressWarnings("rawtypes")
185         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
186 
187         withData(data).ops(opsArray).
188                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
189                 exercise();
190     }
191 
192 
testFlagsClearParallelCollect()193     public void testFlagsClearParallelCollect() {
194         testFlagsClearSequence(CollectorOps::collector);
195     }
196 
testFlagsClearSequence(Supplier<StatefulTestOp<Integer>> cf)197     protected void testFlagsClearSequence(Supplier<StatefulTestOp<Integer>> cf) {
198         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
199         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
200         EnumSet<StreamOpFlag> notKnown = EnumSet.noneOf(StreamOpFlag.class);
201 
202         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
203         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
204             ops.add(cf.get());
205             ops.add(new TestFlagExpectedOp<>(f.clear(),
206                                              known.clone(),
207                                              preserve.clone(),
208                                              notKnown.clone()));
209             known.remove(f);
210             preserve.remove(f);
211             notKnown.add(f);
212         }
213         ops.add(cf.get());
214         ops.add(new TestFlagExpectedOp<>(0,
215                                          known.clone(),
216                                          preserve.clone(),
217                                          notKnown.clone()));
218 
219         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
220         @SuppressWarnings("rawtypes")
221         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
222 
223         withData(data).ops(opsArray).
224                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
225                 exercise();
226     }
227 
testFlagsSizedOrderedParallelCollect()228     public void testFlagsSizedOrderedParallelCollect() {
229         EnumSet<StreamOpFlag> parKnown = EnumSet.of(StreamOpFlag.SIZED);
230         EnumSet<StreamOpFlag> serKnown = parKnown.clone();
231 
232         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
233         for (StreamOpFlag f : parKnown) {
234             ops.add(CollectorOps.collector());
235             ops.add(new ParSerTestFlagExpectedOp<>(f.clear(),
236                                              parKnown,
237                                              serKnown));
238             serKnown.remove(f);
239         }
240         ops.add(CollectorOps.collector());
241         ops.add(new ParSerTestFlagExpectedOp<>(0,
242                                          parKnown,
243                                          EnumSet.noneOf(StreamOpFlag.class)));
244 
245         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
246         @SuppressWarnings("rawtypes")
247         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
248 
249         withData(data).ops(opsArray).exercise();
250     }
251 
252     static class ParSerTestFlagExpectedOp<T> extends FlagDeclaringOp<T> {
253         final EnumSet<StreamOpFlag> parKnown;
254         final EnumSet<StreamOpFlag> serKnown;
255 
ParSerTestFlagExpectedOp(int flags, EnumSet<StreamOpFlag> known, EnumSet<StreamOpFlag> serKnown)256         ParSerTestFlagExpectedOp(int flags, EnumSet<StreamOpFlag> known, EnumSet<StreamOpFlag> serKnown) {
257             super(flags);
258             this.parKnown = known;
259             this.serKnown = serKnown;
260         }
261 
262         @Override
263         @SuppressWarnings({"unchecked", "rawtypes"})
opWrapSink(int flags, boolean parallel, Sink upstream)264         public Sink<T> opWrapSink(int flags, boolean parallel, Sink upstream) {
265             assertFlags(flags, parallel);
266             return upstream;
267         }
268 
assertFlags(int flags, boolean parallel)269         protected void assertFlags(int flags, boolean parallel) {
270             if (parallel) {
271                 for (StreamOpFlag f : parKnown) {
272                     Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
273                 }
274 
275             } else {
276                 for (StreamOpFlag f : serKnown) {
277                     Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
278                 }
279 
280             }
281         }
282     }
283 }
284