1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 
26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 import static java.util.concurrent.TimeUnit.MILLISECONDS; 35 import static java.util.concurrent.TimeUnit.NANOSECONDS; 36 import static java.util.concurrent.TimeUnit.SECONDS; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Collections; 41 import java.util.HashSet; 42 import java.util.List; 43 import java.util.concurrent.BlockingQueue; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.CancellationException; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Delayed; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.RejectedExecutionException; 52 import java.util.concurrent.RejectedExecutionHandler; 53 import java.util.concurrent.RunnableScheduledFuture; 54 import java.util.concurrent.ScheduledFuture; 55 import java.util.concurrent.ScheduledThreadPoolExecutor; 56 import java.util.concurrent.ThreadFactory; 57 import java.util.concurrent.ThreadLocalRandom; 58 import java.util.concurrent.ThreadPoolExecutor; 59 import java.util.concurrent.TimeoutException; 60 import java.util.concurrent.TimeUnit; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicLong; 64 import java.util.stream.Stream; 65 66 import junit.framework.Test; 67 import junit.framework.TestSuite; 68 69 public class ScheduledExecutorSubclassTest extends JSR166TestCase { main(String[] args)70 public static void main(String[] args) { 71 main(suite(), args); 72 } suite()73 public static Test suite() { 74 return new 
TestSuite(ScheduledExecutorSubclassTest.class); 75 } 76 77 static class CustomTask<V> implements RunnableScheduledFuture<V> { 78 private final RunnableScheduledFuture<V> task; 79 volatile boolean ran; CustomTask(RunnableScheduledFuture<V> task)80 CustomTask(RunnableScheduledFuture<V> task) { this.task = task; } isPeriodic()81 public boolean isPeriodic() { return task.isPeriodic(); } run()82 public void run() { 83 ran = true; 84 task.run(); 85 } getDelay(TimeUnit unit)86 public long getDelay(TimeUnit unit) { return task.getDelay(unit); } compareTo(Delayed t)87 public int compareTo(Delayed t) { 88 return task.compareTo(((CustomTask)t).task); 89 } cancel(boolean mayInterruptIfRunning)90 public boolean cancel(boolean mayInterruptIfRunning) { 91 return task.cancel(mayInterruptIfRunning); 92 } isCancelled()93 public boolean isCancelled() { return task.isCancelled(); } isDone()94 public boolean isDone() { return task.isDone(); } get()95 public V get() throws InterruptedException, ExecutionException { 96 V v = task.get(); 97 assertTrue(ran); 98 return v; 99 } get(long time, TimeUnit unit)100 public V get(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 101 V v = task.get(time, unit); 102 assertTrue(ran); 103 return v; 104 } 105 } 106 107 public class CustomExecutor extends ScheduledThreadPoolExecutor { 108 decorateTask(Runnable r, RunnableScheduledFuture<V> task)109 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable r, RunnableScheduledFuture<V> task) { 110 return new CustomTask<V>(task); 111 } 112 decorateTask(Callable<V> c, RunnableScheduledFuture<V> task)113 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> c, RunnableScheduledFuture<V> task) { 114 return new CustomTask<V>(task); 115 } CustomExecutor(int corePoolSize)116 CustomExecutor(int corePoolSize) { super(corePoolSize); } CustomExecutor(int corePoolSize, RejectedExecutionHandler handler)117 CustomExecutor(int corePoolSize, 
RejectedExecutionHandler handler) { 118 super(corePoolSize, handler); 119 } 120 CustomExecutor(int corePoolSize, ThreadFactory threadFactory)121 CustomExecutor(int corePoolSize, ThreadFactory threadFactory) { 122 super(corePoolSize, threadFactory); 123 } CustomExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)124 CustomExecutor(int corePoolSize, ThreadFactory threadFactory, 125 RejectedExecutionHandler handler) { 126 super(corePoolSize, threadFactory, handler); 127 } 128 129 } 130 131 /** 132 * execute successfully executes a runnable 133 */ testExecute()134 public void testExecute() throws InterruptedException { 135 final CustomExecutor p = new CustomExecutor(1); 136 try (PoolCleaner cleaner = cleaner(p)) { 137 final CountDownLatch done = new CountDownLatch(1); 138 final Runnable task = new CheckedRunnable() { 139 public void realRun() { done.countDown(); }}; 140 p.execute(task); 141 await(done); 142 } 143 } 144 145 /** 146 * delayed schedule of callable successfully executes after delay 147 */ testSchedule1()148 public void testSchedule1() throws Exception { 149 final CountDownLatch done = new CountDownLatch(1); 150 final CustomExecutor p = new CustomExecutor(1); 151 try (PoolCleaner cleaner = cleaner(p, done)) { 152 final long startTime = System.nanoTime(); 153 Callable<Boolean> task = new CheckedCallable<>() { 154 public Boolean realCall() { 155 done.countDown(); 156 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 157 return Boolean.TRUE; 158 }}; 159 Future<Boolean> f = p.schedule(task, timeoutMillis(), MILLISECONDS); 160 assertSame(Boolean.TRUE, f.get()); 161 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 162 } 163 } 164 165 /** 166 * delayed schedule of runnable successfully executes after delay 167 */ testSchedule3()168 public void testSchedule3() throws Exception { 169 final CustomExecutor p = new CustomExecutor(1); 170 try (PoolCleaner cleaner = cleaner(p)) { 171 final long startTime = 
System.nanoTime(); 172 final CountDownLatch done = new CountDownLatch(1); 173 Runnable task = new CheckedRunnable() { 174 public void realRun() { 175 done.countDown(); 176 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 177 }}; 178 Future<?> f = p.schedule(task, timeoutMillis(), MILLISECONDS); 179 await(done); 180 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS)); 181 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 182 } 183 } 184 185 /** 186 * scheduleAtFixedRate executes runnable after given initial delay 187 */ testSchedule4()188 public void testSchedule4() throws InterruptedException { 189 final CustomExecutor p = new CustomExecutor(1); 190 try (PoolCleaner cleaner = cleaner(p)) { 191 final long startTime = System.nanoTime(); 192 final CountDownLatch done = new CountDownLatch(1); 193 Runnable task = new CheckedRunnable() { 194 public void realRun() { 195 done.countDown(); 196 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 197 }}; 198 ScheduledFuture<?> f = 199 p.scheduleAtFixedRate(task, timeoutMillis(), 200 LONG_DELAY_MS, MILLISECONDS); 201 await(done); 202 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 203 f.cancel(true); 204 } 205 } 206 207 /** 208 * scheduleWithFixedDelay executes runnable after given initial delay 209 */ testSchedule5()210 public void testSchedule5() throws InterruptedException { 211 final CustomExecutor p = new CustomExecutor(1); 212 try (PoolCleaner cleaner = cleaner(p)) { 213 final long startTime = System.nanoTime(); 214 final CountDownLatch done = new CountDownLatch(1); 215 Runnable task = new CheckedRunnable() { 216 public void realRun() { 217 done.countDown(); 218 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 219 }}; 220 ScheduledFuture<?> f = 221 p.scheduleWithFixedDelay(task, timeoutMillis(), 222 LONG_DELAY_MS, MILLISECONDS); 223 await(done); 224 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 225 f.cancel(true); 226 } 227 } 228 229 static class 
RunnableCounter implements Runnable { 230 AtomicInteger count = new AtomicInteger(0); run()231 public void run() { count.getAndIncrement(); } 232 } 233 234 /** 235 * scheduleAtFixedRate executes series of tasks at given rate. 236 * Eventually, it must hold that: 237 * cycles - 1 <= elapsedMillis/delay < cycles 238 */ testFixedRateSequence()239 public void testFixedRateSequence() throws InterruptedException { 240 final CustomExecutor p = new CustomExecutor(1); 241 try (PoolCleaner cleaner = cleaner(p)) { 242 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { 243 final long startTime = System.nanoTime(); 244 final int cycles = 8; 245 final CountDownLatch done = new CountDownLatch(cycles); 246 final Runnable task = new CheckedRunnable() { 247 public void realRun() { done.countDown(); }}; 248 final ScheduledFuture<?> periodicTask = 249 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS); 250 final int totalDelayMillis = (cycles - 1) * delay; 251 await(done, totalDelayMillis + LONG_DELAY_MS); 252 periodicTask.cancel(true); 253 final long elapsedMillis = millisElapsedSince(startTime); 254 assertTrue(elapsedMillis >= totalDelayMillis); 255 if (elapsedMillis <= cycles * delay) 256 return; 257 // else retry with longer delay 258 } 259 fail("unexpected execution rate"); 260 } 261 } 262 263 /** 264 * scheduleWithFixedDelay executes series of tasks with given period. 265 * Eventually, it must hold that each task starts at least delay and at 266 * most 2 * delay after the termination of the previous task. 
/**
 * scheduleWithFixedDelay executes series of tasks with given period.
 * Eventually, it must hold that each task starts at least delay and at
 * most 2 * delay after the termination of the previous task.
 *
 * The delay starts at 1ms and is tripled after every attempt in which
 * some gap was too long, until an attempt succeeds or the delay would
 * exceed LONG_DELAY_MS, in which case the test fails.
 */
public void testFixedDelaySequence() throws InterruptedException {
    final CustomExecutor p = new CustomExecutor(1);
    try (PoolCleaner cleaner = cleaner(p)) {
        for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) {
            final long startTime = System.nanoTime();
            // Timestamp (nanoTime) taken at the start of the previous run;
            // seeded with the scheduling time for the first run.
            final AtomicLong previous = new AtomicLong(startTime);
            // Set by the task when a gap exceeded the upper bound.
            final AtomicBoolean tryLongerDelay = new AtomicBoolean(false);
            final int cycles = 8;
            final CountDownLatch done = new CountDownLatch(cycles);
            // Effectively-final copy of the loop variable for the inner class.
            final int d = delay;
            final Runnable task = new CheckedRunnable() {
                public void realRun() {
                    // Read the clock first so the checks below do not
                    // inflate the measured gap.
                    long now = System.nanoTime();
                    long elapsedMillis
                        = NANOSECONDS.toMillis(now - previous.get());
                    if (done.getCount() == cycles) { // first execution
                        // Initial delay is 0, so only an upper bound applies.
                        if (elapsedMillis >= d)
                            tryLongerDelay.set(true);
                    } else {
                        // Lower bound is a hard failure; exceeding the upper
                        // bound only triggers a retry with a longer delay.
                        assertTrue(elapsedMillis >= d);
                        if (elapsedMillis >= 2 * d)
                            tryLongerDelay.set(true);
                    }
                    previous.set(now);
                    done.countDown();
                }};
            final ScheduledFuture<?> periodicTask =
                p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS);
            // cycles runs are separated by cycles - 1 delays.
            final int totalDelayMillis = (cycles - 1) * delay;
            await(done, totalDelayMillis + cycles * LONG_DELAY_MS);
            periodicTask.cancel(true);
            final long elapsedMillis = millisElapsedSince(startTime);
            assertTrue(elapsedMillis >= totalDelayMillis);
            if (!tryLongerDelay.get())
                return;
            // else retry with longer delay
        }
        fail("unexpected execution rate");
    }
}
InterruptedException { 323 final CustomExecutor p = new CustomExecutor(2); 324 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 325 final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize()); 326 final CountDownLatch done = new CountDownLatch(1); 327 final Runnable r = () -> { 328 threadsStarted.countDown(); 329 for (;;) { 330 try { 331 done.await(); 332 return; 333 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 334 }}; 335 final Callable<Boolean> c = () -> { 336 threadsStarted.countDown(); 337 for (;;) { 338 try { 339 done.await(); 340 return Boolean.TRUE; 341 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 342 }}; 343 344 try (PoolCleaner cleaner = cleaner(p, done)) { 345 for (int i = p.getCorePoolSize(); i--> 0; ) { 346 switch (rnd.nextInt(4)) { 347 case 0: p.execute(r); break; 348 case 1: assertFalse(p.submit(r).isDone()); break; 349 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break; 350 case 3: assertFalse(p.submit(c).isDone()); break; 351 } 352 } 353 354 // ScheduledThreadPoolExecutor has an unbounded queue, so never saturated. 
355 await(threadsStarted); 356 357 if (rnd.nextBoolean()) 358 p.shutdownNow(); 359 else 360 p.shutdown(); 361 // Pool is shutdown, but not yet terminated 362 assertTaskSubmissionsAreRejected(p); 363 assertFalse(p.isTerminated()); 364 365 done.countDown(); // release blocking tasks 366 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 367 368 assertTaskSubmissionsAreRejected(p); 369 } 370 assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount()); 371 } 372 373 /** 374 * getActiveCount increases but doesn't overestimate, when a 375 * thread becomes active 376 */ testGetActiveCount()377 public void testGetActiveCount() throws InterruptedException { 378 final CountDownLatch done = new CountDownLatch(1); 379 final ThreadPoolExecutor p = new CustomExecutor(2); 380 try (PoolCleaner cleaner = cleaner(p, done)) { 381 final CountDownLatch threadStarted = new CountDownLatch(1); 382 assertEquals(0, p.getActiveCount()); 383 p.execute(new CheckedRunnable() { 384 public void realRun() throws InterruptedException { 385 threadStarted.countDown(); 386 assertEquals(1, p.getActiveCount()); 387 await(done); 388 }}); 389 await(threadStarted); 390 assertEquals(1, p.getActiveCount()); 391 } 392 } 393 394 /** 395 * getCompletedTaskCount increases, but doesn't overestimate, 396 * when tasks complete 397 */ testGetCompletedTaskCount()398 public void testGetCompletedTaskCount() throws InterruptedException { 399 final ThreadPoolExecutor p = new CustomExecutor(2); 400 try (PoolCleaner cleaner = cleaner(p)) { 401 final CountDownLatch threadStarted = new CountDownLatch(1); 402 final CountDownLatch threadProceed = new CountDownLatch(1); 403 final CountDownLatch threadDone = new CountDownLatch(1); 404 assertEquals(0, p.getCompletedTaskCount()); 405 p.execute(new CheckedRunnable() { 406 public void realRun() throws InterruptedException { 407 threadStarted.countDown(); 408 assertEquals(0, p.getCompletedTaskCount()); 409 await(threadProceed); 410 threadDone.countDown(); 411 }}); 412 
await(threadStarted); 413 assertEquals(0, p.getCompletedTaskCount()); 414 threadProceed.countDown(); 415 await(threadDone); 416 long startTime = System.nanoTime(); 417 while (p.getCompletedTaskCount() != 1) { 418 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 419 fail("timed out"); 420 Thread.yield(); 421 } 422 } 423 } 424 425 /** 426 * getCorePoolSize returns size given in constructor if not otherwise set 427 */ testGetCorePoolSize()428 public void testGetCorePoolSize() { 429 final CustomExecutor p = new CustomExecutor(1); 430 try (PoolCleaner cleaner = cleaner(p)) { 431 assertEquals(1, p.getCorePoolSize()); 432 } 433 } 434 435 /** 436 * getLargestPoolSize increases, but doesn't overestimate, when 437 * multiple threads active 438 */ testGetLargestPoolSize()439 public void testGetLargestPoolSize() throws InterruptedException { 440 final int THREADS = 3; 441 final CountDownLatch done = new CountDownLatch(1); 442 final ThreadPoolExecutor p = new CustomExecutor(THREADS); 443 try (PoolCleaner cleaner = cleaner(p, done)) { 444 final CountDownLatch threadsStarted = new CountDownLatch(THREADS); 445 assertEquals(0, p.getLargestPoolSize()); 446 for (int i = 0; i < THREADS; i++) 447 p.execute(new CheckedRunnable() { 448 public void realRun() throws InterruptedException { 449 threadsStarted.countDown(); 450 await(done); 451 assertEquals(THREADS, p.getLargestPoolSize()); 452 }}); 453 await(threadsStarted); 454 assertEquals(THREADS, p.getLargestPoolSize()); 455 } 456 assertEquals(THREADS, p.getLargestPoolSize()); 457 } 458 459 /** 460 * getPoolSize increases, but doesn't overestimate, when threads 461 * become active 462 */ testGetPoolSize()463 public void testGetPoolSize() throws InterruptedException { 464 final CountDownLatch done = new CountDownLatch(1); 465 final ThreadPoolExecutor p = new CustomExecutor(1); 466 try (PoolCleaner cleaner = cleaner(p, done)) { 467 final CountDownLatch threadStarted = new CountDownLatch(1); 468 assertEquals(0, p.getPoolSize()); 469 
p.execute(new CheckedRunnable() { 470 public void realRun() throws InterruptedException { 471 threadStarted.countDown(); 472 assertEquals(1, p.getPoolSize()); 473 await(done); 474 }}); 475 await(threadStarted); 476 assertEquals(1, p.getPoolSize()); 477 } 478 } 479 480 /** 481 * getTaskCount increases, but doesn't overestimate, when tasks 482 * submitted 483 */ testGetTaskCount()484 public void testGetTaskCount() throws InterruptedException { 485 final int TASKS = 3; 486 final CountDownLatch done = new CountDownLatch(1); 487 final ThreadPoolExecutor p = new CustomExecutor(1); 488 try (PoolCleaner cleaner = cleaner(p, done)) { 489 final CountDownLatch threadStarted = new CountDownLatch(1); 490 assertEquals(0, p.getTaskCount()); 491 assertEquals(0, p.getCompletedTaskCount()); 492 p.execute(new CheckedRunnable() { 493 public void realRun() throws InterruptedException { 494 threadStarted.countDown(); 495 await(done); 496 }}); 497 await(threadStarted); 498 assertEquals(1, p.getTaskCount()); 499 assertEquals(0, p.getCompletedTaskCount()); 500 for (int i = 0; i < TASKS; i++) { 501 assertEquals(1 + i, p.getTaskCount()); 502 p.execute(new CheckedRunnable() { 503 public void realRun() throws InterruptedException { 504 threadStarted.countDown(); 505 assertEquals(1 + TASKS, p.getTaskCount()); 506 await(done); 507 }}); 508 } 509 assertEquals(1 + TASKS, p.getTaskCount()); 510 assertEquals(0, p.getCompletedTaskCount()); 511 } 512 assertEquals(1 + TASKS, p.getTaskCount()); 513 assertEquals(1 + TASKS, p.getCompletedTaskCount()); 514 } 515 516 /** 517 * getThreadFactory returns factory in constructor if not set 518 */ testGetThreadFactory()519 public void testGetThreadFactory() { 520 final ThreadFactory threadFactory = new SimpleThreadFactory(); 521 final CustomExecutor p = new CustomExecutor(1, threadFactory); 522 try (PoolCleaner cleaner = cleaner(p)) { 523 assertSame(threadFactory, p.getThreadFactory()); 524 } 525 } 526 527 /** 528 * setThreadFactory sets the thread factory 
returned by getThreadFactory 529 */ testSetThreadFactory()530 public void testSetThreadFactory() { 531 final ThreadFactory threadFactory = new SimpleThreadFactory(); 532 final CustomExecutor p = new CustomExecutor(1); 533 try (PoolCleaner cleaner = cleaner(p)) { 534 p.setThreadFactory(threadFactory); 535 assertSame(threadFactory, p.getThreadFactory()); 536 } 537 } 538 539 /** 540 * setThreadFactory(null) throws NPE 541 */ testSetThreadFactoryNull()542 public void testSetThreadFactoryNull() { 543 final CustomExecutor p = new CustomExecutor(1); 544 try (PoolCleaner cleaner = cleaner(p)) { 545 try { 546 p.setThreadFactory(null); 547 shouldThrow(); 548 } catch (NullPointerException success) {} 549 } 550 } 551 552 /** 553 * isShutdown is false before shutdown, true after 554 */ testIsShutdown()555 public void testIsShutdown() { 556 final CustomExecutor p = new CustomExecutor(1); 557 try (PoolCleaner cleaner = cleaner(p)) { 558 assertFalse(p.isShutdown()); 559 try { p.shutdown(); } catch (SecurityException ok) { return; } 560 assertTrue(p.isShutdown()); 561 } 562 } 563 564 /** 565 * isTerminated is false before termination, true after 566 */ testIsTerminated()567 public void testIsTerminated() throws InterruptedException { 568 final CountDownLatch done = new CountDownLatch(1); 569 final ThreadPoolExecutor p = new CustomExecutor(1); 570 try (PoolCleaner cleaner = cleaner(p)) { 571 final CountDownLatch threadStarted = new CountDownLatch(1); 572 p.execute(new CheckedRunnable() { 573 public void realRun() throws InterruptedException { 574 assertFalse(p.isTerminated()); 575 threadStarted.countDown(); 576 await(done); 577 }}); 578 await(threadStarted); 579 assertFalse(p.isTerminated()); 580 assertFalse(p.isTerminating()); 581 done.countDown(); 582 try { p.shutdown(); } catch (SecurityException ok) { return; } 583 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 584 assertTrue(p.isTerminated()); 585 } 586 } 587 588 /** 589 * isTerminating is not true when running or 
when terminated 590 */ testIsTerminating()591 public void testIsTerminating() throws InterruptedException { 592 final CountDownLatch done = new CountDownLatch(1); 593 final ThreadPoolExecutor p = new CustomExecutor(1); 594 try (PoolCleaner cleaner = cleaner(p)) { 595 final CountDownLatch threadStarted = new CountDownLatch(1); 596 assertFalse(p.isTerminating()); 597 p.execute(new CheckedRunnable() { 598 public void realRun() throws InterruptedException { 599 assertFalse(p.isTerminating()); 600 threadStarted.countDown(); 601 await(done); 602 }}); 603 await(threadStarted); 604 assertFalse(p.isTerminating()); 605 done.countDown(); 606 try { p.shutdown(); } catch (SecurityException ok) { return; } 607 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 608 assertTrue(p.isTerminated()); 609 assertFalse(p.isTerminating()); 610 } 611 } 612 613 /** 614 * getQueue returns the work queue, which contains queued tasks 615 */ testGetQueue()616 public void testGetQueue() throws InterruptedException { 617 final CountDownLatch done = new CountDownLatch(1); 618 final ScheduledThreadPoolExecutor p = new CustomExecutor(1); 619 try (PoolCleaner cleaner = cleaner(p, done)) { 620 final CountDownLatch threadStarted = new CountDownLatch(1); 621 @SuppressWarnings("unchecked") 622 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5]; 623 for (int i = 0; i < tasks.length; i++) { 624 Runnable r = new CheckedRunnable() { 625 public void realRun() throws InterruptedException { 626 threadStarted.countDown(); 627 await(done); 628 }}; 629 tasks[i] = p.schedule(r, 1, MILLISECONDS); 630 } 631 await(threadStarted); 632 BlockingQueue<Runnable> q = p.getQueue(); 633 assertTrue(q.contains(tasks[tasks.length - 1])); 634 assertFalse(q.contains(tasks[0])); 635 } 636 } 637 638 /** 639 * remove(task) removes queued task, and fails to remove active task 640 */ testRemove()641 public void testRemove() throws InterruptedException { 642 final CountDownLatch done = new 
CountDownLatch(1); 643 final ScheduledThreadPoolExecutor p = new CustomExecutor(1); 644 try (PoolCleaner cleaner = cleaner(p, done)) { 645 @SuppressWarnings("unchecked") 646 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5]; 647 final CountDownLatch threadStarted = new CountDownLatch(1); 648 for (int i = 0; i < tasks.length; i++) { 649 Runnable r = new CheckedRunnable() { 650 public void realRun() throws InterruptedException { 651 threadStarted.countDown(); 652 await(done); 653 }}; 654 tasks[i] = p.schedule(r, 1, MILLISECONDS); 655 } 656 await(threadStarted); 657 BlockingQueue<Runnable> q = p.getQueue(); 658 assertFalse(p.remove((Runnable)tasks[0])); 659 assertTrue(q.contains((Runnable)tasks[4])); 660 assertTrue(q.contains((Runnable)tasks[3])); 661 assertTrue(p.remove((Runnable)tasks[4])); 662 assertFalse(p.remove((Runnable)tasks[4])); 663 assertFalse(q.contains((Runnable)tasks[4])); 664 assertTrue(q.contains((Runnable)tasks[3])); 665 assertTrue(p.remove((Runnable)tasks[3])); 666 assertFalse(q.contains((Runnable)tasks[3])); 667 } 668 } 669 670 /** 671 * purge removes cancelled tasks from the queue 672 */ testPurge()673 public void testPurge() throws InterruptedException { 674 @SuppressWarnings("unchecked") 675 ScheduledFuture<?>[] tasks = (ScheduledFuture<?>[])new ScheduledFuture[5]; 676 final Runnable releaser = new Runnable() { public void run() { 677 for (ScheduledFuture<?> task : tasks) 678 if (task != null) task.cancel(true); }}; 679 final CustomExecutor p = new CustomExecutor(1); 680 try (PoolCleaner cleaner = cleaner(p, releaser)) { 681 for (int i = 0; i < tasks.length; i++) 682 tasks[i] = p.schedule(possiblyInterruptedRunnable(SMALL_DELAY_MS), 683 LONG_DELAY_MS, MILLISECONDS); 684 int max = tasks.length; 685 if (tasks[4].cancel(true)) --max; 686 if (tasks[3].cancel(true)) --max; 687 // There must eventually be an interference-free point at 688 // which purge will not fail. (At worst, when queue is empty.) 
689 long startTime = System.nanoTime(); 690 do { 691 p.purge(); 692 long count = p.getTaskCount(); 693 if (count == max) 694 return; 695 } while (millisElapsedSince(startTime) < LONG_DELAY_MS); 696 fail("Purge failed to remove cancelled tasks"); 697 } 698 } 699 700 /** 701 * shutdownNow returns a list containing tasks that were not run, 702 * and those tasks are drained from the queue 703 */ testShutdownNow()704 public void testShutdownNow() throws InterruptedException { 705 final int poolSize = 2; 706 final int count = 5; 707 final AtomicInteger ran = new AtomicInteger(0); 708 final CustomExecutor p = new CustomExecutor(poolSize); 709 final CountDownLatch threadsStarted = new CountDownLatch(poolSize); 710 Runnable waiter = new CheckedRunnable() { public void realRun() { 711 threadsStarted.countDown(); 712 try { 713 MILLISECONDS.sleep(LONGER_DELAY_MS); 714 } catch (InterruptedException success) {} 715 ran.getAndIncrement(); 716 }}; 717 for (int i = 0; i < count; i++) 718 p.execute(waiter); 719 await(threadsStarted); 720 assertEquals(poolSize, p.getActiveCount()); 721 assertEquals(0, p.getCompletedTaskCount()); 722 final List<Runnable> queuedTasks; 723 try { 724 queuedTasks = p.shutdownNow(); 725 } catch (SecurityException ok) { 726 return; // Allowed in case test doesn't have privs 727 } 728 assertTrue(p.isShutdown()); 729 assertTrue(p.getQueue().isEmpty()); 730 assertEquals(count - poolSize, queuedTasks.size()); 731 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 732 assertTrue(p.isTerminated()); 733 assertEquals(poolSize, ran.get()); 734 assertEquals(poolSize, p.getCompletedTaskCount()); 735 } 736 737 /** 738 * shutdownNow returns a list containing tasks that were not run, 739 * and those tasks are drained from the queue 740 */ testShutdownNow_delayedTasks()741 public void testShutdownNow_delayedTasks() throws InterruptedException { 742 final CustomExecutor p = new CustomExecutor(1); 743 List<ScheduledFuture<?>> tasks = new ArrayList<>(); 744 for 
(int i = 0; i < 3; i++) { 745 Runnable r = new NoOpRunnable(); 746 tasks.add(p.schedule(r, 9, SECONDS)); 747 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS)); 748 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS)); 749 } 750 if (testImplementationDetails) 751 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(p.getQueue())); 752 final List<Runnable> queuedTasks; 753 try { 754 queuedTasks = p.shutdownNow(); 755 } catch (SecurityException ok) { 756 return; // Allowed in case test doesn't have privs 757 } 758 assertTrue(p.isShutdown()); 759 assertTrue(p.getQueue().isEmpty()); 760 if (testImplementationDetails) 761 assertEquals(new HashSet<Object>(tasks), new HashSet<Object>(queuedTasks)); 762 assertEquals(tasks.size(), queuedTasks.size()); 763 for (ScheduledFuture<?> task : tasks) { 764 assertFalse(((CustomTask)task).ran); 765 assertFalse(task.isDone()); 766 assertFalse(task.isCancelled()); 767 } 768 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 769 assertTrue(p.isTerminated()); 770 } 771 772 /** 773 * By default, periodic tasks are cancelled at shutdown. 774 * By default, delayed tasks keep running after shutdown. 
/**
 * By default, periodic tasks are cancelled at shutdown.
 * By default, delayed tasks keep running after shutdown.
 * Check that changing the default values works:
 * - setExecuteExistingDelayedTasksAfterShutdownPolicy
 * - setContinueExistingPeriodicTasksAfterShutdownPolicy
 *
 * The policies, the task delay and the number of periodic rounds are
 * chosen at random on every run, so a single run covers only one
 * combination.
 */
@SuppressWarnings("FutureReturnValueIgnored")
public void testShutdown_cancellation() throws Exception {
    final int poolSize = 4;
    final CustomExecutor p = new CustomExecutor(poolSize);
    final BlockingQueue<Runnable> q = p.getQueue();
    final ThreadLocalRandom rnd = ThreadLocalRandom.current();
    // 0 or 1 (upper bound of nextInt is exclusive).
    final long delay = rnd.nextInt(2);
    // 1 or 2: the invocation of each periodic task that blocks.
    final int rounds = rnd.nextInt(1, 3);
    final boolean effectiveDelayedPolicy;
    final boolean effectivePeriodicPolicy;
    final boolean effectiveRemovePolicy;

    // For each policy: either set it explicitly to a random value, or
    // leave it alone and expect the default; then check the getter.
    if (rnd.nextBoolean())
        p.setExecuteExistingDelayedTasksAfterShutdownPolicy(
            effectiveDelayedPolicy = rnd.nextBoolean());
    else
        effectiveDelayedPolicy = true;
    assertEquals(effectiveDelayedPolicy,
                 p.getExecuteExistingDelayedTasksAfterShutdownPolicy());

    if (rnd.nextBoolean())
        p.setContinueExistingPeriodicTasksAfterShutdownPolicy(
            effectivePeriodicPolicy = rnd.nextBoolean());
    else
        effectivePeriodicPolicy = false;
    assertEquals(effectivePeriodicPolicy,
                 p.getContinueExistingPeriodicTasksAfterShutdownPolicy());

    if (rnd.nextBoolean())
        p.setRemoveOnCancelPolicy(
            effectiveRemovePolicy = rnd.nextBoolean());
    else
        effectiveRemovePolicy = false;
    assertEquals(effectiveRemovePolicy,
                 p.getRemoveOnCancelPolicy());

    // When the policy lets periodic tasks survive shutdown, randomly
    // either let them continue or cancel them by hand further below.
    final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean();

    // Strategy: Wedge the pool with one wave of "blocker" tasks,
    // then add a second wave that waits in the queue until unblocked.
    final AtomicInteger ran = new AtomicInteger(0);
    final CountDownLatch poolBlocked = new CountDownLatch(poolSize);
    final CountDownLatch unblock = new CountDownLatch(1);
    final RuntimeException exception = new RuntimeException();

    // Counts itself in ran, signals poolBlocked, then blocks on unblock.
    class Task implements Runnable {
        public void run() {
            try {
                ran.getAndIncrement();
                poolBlocked.countDown();
                await(unblock);
            } catch (Throwable fail) { threadUnexpectedException(fail); }
        }
    }

    // Behaves like Task on invocation number `rounds`, does nothing on
    // earlier invocations, and throws `exception` on the invocation after.
    class PeriodicTask extends Task {
        PeriodicTask(int rounds) { this.rounds = rounds; }
        int rounds;
        public void run() {
            if (--rounds == 0) super.run();
            // throw exception to surely terminate this periodic task,
            // but in a separate execution and in a detectable way.
            if (rounds == -1) throw exception;
        }
    }

    Runnable task = new Task();

    List<Future<?>> immediates = new ArrayList<>();
    List<Future<?>> delayeds = new ArrayList<>();
    List<Future<?>> periodics = new ArrayList<>();

    // First wave: poolSize (4) tasks, one per pool thread.
    immediates.add(p.submit(task));
    delayeds.add(p.schedule(task, delay, MILLISECONDS));
    periodics.add(p.scheduleAtFixedRate(
                      new PeriodicTask(rounds), delay, 1, MILLISECONDS));
    periodics.add(p.scheduleWithFixedDelay(
                      new PeriodicTask(rounds), delay, 1, MILLISECONDS));

    await(poolBlocked);

    assertEquals(poolSize, ran.get());
    assertEquals(poolSize, p.getActiveCount());
    assertTrue(q.isEmpty());

    // Add second wave of tasks.
    immediates.add(p.submit(task));
    delayeds.add(p.schedule(task, effectiveDelayedPolicy ? delay : LONG_DELAY_MS, MILLISECONDS));
    periodics.add(p.scheduleAtFixedRate(
                      new PeriodicTask(rounds), delay, 1, MILLISECONDS));
    periodics.add(p.scheduleWithFixedDelay(
                      new PeriodicTask(rounds), delay, 1, MILLISECONDS));

    // Every thread is blocked, so the whole second wave sits in the queue.
    assertEquals(poolSize, q.size());
    assertEquals(poolSize, ran.get());

    immediates.forEach(
        f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L));

    Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
        .forEach(f -> assertFalse(f.isDone()));

    try { p.shutdown(); } catch (SecurityException ok) { return; }
    assertTrue(p.isShutdown());
    assertTrue(p.isTerminating());
    assertFalse(p.isTerminated());

    // Randomly also check that new submissions are rejected after shutdown.
    if (rnd.nextBoolean())
        assertThrows(
            RejectedExecutionException.class,
            () -> p.submit(task),
            () -> p.schedule(task, 1, SECONDS),
            () -> p.scheduleAtFixedRate(
                new PeriodicTask(1), 1, 1, SECONDS),
            () -> p.scheduleWithFixedDelay(
                new PeriodicTask(2), 1, 1, SECONDS));

    // (!policy ^ contains) is true exactly when contains == policy:
    // queued second-wave tasks remain queued iff their policy allows it.
    assertTrue(q.contains(immediates.get(1)));
    assertTrue(!effectiveDelayedPolicy
               ^ q.contains(delayeds.get(1)));
    assertTrue(!effectivePeriodicPolicy
               ^ q.containsAll(periodics.subList(2, 4)));

    immediates.forEach(f -> assertFalse(f.isDone()));

    assertFalse(delayeds.get(0).isDone());
    if (effectiveDelayedPolicy)
        assertFalse(delayeds.get(1).isDone());
    else
        assertTrue(delayeds.get(1).isCancelled());

    if (effectivePeriodicPolicy)
        periodics.forEach(
            f -> {
                assertFalse(f.isDone());
                if (!periodicTasksContinue) {
                    assertTrue(f.cancel(false));
                    assertTrue(f.isCancelled());
                }
            });
    else {
        // First wave (indices 0-1) is still blocked in the pool;
        // second wave (indices 2-3) was queued and is now cancelled.
        periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone()));
        periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled()));
    }

    unblock.countDown(); // Release all pool threads

    assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS));
    assertFalse(p.isTerminating());
    assertTrue(p.isTerminated());

    assertTrue(q.isEmpty());

    Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream)
        .forEach(f -> assertTrue(f.isDone()));

    for (Future<?> f : immediates) assertNull(f.get());

    assertNull(delayeds.get(0).get());
    if (effectiveDelayedPolicy)
        assertNull(delayeds.get(1).get());
    else
        assertTrue(delayeds.get(1).isCancelled());

    if (periodicTasksContinue)
        // Surviving periodic tasks must have ended via the exception
        // thrown by PeriodicTask.
        periodics.forEach(
            f -> {
                try { f.get(); }
                catch (ExecutionException success) {
                    assertSame(exception, success.getCause());
                }
                catch (Throwable fail) { threadUnexpectedException(fail); }
            });
    else
        periodics.forEach(f -> assertTrue(f.isCancelled()));

    // Blocking runs: the first wave (poolSize), the second immediate,
    // the second delayed task if its policy allowed it, and the two
    // second-wave periodic tasks if they were allowed to continue.
    assertEquals(poolSize + 1
                 + (effectiveDelayedPolicy ? 1 : 0)
                 + (periodicTasksContinue ? 2 : 0),
                 ran.get());
}
assertSame(TEST_STRING, result); 994 } 995 } 996 997 /** 998 * invokeAny(null) throws NPE 999 */ testInvokeAny1()1000 public void testInvokeAny1() throws Exception { 1001 final ExecutorService e = new CustomExecutor(2); 1002 try (PoolCleaner cleaner = cleaner(e)) { 1003 try { 1004 e.invokeAny(null); 1005 shouldThrow(); 1006 } catch (NullPointerException success) {} 1007 } 1008 } 1009 1010 /** 1011 * invokeAny(empty collection) throws IllegalArgumentException 1012 */ testInvokeAny2()1013 public void testInvokeAny2() throws Exception { 1014 final ExecutorService e = new CustomExecutor(2); 1015 try (PoolCleaner cleaner = cleaner(e)) { 1016 try { 1017 e.invokeAny(new ArrayList<Callable<String>>()); 1018 shouldThrow(); 1019 } catch (IllegalArgumentException success) {} 1020 } 1021 } 1022 1023 /** 1024 * invokeAny(c) throws NPE if c has null elements 1025 */ testInvokeAny3()1026 public void testInvokeAny3() throws Exception { 1027 final CountDownLatch latch = new CountDownLatch(1); 1028 final ExecutorService e = new CustomExecutor(2); 1029 try (PoolCleaner cleaner = cleaner(e)) { 1030 List<Callable<String>> l = new ArrayList<>(); 1031 l.add(latchAwaitingStringTask(latch)); 1032 l.add(null); 1033 try { 1034 e.invokeAny(l); 1035 shouldThrow(); 1036 } catch (NullPointerException success) {} 1037 latch.countDown(); 1038 } 1039 } 1040 1041 /** 1042 * invokeAny(c) throws ExecutionException if no task completes 1043 */ testInvokeAny4()1044 public void testInvokeAny4() throws Exception { 1045 final ExecutorService e = new CustomExecutor(2); 1046 try (PoolCleaner cleaner = cleaner(e)) { 1047 List<Callable<String>> l = new ArrayList<>(); 1048 l.add(new NPETask()); 1049 try { 1050 e.invokeAny(l); 1051 shouldThrow(); 1052 } catch (ExecutionException success) { 1053 assertTrue(success.getCause() instanceof NullPointerException); 1054 } 1055 } 1056 } 1057 1058 /** 1059 * invokeAny(c) returns result of some task 1060 */ testInvokeAny5()1061 public void testInvokeAny5() throws Exception 
{ 1062 final ExecutorService e = new CustomExecutor(2); 1063 try (PoolCleaner cleaner = cleaner(e)) { 1064 List<Callable<String>> l = new ArrayList<>(); 1065 l.add(new StringTask()); 1066 l.add(new StringTask()); 1067 String result = e.invokeAny(l); 1068 assertSame(TEST_STRING, result); 1069 } 1070 } 1071 1072 /** 1073 * invokeAll(null) throws NPE 1074 */ testInvokeAll1()1075 public void testInvokeAll1() throws Exception { 1076 final ExecutorService e = new CustomExecutor(2); 1077 try (PoolCleaner cleaner = cleaner(e)) { 1078 try { 1079 e.invokeAll(null); 1080 shouldThrow(); 1081 } catch (NullPointerException success) {} 1082 } 1083 } 1084 1085 /** 1086 * invokeAll(empty collection) returns empty list 1087 */ testInvokeAll2()1088 public void testInvokeAll2() throws Exception { 1089 final ExecutorService e = new CustomExecutor(2); 1090 final Collection<Callable<String>> emptyCollection 1091 = Collections.emptyList(); 1092 try (PoolCleaner cleaner = cleaner(e)) { 1093 List<Future<String>> r = e.invokeAll(emptyCollection); 1094 assertTrue(r.isEmpty()); 1095 } 1096 } 1097 1098 /** 1099 * invokeAll(c) throws NPE if c has null elements 1100 */ testInvokeAll3()1101 public void testInvokeAll3() throws Exception { 1102 final ExecutorService e = new CustomExecutor(2); 1103 try (PoolCleaner cleaner = cleaner(e)) { 1104 List<Callable<String>> l = new ArrayList<>(); 1105 l.add(new StringTask()); 1106 l.add(null); 1107 try { 1108 e.invokeAll(l); 1109 shouldThrow(); 1110 } catch (NullPointerException success) {} 1111 } 1112 } 1113 1114 /** 1115 * get of invokeAll(c) throws exception on failed task 1116 */ testInvokeAll4()1117 public void testInvokeAll4() throws Exception { 1118 final ExecutorService e = new CustomExecutor(2); 1119 try (PoolCleaner cleaner = cleaner(e)) { 1120 List<Callable<String>> l = new ArrayList<>(); 1121 l.add(new NPETask()); 1122 List<Future<String>> futures = e.invokeAll(l); 1123 assertEquals(1, futures.size()); 1124 try { 1125 futures.get(0).get(); 1126 
shouldThrow(); 1127 } catch (ExecutionException success) { 1128 assertTrue(success.getCause() instanceof NullPointerException); 1129 } 1130 } 1131 } 1132 1133 /** 1134 * invokeAll(c) returns results of all completed tasks 1135 */ testInvokeAll5()1136 public void testInvokeAll5() throws Exception { 1137 final ExecutorService e = new CustomExecutor(2); 1138 try (PoolCleaner cleaner = cleaner(e)) { 1139 List<Callable<String>> l = new ArrayList<>(); 1140 l.add(new StringTask()); 1141 l.add(new StringTask()); 1142 List<Future<String>> futures = e.invokeAll(l); 1143 assertEquals(2, futures.size()); 1144 for (Future<String> future : futures) 1145 assertSame(TEST_STRING, future.get()); 1146 } 1147 } 1148 1149 /** 1150 * timed invokeAny(null) throws NPE 1151 */ testTimedInvokeAny1()1152 public void testTimedInvokeAny1() throws Exception { 1153 final ExecutorService e = new CustomExecutor(2); 1154 try (PoolCleaner cleaner = cleaner(e)) { 1155 try { 1156 e.invokeAny(null, randomTimeout(), randomTimeUnit()); 1157 shouldThrow(); 1158 } catch (NullPointerException success) {} 1159 } 1160 } 1161 1162 /** 1163 * timed invokeAny(,,null) throws NPE 1164 */ testTimedInvokeAnyNullTimeUnit()1165 public void testTimedInvokeAnyNullTimeUnit() throws Exception { 1166 final ExecutorService e = new CustomExecutor(2); 1167 try (PoolCleaner cleaner = cleaner(e)) { 1168 List<Callable<String>> l = new ArrayList<>(); 1169 l.add(new StringTask()); 1170 try { 1171 e.invokeAny(l, randomTimeout(), null); 1172 shouldThrow(); 1173 } catch (NullPointerException success) {} 1174 } 1175 } 1176 1177 /** 1178 * timed invokeAny(empty collection) throws IllegalArgumentException 1179 */ testTimedInvokeAny2()1180 public void testTimedInvokeAny2() throws Exception { 1181 final ExecutorService e = new CustomExecutor(2); 1182 final Collection<Callable<String>> emptyCollection 1183 = Collections.emptyList(); 1184 try (PoolCleaner cleaner = cleaner(e)) { 1185 try { 1186 e.invokeAny(emptyCollection, randomTimeout(), 
randomTimeUnit()); 1187 shouldThrow(); 1188 } catch (IllegalArgumentException success) {} 1189 } 1190 } 1191 1192 /** 1193 * timed invokeAny(c) throws NPE if c has null elements 1194 */ testTimedInvokeAny3()1195 public void testTimedInvokeAny3() throws Exception { 1196 CountDownLatch latch = new CountDownLatch(1); 1197 final ExecutorService e = new CustomExecutor(2); 1198 try (PoolCleaner cleaner = cleaner(e)) { 1199 List<Callable<String>> l = new ArrayList<>(); 1200 l.add(latchAwaitingStringTask(latch)); 1201 l.add(null); 1202 try { 1203 e.invokeAny(l, randomTimeout(), randomTimeUnit()); 1204 shouldThrow(); 1205 } catch (NullPointerException success) {} 1206 latch.countDown(); 1207 } 1208 } 1209 1210 /** 1211 * timed invokeAny(c) throws ExecutionException if no task completes 1212 */ testTimedInvokeAny4()1213 public void testTimedInvokeAny4() throws Exception { 1214 final ExecutorService e = new CustomExecutor(2); 1215 try (PoolCleaner cleaner = cleaner(e)) { 1216 long startTime = System.nanoTime(); 1217 List<Callable<String>> l = new ArrayList<>(); 1218 l.add(new NPETask()); 1219 try { 1220 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1221 shouldThrow(); 1222 } catch (ExecutionException success) { 1223 assertTrue(success.getCause() instanceof NullPointerException); 1224 } 1225 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1226 } 1227 } 1228 1229 /** 1230 * timed invokeAny(c) returns result of some task 1231 */ 1232 public void testTimedInvokeAny5() throws Exception { 1233 final ExecutorService e = new CustomExecutor(2); 1234 try (PoolCleaner cleaner = cleaner(e)) { 1235 long startTime = System.nanoTime(); 1236 List<Callable<String>> l = new ArrayList<>(); 1237 l.add(new StringTask()); 1238 l.add(new StringTask()); 1239 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1240 assertSame(TEST_STRING, result); 1241 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1242 } 1243 } 1244 1245 /** 1246 * timed invokeAll(null) throws 
NullPointerException 1247 */ 1248 public void testTimedInvokeAll1() throws Exception { 1249 final ExecutorService e = new CustomExecutor(2); 1250 try (PoolCleaner cleaner = cleaner(e)) { 1251 try { 1252 e.invokeAll(null, randomTimeout(), randomTimeUnit()); 1253 shouldThrow(); 1254 } catch (NullPointerException success) {} 1255 } 1256 } 1257 1258 /** 1259 * timed invokeAll(,,null) throws NullPointerException 1260 */ 1261 public void testTimedInvokeAllNullTimeUnit() throws Exception { 1262 final ExecutorService e = new CustomExecutor(2); 1263 try (PoolCleaner cleaner = cleaner(e)) { 1264 List<Callable<String>> l = new ArrayList<>(); 1265 l.add(new StringTask()); 1266 try { 1267 e.invokeAll(l, randomTimeout(), null); 1268 shouldThrow(); 1269 } catch (NullPointerException success) {} 1270 } 1271 } 1272 1273 /** 1274 * timed invokeAll(empty collection) returns empty list 1275 */ 1276 public void testTimedInvokeAll2() throws Exception { 1277 final ExecutorService e = new CustomExecutor(2); 1278 final Collection<Callable<String>> emptyCollection 1279 = Collections.emptyList(); 1280 try (PoolCleaner cleaner = cleaner(e)) { 1281 List<Future<String>> r = 1282 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit()); 1283 assertTrue(r.isEmpty()); 1284 } 1285 } 1286 1287 /** 1288 * timed invokeAll(c) throws NPE if c has null elements 1289 */ 1290 public void testTimedInvokeAll3() throws Exception { 1291 final ExecutorService e = new CustomExecutor(2); 1292 try (PoolCleaner cleaner = cleaner(e)) { 1293 List<Callable<String>> l = new ArrayList<>(); 1294 l.add(new StringTask()); 1295 l.add(null); 1296 try { 1297 e.invokeAll(l, randomTimeout(), randomTimeUnit()); 1298 shouldThrow(); 1299 } catch (NullPointerException success) {} 1300 } 1301 } 1302 1303 /** 1304 * get of element of invokeAll(c) throws exception on failed task 1305 */ 1306 public void testTimedInvokeAll4() throws Exception { 1307 final ExecutorService e = new CustomExecutor(2); 1308 final 
Collection<Callable<String>> c = new ArrayList<>(); 1309 c.add(new NPETask()); 1310 try (PoolCleaner cleaner = cleaner(e)) { 1311 List<Future<String>> futures = 1312 e.invokeAll(c, LONG_DELAY_MS, MILLISECONDS); 1313 assertEquals(1, futures.size()); 1314 try { 1315 futures.get(0).get(); 1316 shouldThrow(); 1317 } catch (ExecutionException success) { 1318 assertTrue(success.getCause() instanceof NullPointerException); 1319 } 1320 } 1321 } 1322 1323 /** 1324 * timed invokeAll(c) returns results of all completed tasks 1325 */ 1326 public void testTimedInvokeAll5() throws Exception { 1327 final ExecutorService e = new CustomExecutor(2); 1328 try (PoolCleaner cleaner = cleaner(e)) { 1329 List<Callable<String>> l = new ArrayList<>(); 1330 l.add(new StringTask()); 1331 l.add(new StringTask()); 1332 List<Future<String>> futures = 1333 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS); 1334 assertEquals(2, futures.size()); 1335 for (Future<String> future : futures) 1336 assertSame(TEST_STRING, future.get()); 1337 } 1338 } 1339 1340 /** 1341 * timed invokeAll(c) cancels tasks not completed by timeout 1342 */ 1343 public void testTimedInvokeAll6() throws Exception { 1344 for (long timeout = timeoutMillis();;) { 1345 final CountDownLatch done = new CountDownLatch(1); 1346 final Callable<String> waiter = new CheckedCallable<>() { 1347 public String realCall() { 1348 try { done.await(LONG_DELAY_MS, MILLISECONDS); } 1349 catch (InterruptedException ok) {} 1350 return "1"; }}; 1351 final ExecutorService p = new CustomExecutor(2); 1352 try (PoolCleaner cleaner = cleaner(p, done)) { 1353 List<Callable<String>> tasks = new ArrayList<>(); 1354 tasks.add(new StringTask("0")); 1355 tasks.add(waiter); 1356 tasks.add(new StringTask("2")); 1357 long startTime = System.nanoTime(); 1358 List<Future<String>> futures = 1359 p.invokeAll(tasks, timeout, MILLISECONDS); 1360 assertEquals(tasks.size(), futures.size()); 1361 assertTrue(millisElapsedSince(startTime) >= timeout); 1362 for (Future<?> future : 
futures) 1363 assertTrue(future.isDone()); 1364 assertTrue(futures.get(1).isCancelled()); 1365 try { 1366 assertEquals("0", futures.get(0).get()); 1367 assertEquals("2", futures.get(2).get()); 1368 break; 1369 } catch (CancellationException retryWithLongerTimeout) { 1370 timeout *= 2; 1371 if (timeout >= LONG_DELAY_MS / 2) 1372 fail("expected exactly one task to be cancelled"); 1373 } 1374 } 1375 } 1376 } 1377 1378 } 1379