1 /*
2  * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 package org.openjdk.tests.java.util.stream;
24 
25 import org.testng.annotations.Test;
26 
27 import java.util.HashMap;
28 import java.util.LinkedList;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32 import java.util.function.BooleanSupplier;
33 import java.util.function.Consumer;
34 import java.util.function.Function;
35 import java.util.function.Supplier;
36 import java.util.stream.DefaultMethodStreams;
37 import java.util.stream.DoubleStream;
38 import java.util.stream.IntStream;
39 import java.util.stream.LongStream;
40 import java.util.stream.OpTestCase;
41 import java.util.stream.Stream;
42 
43 import static java.util.stream.Collectors.toCollection;
44 
45 /*
46  * @test
47  * @bug 8071597
48  */
49 @Test
50 public class WhileOpStatefulTest extends OpTestCase {
51     static final long COUNT_PERIOD = 100;
52 
53     static final long EXECUTION_TIME_LIMIT = TimeUnit.SECONDS.toMillis(10);
54 
55     static final long TAKE_WHILE_COUNT_LIMIT = 100_000;
56 
57     static final int DROP_SOURCE_SIZE = 10_000;
58 
59     static final long DROP_WHILE_COUNT_LIMIT = 5000;
60 
61     @Test
testTimedTakeWithCount()62     public void testTimedTakeWithCount() {
63         testTakeWhileMulti(
64                 s -> {
65                     BooleanSupplier isWithinTakePeriod =
66                             within(System.currentTimeMillis(), COUNT_PERIOD);
67                     s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
68                             .mapToLong(e -> 1).reduce(0, Long::sum);
69                 },
70                 s -> {
71                     BooleanSupplier isWithinTakePeriod =
72                             within(System.currentTimeMillis(), COUNT_PERIOD);
73                     s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
74                             .mapToLong(e -> 1).reduce(0, Long::sum);
75                 },
76                 s -> {
77                     BooleanSupplier isWithinTakePeriod =
78                             within(System.currentTimeMillis(), COUNT_PERIOD);
79                     s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
80                             .map(e -> 1).reduce(0, Long::sum);
81                 },
82                 s -> {
83                     BooleanSupplier isWithinTakePeriod =
84                             within(System.currentTimeMillis(), COUNT_PERIOD);
85                     s.takeWhile(e -> isWithinTakePeriod.getAsBoolean())
86                             .mapToLong(e -> 1).reduce(0, Long::sum);
87                 });
88     }
89 
90     @Test(groups = { "serialization-hostile" })
testCountTakeWithCount()91     public void testCountTakeWithCount() {
92         testTakeWhileMulti(
93                 s -> {
94                     AtomicLong c = new AtomicLong();
95                     long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
96                             .mapToLong(e -> 1).reduce(0, Long::sum);
97                     assertTrue(rc <= c.get());
98                 },
99                 s -> {
100                     AtomicLong c = new AtomicLong();
101                     long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
102                             .mapToLong(e -> 1).reduce(0, Long::sum);
103                     assertTrue(rc <= c.get());
104                 },
105                 s -> {
106                     AtomicLong c = new AtomicLong();
107                     long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
108                             .map(e -> 1).reduce(0, Long::sum);
109                     assertTrue(rc <= c.get());
110                 },
111                 s -> {
112                     AtomicLong c = new AtomicLong();
113                     long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
114                             .mapToLong(e -> 1).reduce(0, Long::sum);
115                     assertTrue(rc <= c.get());
116                 });
117     }
118 
119     @Test(groups = { "serialization-hostile" })
testCountTakeWithToArray()120     public void testCountTakeWithToArray() {
121         testTakeWhileMulti(
122                 s -> {
123                     AtomicLong c = new AtomicLong();
124                     Object[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
125                             .toArray();
126                     assertTrue(ra.length <= c.get());
127                 },
128                 s -> {
129                     AtomicLong c = new AtomicLong();
130                     int[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
131                             .toArray();
132                     assertTrue(ra.length <= c.get());
133                 },
134                 s -> {
135                     AtomicLong c = new AtomicLong();
136                     long[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
137                             .toArray();
138                     assertTrue(ra.length <= c.get());
139                 },
140                 s -> {
141                     AtomicLong c = new AtomicLong();
142                     double[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT)
143                             .toArray();
144                     assertTrue(ra.length <= c.get());
145                 });
146     }
147 
148 
149     @Test(groups = { "serialization-hostile" })
testCountDropWithCount()150     public void testCountDropWithCount() {
151         testDropWhileMulti(
152                 s -> {
153                     AtomicLong c = new AtomicLong();
154                     long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
155                             .mapToLong(e -> 1).reduce(0, Long::sum);
156                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
157                     assertTrue(rc <= DROP_SOURCE_SIZE);
158                 },
159                 s -> {
160                     AtomicLong c = new AtomicLong();
161                     long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
162                             .mapToLong(e -> 1).reduce(0, Long::sum);
163                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
164                     assertTrue(rc <= DROP_SOURCE_SIZE);
165                 },
166                 s -> {
167                     AtomicLong c = new AtomicLong();
168                     long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
169                             .map(e -> 1).reduce(0, Long::sum);
170                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
171                     assertTrue(rc <= DROP_SOURCE_SIZE);
172                 },
173                 s -> {
174                     AtomicLong c = new AtomicLong();
175                     long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
176                             .mapToLong(e -> 1).reduce(0, Long::sum);
177                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
178                     assertTrue(rc <= DROP_SOURCE_SIZE);
179                 });
180     }
181 
182     @Test(groups = { "serialization-hostile" })
testCountDropWithToArray()183     public void testCountDropWithToArray() {
184         testDropWhileMulti(
185                 s -> {
186                     AtomicLong c = new AtomicLong();
187                     Object[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
188                             .toArray();
189                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
190                     assertTrue(ra.length <= DROP_SOURCE_SIZE);
191                 },
192                 s -> {
193                     AtomicLong c = new AtomicLong();
194                     int[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
195                             .toArray();
196                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
197                     assertTrue(ra.length <= DROP_SOURCE_SIZE);
198                 },
199                 s -> {
200                     AtomicLong c = new AtomicLong();
201                     long[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
202                             .toArray();
203                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
204                     assertTrue(ra.length <= DROP_SOURCE_SIZE);
205                 },
206                 s -> {
207                     AtomicLong c = new AtomicLong();
208                     double[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT)
209                             .toArray();
210                     assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT);
211                     assertTrue(ra.length <= DROP_SOURCE_SIZE);
212                 });
213     }
214 
215 
testTakeWhileMulti(Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)216     private void testTakeWhileMulti(Consumer<Stream<Integer>> mRef,
217                                     Consumer<IntStream> mInt,
218                                     Consumer<LongStream> mLong,
219                                     Consumer<DoubleStream> mDouble) {
220         Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
221         sources.put("Stream.generate()", () -> Stream.generate(() -> 1));
222         sources.put("Stream.iterate()", () -> Stream.iterate(1, x -> 1));
223         sources.put("Stream.iterate().unordered()", () -> Stream.iterate(1, x -> 1));
224         testWhileMulti(sources, mRef, mInt, mLong, mDouble);
225     }
226 
testDropWhileMulti(Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)227     private void testDropWhileMulti(Consumer<Stream<Integer>> mRef,
228                                     Consumer<IntStream> mInt,
229                                     Consumer<LongStream> mLong,
230                                     Consumer<DoubleStream> mDouble) {
231         Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>();
232         sources.put("IntStream.range().boxed()",
233                     () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed());
234         sources.put("IntStream.range().boxed().unordered()",
235                     () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed().unordered());
236         sources.put("LinkedList.stream()",
237                     () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
238                             .collect(toCollection(LinkedList::new))
239                             .stream());
240         sources.put("LinkedList.stream().unordered()",
241                     () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()
242                             .collect(toCollection(LinkedList::new))
243                             .stream()
244                             .unordered());
245         testWhileMulti(sources, mRef, mInt, mLong, mDouble);
246     }
247 
testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources, Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)248     private void testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources,
249                                 Consumer<Stream<Integer>> mRef,
250                                 Consumer<IntStream> mInt,
251                                 Consumer<LongStream> mLong,
252                                 Consumer<DoubleStream> mDouble) {
253         Map<String, Function<Stream<Integer>, Stream<Integer>>> transforms = new HashMap<>();
254         transforms.put("Stream.sequential()", s -> {
255             BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
256                                                              EXECUTION_TIME_LIMIT);
257             return s.peek(e -> {
258                 if (!isWithinExecutionPeriod.getAsBoolean()) {
259                     throw new RuntimeException();
260                 }
261             });
262         });
263         transforms.put("Stream.parallel()", s -> {
264             BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(),
265                                                              EXECUTION_TIME_LIMIT);
266             return s.parallel()
267                     .peek(e -> {
268                         if (!isWithinExecutionPeriod.getAsBoolean()) {
269                             throw new RuntimeException();
270                         }
271                     });
272         });
273 
274         Map<String, Consumer<Stream<Integer>>> actions = new HashMap<>();
275         actions.put("Ref", mRef);
276         actions.put("Int", s -> mInt.accept(s.mapToInt(e -> e)));
277         actions.put("Long", s -> mLong.accept(s.mapToLong(e -> e)));
278         actions.put("Double", s -> mDouble.accept(s.mapToDouble(e -> e)));
279         actions.put("Ref using defaults", s -> mRef.accept(DefaultMethodStreams.delegateTo(s)));
280         actions.put("Int using defaults", s -> mInt.accept(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e))));
281         actions.put("Long using defaults", s -> mLong.accept(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e))));
282         actions.put("Double using defaults", s -> mDouble.accept(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e))));
283 
284         for (Map.Entry<String, Supplier<Stream<Integer>>> s : sources.entrySet()) {
285             setContext("source", s.getKey());
286 
287             for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> t : transforms.entrySet()) {
288                 setContext("transform", t.getKey());
289 
290                 for (Map.Entry<String, Consumer<Stream<Integer>>> a : actions.entrySet()) {
291                     setContext("shape", a.getKey());
292 
293                     Stream<Integer> stream = s.getValue().get();
294                     stream = t.getValue().apply(stream);
295                     a.getValue().accept(stream);
296                 }
297             }
298         }
299     }
300 
within(long start, long durationInMillis)301     static BooleanSupplier within(long start, long durationInMillis) {
302         return () -> (System.currentTimeMillis() - start) < durationInMillis;
303     }
304 }
305