1 /* 2 * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 package org.openjdk.tests.java.util.stream; 24 25 import org.testng.annotations.Test; 26 27 import java.util.HashMap; 28 import java.util.LinkedList; 29 import java.util.Map; 30 import java.util.concurrent.TimeUnit; 31 import java.util.concurrent.atomic.AtomicLong; 32 import java.util.function.BooleanSupplier; 33 import java.util.function.Consumer; 34 import java.util.function.Function; 35 import java.util.function.Supplier; 36 import java.util.stream.DefaultMethodStreams; 37 import java.util.stream.DoubleStream; 38 import java.util.stream.IntStream; 39 import java.util.stream.LongStream; 40 import java.util.stream.OpTestCase; 41 import java.util.stream.Stream; 42 43 import static java.util.stream.Collectors.toCollection; 44 45 /* 46 * @test 47 * @bug 8071597 48 */ 49 @Test 50 public class WhileOpStatefulTest extends OpTestCase { 51 static final long COUNT_PERIOD = 100; 52 53 static final long EXECUTION_TIME_LIMIT = TimeUnit.SECONDS.toMillis(10); 54 55 static final long TAKE_WHILE_COUNT_LIMIT = 100_000; 56 57 static final int DROP_SOURCE_SIZE = 10_000; 58 59 static final long DROP_WHILE_COUNT_LIMIT = 5000; 60 61 @Test testTimedTakeWithCount()62 public void testTimedTakeWithCount() { 63 testTakeWhileMulti( 64 s -> { 65 BooleanSupplier isWithinTakePeriod = 66 within(System.currentTimeMillis(), COUNT_PERIOD); 67 s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) 68 .mapToLong(e -> 1).reduce(0, Long::sum); 69 }, 70 s -> { 71 BooleanSupplier isWithinTakePeriod = 72 within(System.currentTimeMillis(), COUNT_PERIOD); 73 s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) 74 .mapToLong(e -> 1).reduce(0, Long::sum); 75 }, 76 s -> { 77 BooleanSupplier isWithinTakePeriod = 78 within(System.currentTimeMillis(), COUNT_PERIOD); 79 s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) 80 .map(e -> 1).reduce(0, Long::sum); 81 }, 82 s -> { 83 BooleanSupplier isWithinTakePeriod = 84 within(System.currentTimeMillis(), COUNT_PERIOD); 85 s.takeWhile(e -> isWithinTakePeriod.getAsBoolean()) 86 .mapToLong(e -> 1).reduce(0, Long::sum); 87 }); 88 } 89 90 @Test(groups = { "serialization-hostile" }) testCountTakeWithCount()91 public void testCountTakeWithCount() { 92 testTakeWhileMulti( 93 s -> { 94 AtomicLong c = new AtomicLong(); 95 long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 96 .mapToLong(e -> 1).reduce(0, Long::sum); 97 assertTrue(rc <= c.get()); 98 }, 99 s -> { 100 AtomicLong c = new AtomicLong(); 101 long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 102 .mapToLong(e -> 1).reduce(0, Long::sum); 103 assertTrue(rc <= c.get()); 104 }, 105 s -> { 106 AtomicLong c = new AtomicLong(); 107 long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 108 .map(e -> 1).reduce(0, Long::sum); 109 assertTrue(rc <= c.get()); 110 }, 111 s -> { 112 AtomicLong c = new AtomicLong(); 113 long rc = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 114 .mapToLong(e -> 1).reduce(0, Long::sum); 115 assertTrue(rc <= c.get()); 116 }); 117 } 118 119 @Test(groups = { "serialization-hostile" }) testCountTakeWithToArray()120 public void testCountTakeWithToArray() { 121 testTakeWhileMulti( 122 s -> { 123 AtomicLong c = new AtomicLong(); 124 Object[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 125 .toArray(); 126 assertTrue(ra.length <= c.get()); 127 }, 128 s -> { 129 AtomicLong c = new AtomicLong(); 130 int[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 131 .toArray(); 132 assertTrue(ra.length <= c.get()); 133 }, 134 s -> { 135 AtomicLong c = new AtomicLong(); 136 long[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 137 .toArray(); 138 assertTrue(ra.length <= c.get()); 139 }, 140 s -> { 141 AtomicLong c = new AtomicLong(); 142 double[] ra = s.takeWhile(e -> c.getAndIncrement() < TAKE_WHILE_COUNT_LIMIT) 143 .toArray(); 144 assertTrue(ra.length <= c.get()); 145 }); 146 } 147 148 149 @Test(groups = { "serialization-hostile" }) testCountDropWithCount()150 public void testCountDropWithCount() { 151 testDropWhileMulti( 152 s -> { 153 AtomicLong c = new AtomicLong(); 154 long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 155 .mapToLong(e -> 1).reduce(0, Long::sum); 156 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 157 assertTrue(rc <= DROP_SOURCE_SIZE); 158 }, 159 s -> { 160 AtomicLong c = new AtomicLong(); 161 long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 162 .mapToLong(e -> 1).reduce(0, Long::sum); 163 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 164 assertTrue(rc <= DROP_SOURCE_SIZE); 165 }, 166 s -> { 167 AtomicLong c = new AtomicLong(); 168 long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 169 .map(e -> 1).reduce(0, Long::sum); 170 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 171 assertTrue(rc <= DROP_SOURCE_SIZE); 172 }, 173 s -> { 174 AtomicLong c = new AtomicLong(); 175 long rc = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 176 .mapToLong(e -> 1).reduce(0, Long::sum); 177 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 178 assertTrue(rc <= DROP_SOURCE_SIZE); 179 }); 180 } 181 182 @Test(groups = { "serialization-hostile" }) testCountDropWithToArray()183 public void testCountDropWithToArray() { 184 testDropWhileMulti( 185 s -> { 186 AtomicLong c = new AtomicLong(); 187 Object[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 188 .toArray(); 189 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 190 assertTrue(ra.length <= DROP_SOURCE_SIZE); 191 }, 192 s -> { 193 AtomicLong c = new AtomicLong(); 194 int[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 195 .toArray(); 196 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 197 assertTrue(ra.length <= DROP_SOURCE_SIZE); 198 }, 199 s -> { 200 AtomicLong c = new AtomicLong(); 201 long[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 202 .toArray(); 203 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 204 assertTrue(ra.length <= DROP_SOURCE_SIZE); 205 }, 206 s -> { 207 AtomicLong c = new AtomicLong(); 208 double[] ra = s.dropWhile(e -> c.getAndIncrement() < DROP_WHILE_COUNT_LIMIT) 209 .toArray(); 210 assertTrue(c.get() >= DROP_WHILE_COUNT_LIMIT); 211 assertTrue(ra.length <= DROP_SOURCE_SIZE); 212 }); 213 } 214 215 testTakeWhileMulti(Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)216 private void testTakeWhileMulti(Consumer<Stream<Integer>> mRef, 217 Consumer<IntStream> mInt, 218 Consumer<LongStream> mLong, 219 Consumer<DoubleStream> mDouble) { 220 Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>(); 221 sources.put("Stream.generate()", () -> Stream.generate(() -> 1)); 222 sources.put("Stream.iterate()", () -> Stream.iterate(1, x -> 1)); 223 sources.put("Stream.iterate().unordered()", () -> Stream.iterate(1, x -> 1)); 224 testWhileMulti(sources, mRef, mInt, mLong, mDouble); 225 } 226 testDropWhileMulti(Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)227 private void testDropWhileMulti(Consumer<Stream<Integer>> mRef, 228 Consumer<IntStream> mInt, 229 Consumer<LongStream> mLong, 230 Consumer<DoubleStream> mDouble) { 231 Map<String, Supplier<Stream<Integer>>> sources = new HashMap<>(); 232 sources.put("IntStream.range().boxed()", 233 () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed()); 234 sources.put("IntStream.range().boxed().unordered()", 235 () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed().unordered()); 236 sources.put("LinkedList.stream()", 237 () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed() 238 .collect(toCollection(LinkedList::new)) 239 .stream()); 240 sources.put("LinkedList.stream().unordered()", 241 () -> IntStream.range(0, DROP_SOURCE_SIZE).boxed() 242 .collect(toCollection(LinkedList::new)) 243 .stream() 244 .unordered()); 245 testWhileMulti(sources, mRef, mInt, mLong, mDouble); 246 } 247 testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources, Consumer<Stream<Integer>> mRef, Consumer<IntStream> mInt, Consumer<LongStream> mLong, Consumer<DoubleStream> mDouble)248 private void testWhileMulti(Map<String, Supplier<Stream<Integer>>> sources, 249 Consumer<Stream<Integer>> mRef, 250 Consumer<IntStream> mInt, 251 Consumer<LongStream> mLong, 252 Consumer<DoubleStream> mDouble) { 253 Map<String, Function<Stream<Integer>, Stream<Integer>>> transforms = new HashMap<>(); 254 transforms.put("Stream.sequential()", s -> { 255 BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(), 256 EXECUTION_TIME_LIMIT); 257 return s.peek(e -> { 258 if (!isWithinExecutionPeriod.getAsBoolean()) { 259 throw new RuntimeException(); 260 } 261 }); 262 }); 263 transforms.put("Stream.parallel()", s -> { 264 BooleanSupplier isWithinExecutionPeriod = within(System.currentTimeMillis(), 265 EXECUTION_TIME_LIMIT); 266 return s.parallel() 267 .peek(e -> { 268 if (!isWithinExecutionPeriod.getAsBoolean()) { 269 throw new RuntimeException(); 270 } 271 }); 272 }); 273 274 Map<String, Consumer<Stream<Integer>>> actions = new HashMap<>(); 275 actions.put("Ref", mRef); 276 actions.put("Int", s -> mInt.accept(s.mapToInt(e -> e))); 277 actions.put("Long", s -> mLong.accept(s.mapToLong(e -> e))); 278 actions.put("Double", s -> mDouble.accept(s.mapToDouble(e -> e))); 279 actions.put("Ref using defaults", s -> mRef.accept(DefaultMethodStreams.delegateTo(s))); 280 actions.put("Int using defaults", s -> mInt.accept(DefaultMethodStreams.delegateTo(s.mapToInt(e -> e)))); 281 actions.put("Long using defaults", s -> mLong.accept(DefaultMethodStreams.delegateTo(s.mapToLong(e -> e)))); 282 actions.put("Double using defaults", s -> mDouble.accept(DefaultMethodStreams.delegateTo(s.mapToDouble(e -> e)))); 283 284 for (Map.Entry<String, Supplier<Stream<Integer>>> s : sources.entrySet()) { 285 setContext("source", s.getKey()); 286 287 for (Map.Entry<String, Function<Stream<Integer>, Stream<Integer>>> t : transforms.entrySet()) { 288 setContext("transform", t.getKey()); 289 290 for (Map.Entry<String, Consumer<Stream<Integer>>> a : actions.entrySet()) { 291 setContext("shape", a.getKey()); 292 293 Stream<Integer> stream = s.getValue().get(); 294 stream = t.getValue().apply(stream); 295 a.getValue().accept(stream); 296 } 297 } 298 } 299 } 300 within(long start, long durationInMillis)301 static BooleanSupplier within(long start, long durationInMillis) { 302 return () -> (System.currentTimeMillis() - start) < durationInMillis; 303 } 304 } 305