/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 */ 33 34 import static java.util.concurrent.TimeUnit.MILLISECONDS; 35 import static java.util.concurrent.TimeUnit.NANOSECONDS; 36 import static java.util.concurrent.TimeUnit.SECONDS; 37 38 import java.util.ArrayList; 39 import java.util.Collection; 40 import java.util.Collections; 41 import java.util.HashSet; 42 import java.util.List; 43 import java.util.concurrent.BlockingQueue; 44 import java.util.concurrent.Callable; 45 import java.util.concurrent.CancellationException; 46 import java.util.concurrent.CountDownLatch; 47 import java.util.concurrent.Delayed; 48 import java.util.concurrent.ExecutionException; 49 import java.util.concurrent.ExecutorService; 50 import java.util.concurrent.Future; 51 import java.util.concurrent.RejectedExecutionException; 52 import java.util.concurrent.RejectedExecutionHandler; 53 import java.util.concurrent.RunnableScheduledFuture; 54 import java.util.concurrent.ScheduledFuture; 55 import java.util.concurrent.ScheduledThreadPoolExecutor; 56 import java.util.concurrent.ThreadFactory; 57 import java.util.concurrent.ThreadLocalRandom; 58 import java.util.concurrent.ThreadPoolExecutor; 59 import java.util.concurrent.TimeoutException; 60 import java.util.concurrent.TimeUnit; 61 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicLong; 64 import java.util.stream.Stream; 65 66 import junit.framework.Test; 67 import junit.framework.TestSuite; 68 69 public class ScheduledExecutorSubclassTest extends JSR166TestCase { main(String[] args)70 public static void main(String[] args) { 71 main(suite(), args); 72 } suite()73 public static Test suite() { 74 return new 
TestSuite(ScheduledExecutorSubclassTest.class); 75 } 76 77 static class CustomTask<V> implements RunnableScheduledFuture<V> { 78 private final RunnableScheduledFuture<V> task; 79 volatile boolean ran; CustomTask(RunnableScheduledFuture<V> task)80 CustomTask(RunnableScheduledFuture<V> task) { this.task = task; } isPeriodic()81 public boolean isPeriodic() { return task.isPeriodic(); } run()82 public void run() { 83 ran = true; 84 task.run(); 85 } getDelay(TimeUnit unit)86 public long getDelay(TimeUnit unit) { return task.getDelay(unit); } compareTo(Delayed t)87 public int compareTo(Delayed t) { 88 return task.compareTo(((CustomTask)t).task); 89 } cancel(boolean mayInterruptIfRunning)90 public boolean cancel(boolean mayInterruptIfRunning) { 91 return task.cancel(mayInterruptIfRunning); 92 } isCancelled()93 public boolean isCancelled() { return task.isCancelled(); } isDone()94 public boolean isDone() { return task.isDone(); } get()95 public V get() throws InterruptedException, ExecutionException { 96 V v = task.get(); 97 assertTrue(ran); 98 return v; 99 } get(long time, TimeUnit unit)100 public V get(long time, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 101 V v = task.get(time, unit); 102 assertTrue(ran); 103 return v; 104 } 105 } 106 107 public class CustomExecutor extends ScheduledThreadPoolExecutor { 108 decorateTask(Runnable r, RunnableScheduledFuture<V> task)109 protected <V> RunnableScheduledFuture<V> decorateTask(Runnable r, RunnableScheduledFuture<V> task) { 110 return new CustomTask<V>(task); 111 } 112 decorateTask(Callable<V> c, RunnableScheduledFuture<V> task)113 protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> c, RunnableScheduledFuture<V> task) { 114 return new CustomTask<V>(task); 115 } CustomExecutor(int corePoolSize)116 CustomExecutor(int corePoolSize) { super(corePoolSize); } CustomExecutor(int corePoolSize, RejectedExecutionHandler handler)117 CustomExecutor(int corePoolSize, 
RejectedExecutionHandler handler) { 118 super(corePoolSize, handler); 119 } 120 CustomExecutor(int corePoolSize, ThreadFactory threadFactory)121 CustomExecutor(int corePoolSize, ThreadFactory threadFactory) { 122 super(corePoolSize, threadFactory); 123 } CustomExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)124 CustomExecutor(int corePoolSize, ThreadFactory threadFactory, 125 RejectedExecutionHandler handler) { 126 super(corePoolSize, threadFactory, handler); 127 } 128 129 } 130 131 /** 132 * execute successfully executes a runnable 133 */ testExecute()134 public void testExecute() throws InterruptedException { 135 final CustomExecutor p = new CustomExecutor(1); 136 try (PoolCleaner cleaner = cleaner(p)) { 137 final CountDownLatch done = new CountDownLatch(1); 138 final Runnable task = new CheckedRunnable() { 139 public void realRun() { done.countDown(); }}; 140 p.execute(task); 141 await(done); 142 } 143 } 144 145 /** 146 * delayed schedule of callable successfully executes after delay 147 */ testSchedule1()148 public void testSchedule1() throws Exception { 149 final CountDownLatch done = new CountDownLatch(1); 150 final CustomExecutor p = new CustomExecutor(1); 151 try (PoolCleaner cleaner = cleaner(p, done)) { 152 final long startTime = System.nanoTime(); 153 Callable task = new CheckedCallable<Boolean>() { 154 public Boolean realCall() { 155 done.countDown(); 156 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 157 return Boolean.TRUE; 158 }}; 159 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); 160 assertSame(Boolean.TRUE, f.get()); 161 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 162 } 163 } 164 165 /** 166 * delayed schedule of runnable successfully executes after delay 167 */ testSchedule3()168 public void testSchedule3() throws Exception { 169 final CustomExecutor p = new CustomExecutor(1); 170 try (PoolCleaner cleaner = cleaner(p)) { 171 final long startTime = 
System.nanoTime(); 172 final CountDownLatch done = new CountDownLatch(1); 173 Runnable task = new CheckedRunnable() { 174 public void realRun() { 175 done.countDown(); 176 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 177 }}; 178 Future f = p.schedule(task, timeoutMillis(), MILLISECONDS); 179 await(done); 180 assertNull(f.get(LONG_DELAY_MS, MILLISECONDS)); 181 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 182 } 183 } 184 185 /** 186 * scheduleAtFixedRate executes runnable after given initial delay 187 */ testSchedule4()188 public void testSchedule4() throws InterruptedException { 189 final CustomExecutor p = new CustomExecutor(1); 190 try (PoolCleaner cleaner = cleaner(p)) { 191 final long startTime = System.nanoTime(); 192 final CountDownLatch done = new CountDownLatch(1); 193 Runnable task = new CheckedRunnable() { 194 public void realRun() { 195 done.countDown(); 196 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 197 }}; 198 ScheduledFuture f = 199 p.scheduleAtFixedRate(task, timeoutMillis(), 200 LONG_DELAY_MS, MILLISECONDS); 201 await(done); 202 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 203 f.cancel(true); 204 } 205 } 206 207 /** 208 * scheduleWithFixedDelay executes runnable after given initial delay 209 */ testSchedule5()210 public void testSchedule5() throws InterruptedException { 211 final CustomExecutor p = new CustomExecutor(1); 212 try (PoolCleaner cleaner = cleaner(p)) { 213 final long startTime = System.nanoTime(); 214 final CountDownLatch done = new CountDownLatch(1); 215 Runnable task = new CheckedRunnable() { 216 public void realRun() { 217 done.countDown(); 218 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 219 }}; 220 ScheduledFuture f = 221 p.scheduleWithFixedDelay(task, timeoutMillis(), 222 LONG_DELAY_MS, MILLISECONDS); 223 await(done); 224 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 225 f.cancel(true); 226 } 227 } 228 229 static class 
RunnableCounter implements Runnable { 230 AtomicInteger count = new AtomicInteger(0); run()231 public void run() { count.getAndIncrement(); } 232 } 233 234 /** 235 * scheduleAtFixedRate executes series of tasks at given rate. 236 * Eventually, it must hold that: 237 * cycles - 1 <= elapsedMillis/delay < cycles 238 */ testFixedRateSequence()239 public void testFixedRateSequence() throws InterruptedException { 240 final CustomExecutor p = new CustomExecutor(1); 241 try (PoolCleaner cleaner = cleaner(p)) { 242 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { 243 final long startTime = System.nanoTime(); 244 final int cycles = 8; 245 final CountDownLatch done = new CountDownLatch(cycles); 246 final Runnable task = new CheckedRunnable() { 247 public void realRun() { done.countDown(); }}; 248 final ScheduledFuture periodicTask = 249 p.scheduleAtFixedRate(task, 0, delay, MILLISECONDS); 250 final int totalDelayMillis = (cycles - 1) * delay; 251 await(done, totalDelayMillis + LONG_DELAY_MS); 252 periodicTask.cancel(true); 253 final long elapsedMillis = millisElapsedSince(startTime); 254 assertTrue(elapsedMillis >= totalDelayMillis); 255 if (elapsedMillis <= cycles * delay) 256 return; 257 // else retry with longer delay 258 } 259 fail("unexpected execution rate"); 260 } 261 } 262 263 /** 264 * scheduleWithFixedDelay executes series of tasks with given period. 265 * Eventually, it must hold that each task starts at least delay and at 266 * most 2 * delay after the termination of the previous task. 
267 */ testFixedDelaySequence()268 public void testFixedDelaySequence() throws InterruptedException { 269 final CustomExecutor p = new CustomExecutor(1); 270 try (PoolCleaner cleaner = cleaner(p)) { 271 for (int delay = 1; delay <= LONG_DELAY_MS; delay *= 3) { 272 final long startTime = System.nanoTime(); 273 final AtomicLong previous = new AtomicLong(startTime); 274 final AtomicBoolean tryLongerDelay = new AtomicBoolean(false); 275 final int cycles = 8; 276 final CountDownLatch done = new CountDownLatch(cycles); 277 final int d = delay; 278 final Runnable task = new CheckedRunnable() { 279 public void realRun() { 280 long now = System.nanoTime(); 281 long elapsedMillis 282 = NANOSECONDS.toMillis(now - previous.get()); 283 if (done.getCount() == cycles) { // first execution 284 if (elapsedMillis >= d) 285 tryLongerDelay.set(true); 286 } else { 287 assertTrue(elapsedMillis >= d); 288 if (elapsedMillis >= 2 * d) 289 tryLongerDelay.set(true); 290 } 291 previous.set(now); 292 done.countDown(); 293 }}; 294 final ScheduledFuture periodicTask = 295 p.scheduleWithFixedDelay(task, 0, delay, MILLISECONDS); 296 final int totalDelayMillis = (cycles - 1) * delay; 297 await(done, totalDelayMillis + cycles * LONG_DELAY_MS); 298 periodicTask.cancel(true); 299 final long elapsedMillis = millisElapsedSince(startTime); 300 assertTrue(elapsedMillis >= totalDelayMillis); 301 if (!tryLongerDelay.get()) 302 return; 303 // else retry with longer delay 304 } 305 fail("unexpected execution rate"); 306 } 307 } 308 309 /** 310 * Submitting null tasks throws NullPointerException 311 */ testNullTaskSubmission()312 public void testNullTaskSubmission() { 313 final CustomExecutor p = new CustomExecutor(1); 314 try (PoolCleaner cleaner = cleaner(p)) { 315 assertNullTaskSubmissionThrowsNullPointerException(p); 316 } 317 } 318 319 /** 320 * Submitted tasks are rejected when shutdown 321 */ testSubmittedTasksRejectedWhenShutdown()322 public void testSubmittedTasksRejectedWhenShutdown() throws 
InterruptedException { 323 final CustomExecutor p = new CustomExecutor(2); 324 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 325 final CountDownLatch threadsStarted = new CountDownLatch(p.getCorePoolSize()); 326 final CountDownLatch done = new CountDownLatch(1); 327 final Runnable r = () -> { 328 threadsStarted.countDown(); 329 for (;;) { 330 try { 331 done.await(); 332 return; 333 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 334 }}; 335 final Callable<Boolean> c = () -> { 336 threadsStarted.countDown(); 337 for (;;) { 338 try { 339 done.await(); 340 return Boolean.TRUE; 341 } catch (InterruptedException shutdownNowDeliberatelyIgnored) {} 342 }}; 343 344 try (PoolCleaner cleaner = cleaner(p, done)) { 345 for (int i = p.getCorePoolSize(); i--> 0; ) { 346 switch (rnd.nextInt(4)) { 347 case 0: p.execute(r); break; 348 case 1: assertFalse(p.submit(r).isDone()); break; 349 case 2: assertFalse(p.submit(r, Boolean.TRUE).isDone()); break; 350 case 3: assertFalse(p.submit(c).isDone()); break; 351 } 352 } 353 354 // ScheduledThreadPoolExecutor has an unbounded queue, so never saturated. 
355 await(threadsStarted); 356 357 if (rnd.nextBoolean()) 358 p.shutdownNow(); 359 else 360 p.shutdown(); 361 // Pool is shutdown, but not yet terminated 362 assertTaskSubmissionsAreRejected(p); 363 assertFalse(p.isTerminated()); 364 365 done.countDown(); // release blocking tasks 366 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 367 368 assertTaskSubmissionsAreRejected(p); 369 } 370 assertEquals(p.getCorePoolSize(), p.getCompletedTaskCount()); 371 } 372 373 /** 374 * getActiveCount increases but doesn't overestimate, when a 375 * thread becomes active 376 */ testGetActiveCount()377 public void testGetActiveCount() throws InterruptedException { 378 final CountDownLatch done = new CountDownLatch(1); 379 final ThreadPoolExecutor p = new CustomExecutor(2); 380 try (PoolCleaner cleaner = cleaner(p, done)) { 381 final CountDownLatch threadStarted = new CountDownLatch(1); 382 assertEquals(0, p.getActiveCount()); 383 p.execute(new CheckedRunnable() { 384 public void realRun() throws InterruptedException { 385 threadStarted.countDown(); 386 assertEquals(1, p.getActiveCount()); 387 await(done); 388 }}); 389 await(threadStarted); 390 assertEquals(1, p.getActiveCount()); 391 } 392 } 393 394 /** 395 * getCompletedTaskCount increases, but doesn't overestimate, 396 * when tasks complete 397 */ testGetCompletedTaskCount()398 public void testGetCompletedTaskCount() throws InterruptedException { 399 final ThreadPoolExecutor p = new CustomExecutor(2); 400 try (PoolCleaner cleaner = cleaner(p)) { 401 final CountDownLatch threadStarted = new CountDownLatch(1); 402 final CountDownLatch threadProceed = new CountDownLatch(1); 403 final CountDownLatch threadDone = new CountDownLatch(1); 404 assertEquals(0, p.getCompletedTaskCount()); 405 p.execute(new CheckedRunnable() { 406 public void realRun() throws InterruptedException { 407 threadStarted.countDown(); 408 assertEquals(0, p.getCompletedTaskCount()); 409 await(threadProceed); 410 threadDone.countDown(); 411 }}); 412 
await(threadStarted); 413 assertEquals(0, p.getCompletedTaskCount()); 414 threadProceed.countDown(); 415 await(threadDone); 416 long startTime = System.nanoTime(); 417 while (p.getCompletedTaskCount() != 1) { 418 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 419 fail("timed out"); 420 Thread.yield(); 421 } 422 } 423 } 424 425 /** 426 * getCorePoolSize returns size given in constructor if not otherwise set 427 */ testGetCorePoolSize()428 public void testGetCorePoolSize() { 429 final CustomExecutor p = new CustomExecutor(1); 430 try (PoolCleaner cleaner = cleaner(p)) { 431 assertEquals(1, p.getCorePoolSize()); 432 } 433 } 434 435 /** 436 * getLargestPoolSize increases, but doesn't overestimate, when 437 * multiple threads active 438 */ testGetLargestPoolSize()439 public void testGetLargestPoolSize() throws InterruptedException { 440 final int THREADS = 3; 441 final CountDownLatch done = new CountDownLatch(1); 442 final ThreadPoolExecutor p = new CustomExecutor(THREADS); 443 try (PoolCleaner cleaner = cleaner(p, done)) { 444 final CountDownLatch threadsStarted = new CountDownLatch(THREADS); 445 assertEquals(0, p.getLargestPoolSize()); 446 for (int i = 0; i < THREADS; i++) 447 p.execute(new CheckedRunnable() { 448 public void realRun() throws InterruptedException { 449 threadsStarted.countDown(); 450 await(done); 451 assertEquals(THREADS, p.getLargestPoolSize()); 452 }}); 453 await(threadsStarted); 454 assertEquals(THREADS, p.getLargestPoolSize()); 455 } 456 assertEquals(THREADS, p.getLargestPoolSize()); 457 } 458 459 /** 460 * getPoolSize increases, but doesn't overestimate, when threads 461 * become active 462 */ testGetPoolSize()463 public void testGetPoolSize() throws InterruptedException { 464 final CountDownLatch done = new CountDownLatch(1); 465 final ThreadPoolExecutor p = new CustomExecutor(1); 466 try (PoolCleaner cleaner = cleaner(p, done)) { 467 final CountDownLatch threadStarted = new CountDownLatch(1); 468 assertEquals(0, p.getPoolSize()); 469 
p.execute(new CheckedRunnable() { 470 public void realRun() throws InterruptedException { 471 threadStarted.countDown(); 472 assertEquals(1, p.getPoolSize()); 473 await(done); 474 }}); 475 await(threadStarted); 476 assertEquals(1, p.getPoolSize()); 477 } 478 } 479 480 /** 481 * getTaskCount increases, but doesn't overestimate, when tasks 482 * submitted 483 */ testGetTaskCount()484 public void testGetTaskCount() throws InterruptedException { 485 final int TASKS = 3; 486 final CountDownLatch done = new CountDownLatch(1); 487 final ThreadPoolExecutor p = new CustomExecutor(1); 488 try (PoolCleaner cleaner = cleaner(p, done)) { 489 final CountDownLatch threadStarted = new CountDownLatch(1); 490 assertEquals(0, p.getTaskCount()); 491 assertEquals(0, p.getCompletedTaskCount()); 492 p.execute(new CheckedRunnable() { 493 public void realRun() throws InterruptedException { 494 threadStarted.countDown(); 495 await(done); 496 }}); 497 await(threadStarted); 498 assertEquals(1, p.getTaskCount()); 499 assertEquals(0, p.getCompletedTaskCount()); 500 for (int i = 0; i < TASKS; i++) { 501 assertEquals(1 + i, p.getTaskCount()); 502 p.execute(new CheckedRunnable() { 503 public void realRun() throws InterruptedException { 504 threadStarted.countDown(); 505 assertEquals(1 + TASKS, p.getTaskCount()); 506 await(done); 507 }}); 508 } 509 assertEquals(1 + TASKS, p.getTaskCount()); 510 assertEquals(0, p.getCompletedTaskCount()); 511 } 512 assertEquals(1 + TASKS, p.getTaskCount()); 513 assertEquals(1 + TASKS, p.getCompletedTaskCount()); 514 } 515 516 /** 517 * getThreadFactory returns factory in constructor if not set 518 */ testGetThreadFactory()519 public void testGetThreadFactory() { 520 final ThreadFactory threadFactory = new SimpleThreadFactory(); 521 final CustomExecutor p = new CustomExecutor(1, threadFactory); 522 try (PoolCleaner cleaner = cleaner(p)) { 523 assertSame(threadFactory, p.getThreadFactory()); 524 } 525 } 526 527 /** 528 * setThreadFactory sets the thread factory 
returned by getThreadFactory 529 */ testSetThreadFactory()530 public void testSetThreadFactory() { 531 final ThreadFactory threadFactory = new SimpleThreadFactory(); 532 final CustomExecutor p = new CustomExecutor(1); 533 try (PoolCleaner cleaner = cleaner(p)) { 534 p.setThreadFactory(threadFactory); 535 assertSame(threadFactory, p.getThreadFactory()); 536 } 537 } 538 539 /** 540 * setThreadFactory(null) throws NPE 541 */ testSetThreadFactoryNull()542 public void testSetThreadFactoryNull() { 543 final CustomExecutor p = new CustomExecutor(1); 544 try (PoolCleaner cleaner = cleaner(p)) { 545 try { 546 p.setThreadFactory(null); 547 shouldThrow(); 548 } catch (NullPointerException success) {} 549 } 550 } 551 552 /** 553 * isShutdown is false before shutdown, true after 554 */ testIsShutdown()555 public void testIsShutdown() { 556 final CustomExecutor p = new CustomExecutor(1); 557 try (PoolCleaner cleaner = cleaner(p)) { 558 assertFalse(p.isShutdown()); 559 try { p.shutdown(); } catch (SecurityException ok) { return; } 560 assertTrue(p.isShutdown()); 561 } 562 } 563 564 /** 565 * isTerminated is false before termination, true after 566 */ testIsTerminated()567 public void testIsTerminated() throws InterruptedException { 568 final CountDownLatch done = new CountDownLatch(1); 569 final ThreadPoolExecutor p = new CustomExecutor(1); 570 try (PoolCleaner cleaner = cleaner(p)) { 571 final CountDownLatch threadStarted = new CountDownLatch(1); 572 p.execute(new CheckedRunnable() { 573 public void realRun() throws InterruptedException { 574 assertFalse(p.isTerminated()); 575 threadStarted.countDown(); 576 await(done); 577 }}); 578 await(threadStarted); 579 assertFalse(p.isTerminated()); 580 assertFalse(p.isTerminating()); 581 done.countDown(); 582 try { p.shutdown(); } catch (SecurityException ok) { return; } 583 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 584 assertTrue(p.isTerminated()); 585 } 586 } 587 588 /** 589 * isTerminating is not true when running or 
when terminated 590 */ testIsTerminating()591 public void testIsTerminating() throws InterruptedException { 592 final CountDownLatch done = new CountDownLatch(1); 593 final ThreadPoolExecutor p = new CustomExecutor(1); 594 try (PoolCleaner cleaner = cleaner(p)) { 595 final CountDownLatch threadStarted = new CountDownLatch(1); 596 assertFalse(p.isTerminating()); 597 p.execute(new CheckedRunnable() { 598 public void realRun() throws InterruptedException { 599 assertFalse(p.isTerminating()); 600 threadStarted.countDown(); 601 await(done); 602 }}); 603 await(threadStarted); 604 assertFalse(p.isTerminating()); 605 done.countDown(); 606 try { p.shutdown(); } catch (SecurityException ok) { return; } 607 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 608 assertTrue(p.isTerminated()); 609 assertFalse(p.isTerminating()); 610 } 611 } 612 613 /** 614 * getQueue returns the work queue, which contains queued tasks 615 */ testGetQueue()616 public void testGetQueue() throws InterruptedException { 617 final CountDownLatch done = new CountDownLatch(1); 618 final ScheduledThreadPoolExecutor p = new CustomExecutor(1); 619 try (PoolCleaner cleaner = cleaner(p, done)) { 620 final CountDownLatch threadStarted = new CountDownLatch(1); 621 ScheduledFuture[] tasks = new ScheduledFuture[5]; 622 for (int i = 0; i < tasks.length; i++) { 623 Runnable r = new CheckedRunnable() { 624 public void realRun() throws InterruptedException { 625 threadStarted.countDown(); 626 await(done); 627 }}; 628 tasks[i] = p.schedule(r, 1, MILLISECONDS); 629 } 630 await(threadStarted); 631 BlockingQueue<Runnable> q = p.getQueue(); 632 assertTrue(q.contains(tasks[tasks.length - 1])); 633 assertFalse(q.contains(tasks[0])); 634 } 635 } 636 637 /** 638 * remove(task) removes queued task, and fails to remove active task 639 */ testRemove()640 public void testRemove() throws InterruptedException { 641 final CountDownLatch done = new CountDownLatch(1); 642 final ScheduledThreadPoolExecutor p = new 
CustomExecutor(1); 643 try (PoolCleaner cleaner = cleaner(p, done)) { 644 ScheduledFuture[] tasks = new ScheduledFuture[5]; 645 final CountDownLatch threadStarted = new CountDownLatch(1); 646 for (int i = 0; i < tasks.length; i++) { 647 Runnable r = new CheckedRunnable() { 648 public void realRun() throws InterruptedException { 649 threadStarted.countDown(); 650 await(done); 651 }}; 652 tasks[i] = p.schedule(r, 1, MILLISECONDS); 653 } 654 await(threadStarted); 655 BlockingQueue<Runnable> q = p.getQueue(); 656 assertFalse(p.remove((Runnable)tasks[0])); 657 assertTrue(q.contains((Runnable)tasks[4])); 658 assertTrue(q.contains((Runnable)tasks[3])); 659 assertTrue(p.remove((Runnable)tasks[4])); 660 assertFalse(p.remove((Runnable)tasks[4])); 661 assertFalse(q.contains((Runnable)tasks[4])); 662 assertTrue(q.contains((Runnable)tasks[3])); 663 assertTrue(p.remove((Runnable)tasks[3])); 664 assertFalse(q.contains((Runnable)tasks[3])); 665 } 666 } 667 668 /** 669 * purge removes cancelled tasks from the queue 670 */ testPurge()671 public void testPurge() throws InterruptedException { 672 final ScheduledFuture[] tasks = new ScheduledFuture[5]; 673 final Runnable releaser = new Runnable() { public void run() { 674 for (ScheduledFuture task : tasks) 675 if (task != null) task.cancel(true); }}; 676 final CustomExecutor p = new CustomExecutor(1); 677 try (PoolCleaner cleaner = cleaner(p, releaser)) { 678 for (int i = 0; i < tasks.length; i++) 679 tasks[i] = p.schedule(possiblyInterruptedRunnable(SMALL_DELAY_MS), 680 LONG_DELAY_MS, MILLISECONDS); 681 int max = tasks.length; 682 if (tasks[4].cancel(true)) --max; 683 if (tasks[3].cancel(true)) --max; 684 // There must eventually be an interference-free point at 685 // which purge will not fail. (At worst, when queue is empty.) 
686 long startTime = System.nanoTime(); 687 do { 688 p.purge(); 689 long count = p.getTaskCount(); 690 if (count == max) 691 return; 692 } while (millisElapsedSince(startTime) < LONG_DELAY_MS); 693 fail("Purge failed to remove cancelled tasks"); 694 } 695 } 696 697 /** 698 * shutdownNow returns a list containing tasks that were not run, 699 * and those tasks are drained from the queue 700 */ testShutdownNow()701 public void testShutdownNow() throws InterruptedException { 702 final int poolSize = 2; 703 final int count = 5; 704 final AtomicInteger ran = new AtomicInteger(0); 705 final CustomExecutor p = new CustomExecutor(poolSize); 706 final CountDownLatch threadsStarted = new CountDownLatch(poolSize); 707 Runnable waiter = new CheckedRunnable() { public void realRun() { 708 threadsStarted.countDown(); 709 try { 710 MILLISECONDS.sleep(LONGER_DELAY_MS); 711 } catch (InterruptedException success) {} 712 ran.getAndIncrement(); 713 }}; 714 for (int i = 0; i < count; i++) 715 p.execute(waiter); 716 await(threadsStarted); 717 assertEquals(poolSize, p.getActiveCount()); 718 assertEquals(0, p.getCompletedTaskCount()); 719 final List<Runnable> queuedTasks; 720 try { 721 queuedTasks = p.shutdownNow(); 722 } catch (SecurityException ok) { 723 return; // Allowed in case test doesn't have privs 724 } 725 assertTrue(p.isShutdown()); 726 assertTrue(p.getQueue().isEmpty()); 727 assertEquals(count - poolSize, queuedTasks.size()); 728 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 729 assertTrue(p.isTerminated()); 730 assertEquals(poolSize, ran.get()); 731 assertEquals(poolSize, p.getCompletedTaskCount()); 732 } 733 734 /** 735 * shutdownNow returns a list containing tasks that were not run, 736 * and those tasks are drained from the queue 737 */ testShutdownNow_delayedTasks()738 public void testShutdownNow_delayedTasks() throws InterruptedException { 739 final CustomExecutor p = new CustomExecutor(1); 740 List<ScheduledFuture> tasks = new ArrayList<>(); 741 for (int i 
= 0; i < 3; i++) { 742 Runnable r = new NoOpRunnable(); 743 tasks.add(p.schedule(r, 9, SECONDS)); 744 tasks.add(p.scheduleAtFixedRate(r, 9, 9, SECONDS)); 745 tasks.add(p.scheduleWithFixedDelay(r, 9, 9, SECONDS)); 746 } 747 if (testImplementationDetails) 748 assertEquals(new HashSet(tasks), new HashSet(p.getQueue())); 749 final List<Runnable> queuedTasks; 750 try { 751 queuedTasks = p.shutdownNow(); 752 } catch (SecurityException ok) { 753 return; // Allowed in case test doesn't have privs 754 } 755 assertTrue(p.isShutdown()); 756 assertTrue(p.getQueue().isEmpty()); 757 if (testImplementationDetails) 758 assertEquals(new HashSet(tasks), new HashSet(queuedTasks)); 759 assertEquals(tasks.size(), queuedTasks.size()); 760 for (ScheduledFuture task : tasks) { 761 assertFalse(((CustomTask)task).ran); 762 assertFalse(task.isDone()); 763 assertFalse(task.isCancelled()); 764 } 765 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 766 assertTrue(p.isTerminated()); 767 } 768 769 /** 770 * By default, periodic tasks are cancelled at shutdown. 771 * By default, delayed tasks keep running after shutdown. 
772 * Check that changing the default values work: 773 * - setExecuteExistingDelayedTasksAfterShutdownPolicy 774 * - setContinueExistingPeriodicTasksAfterShutdownPolicy 775 */ 776 @SuppressWarnings("FutureReturnValueIgnored") testShutdown_cancellation()777 public void testShutdown_cancellation() throws Exception { 778 final int poolSize = 4; 779 final CustomExecutor p = new CustomExecutor(poolSize); 780 final BlockingQueue<Runnable> q = p.getQueue(); 781 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 782 final long delay = rnd.nextInt(2); 783 final int rounds = rnd.nextInt(1, 3); 784 final boolean effectiveDelayedPolicy; 785 final boolean effectivePeriodicPolicy; 786 final boolean effectiveRemovePolicy; 787 788 if (rnd.nextBoolean()) 789 p.setExecuteExistingDelayedTasksAfterShutdownPolicy( 790 effectiveDelayedPolicy = rnd.nextBoolean()); 791 else 792 effectiveDelayedPolicy = true; 793 assertEquals(effectiveDelayedPolicy, 794 p.getExecuteExistingDelayedTasksAfterShutdownPolicy()); 795 796 if (rnd.nextBoolean()) 797 p.setContinueExistingPeriodicTasksAfterShutdownPolicy( 798 effectivePeriodicPolicy = rnd.nextBoolean()); 799 else 800 effectivePeriodicPolicy = false; 801 assertEquals(effectivePeriodicPolicy, 802 p.getContinueExistingPeriodicTasksAfterShutdownPolicy()); 803 804 if (rnd.nextBoolean()) 805 p.setRemoveOnCancelPolicy( 806 effectiveRemovePolicy = rnd.nextBoolean()); 807 else 808 effectiveRemovePolicy = false; 809 assertEquals(effectiveRemovePolicy, 810 p.getRemoveOnCancelPolicy()); 811 812 final boolean periodicTasksContinue = effectivePeriodicPolicy && rnd.nextBoolean(); 813 814 // Strategy: Wedge the pool with one wave of "blocker" tasks, 815 // then add a second wave that waits in the queue until unblocked. 
816 final AtomicInteger ran = new AtomicInteger(0); 817 final CountDownLatch poolBlocked = new CountDownLatch(poolSize); 818 final CountDownLatch unblock = new CountDownLatch(1); 819 final RuntimeException exception = new RuntimeException(); 820 821 class Task implements Runnable { 822 public void run() { 823 try { 824 ran.getAndIncrement(); 825 poolBlocked.countDown(); 826 await(unblock); 827 } catch (Throwable fail) { threadUnexpectedException(fail); } 828 } 829 } 830 831 class PeriodicTask extends Task { 832 PeriodicTask(int rounds) { this.rounds = rounds; } 833 int rounds; 834 public void run() { 835 if (--rounds == 0) super.run(); 836 // throw exception to surely terminate this periodic task, 837 // but in a separate execution and in a detectable way. 838 if (rounds == -1) throw exception; 839 } 840 } 841 842 Runnable task = new Task(); 843 844 List<Future<?>> immediates = new ArrayList<>(); 845 List<Future<?>> delayeds = new ArrayList<>(); 846 List<Future<?>> periodics = new ArrayList<>(); 847 848 immediates.add(p.submit(task)); 849 delayeds.add(p.schedule(task, delay, MILLISECONDS)); 850 periodics.add(p.scheduleAtFixedRate( 851 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 852 periodics.add(p.scheduleWithFixedDelay( 853 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 854 855 await(poolBlocked); 856 857 assertEquals(poolSize, ran.get()); 858 assertEquals(poolSize, p.getActiveCount()); 859 assertTrue(q.isEmpty()); 860 861 // Add second wave of tasks. 862 immediates.add(p.submit(task)); 863 delayeds.add(p.schedule(task, effectiveDelayedPolicy ? 
delay : LONG_DELAY_MS, MILLISECONDS)); 864 periodics.add(p.scheduleAtFixedRate( 865 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 866 periodics.add(p.scheduleWithFixedDelay( 867 new PeriodicTask(rounds), delay, 1, MILLISECONDS)); 868 869 assertEquals(poolSize, q.size()); 870 assertEquals(poolSize, ran.get()); 871 872 immediates.forEach( 873 f -> assertTrue(((ScheduledFuture)f).getDelay(NANOSECONDS) <= 0L)); 874 875 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream) 876 .forEach(f -> assertFalse(f.isDone())); 877 878 try { p.shutdown(); } catch (SecurityException ok) { return; } 879 assertTrue(p.isShutdown()); 880 assertTrue(p.isTerminating()); 881 assertFalse(p.isTerminated()); 882 883 if (rnd.nextBoolean()) 884 assertThrows( 885 RejectedExecutionException.class, 886 () -> p.submit(task), 887 () -> p.schedule(task, 1, SECONDS), 888 () -> p.scheduleAtFixedRate( 889 new PeriodicTask(1), 1, 1, SECONDS), 890 () -> p.scheduleWithFixedDelay( 891 new PeriodicTask(2), 1, 1, SECONDS)); 892 893 assertTrue(q.contains(immediates.get(1))); 894 assertTrue(!effectiveDelayedPolicy 895 ^ q.contains(delayeds.get(1))); 896 assertTrue(!effectivePeriodicPolicy 897 ^ q.containsAll(periodics.subList(2, 4))); 898 899 immediates.forEach(f -> assertFalse(f.isDone())); 900 901 assertFalse(delayeds.get(0).isDone()); 902 if (effectiveDelayedPolicy) 903 assertFalse(delayeds.get(1).isDone()); 904 else 905 assertTrue(delayeds.get(1).isCancelled()); 906 907 if (effectivePeriodicPolicy) 908 periodics.forEach( 909 f -> { 910 assertFalse(f.isDone()); 911 if (!periodicTasksContinue) { 912 assertTrue(f.cancel(false)); 913 assertTrue(f.isCancelled()); 914 } 915 }); 916 else { 917 periodics.subList(0, 2).forEach(f -> assertFalse(f.isDone())); 918 periodics.subList(2, 4).forEach(f -> assertTrue(f.isCancelled())); 919 } 920 921 unblock.countDown(); // Release all pool threads 922 923 assertTrue(p.awaitTermination(LONG_DELAY_MS, MILLISECONDS)); 924 assertFalse(p.isTerminating()); 
925 assertTrue(p.isTerminated()); 926 927 assertTrue(q.isEmpty()); 928 929 Stream.of(immediates, delayeds, periodics).flatMap(Collection::stream) 930 .forEach(f -> assertTrue(f.isDone())); 931 932 for (Future<?> f : immediates) assertNull(f.get()); 933 934 assertNull(delayeds.get(0).get()); 935 if (effectiveDelayedPolicy) 936 assertNull(delayeds.get(1).get()); 937 else 938 assertTrue(delayeds.get(1).isCancelled()); 939 940 if (periodicTasksContinue) 941 periodics.forEach( 942 f -> { 943 try { f.get(); } 944 catch (ExecutionException success) { 945 assertSame(exception, success.getCause()); 946 } 947 catch (Throwable fail) { threadUnexpectedException(fail); } 948 }); 949 else 950 periodics.forEach(f -> assertTrue(f.isCancelled())); 951 952 assertEquals(poolSize + 1 953 + (effectiveDelayedPolicy ? 1 : 0) 954 + (periodicTasksContinue ? 2 : 0), 955 ran.get()); 956 } 957 958 /** 959 * completed submit of callable returns result 960 */ testSubmitCallable()961 public void testSubmitCallable() throws Exception { 962 final ExecutorService e = new CustomExecutor(2); 963 try (PoolCleaner cleaner = cleaner(e)) { 964 Future<String> future = e.submit(new StringTask()); 965 String result = future.get(); 966 assertSame(TEST_STRING, result); 967 } 968 } 969 970 /** 971 * completed submit of runnable returns successfully 972 */ testSubmitRunnable()973 public void testSubmitRunnable() throws Exception { 974 final ExecutorService e = new CustomExecutor(2); 975 try (PoolCleaner cleaner = cleaner(e)) { 976 Future<?> future = e.submit(new NoOpRunnable()); 977 future.get(); 978 assertTrue(future.isDone()); 979 } 980 } 981 982 /** 983 * completed submit of (runnable, result) returns result 984 */ testSubmitRunnable2()985 public void testSubmitRunnable2() throws Exception { 986 final ExecutorService e = new CustomExecutor(2); 987 try (PoolCleaner cleaner = cleaner(e)) { 988 Future<String> future = e.submit(new NoOpRunnable(), TEST_STRING); 989 String result = future.get(); 990 
assertSame(TEST_STRING, result); 991 } 992 } 993 994 /** 995 * invokeAny(null) throws NPE 996 */ testInvokeAny1()997 public void testInvokeAny1() throws Exception { 998 final ExecutorService e = new CustomExecutor(2); 999 try (PoolCleaner cleaner = cleaner(e)) { 1000 try { 1001 e.invokeAny(null); 1002 shouldThrow(); 1003 } catch (NullPointerException success) {} 1004 } 1005 } 1006 1007 /** 1008 * invokeAny(empty collection) throws IllegalArgumentException 1009 */ testInvokeAny2()1010 public void testInvokeAny2() throws Exception { 1011 final ExecutorService e = new CustomExecutor(2); 1012 try (PoolCleaner cleaner = cleaner(e)) { 1013 try { 1014 e.invokeAny(new ArrayList<Callable<String>>()); 1015 shouldThrow(); 1016 } catch (IllegalArgumentException success) {} 1017 } 1018 } 1019 1020 /** 1021 * invokeAny(c) throws NPE if c has null elements 1022 */ testInvokeAny3()1023 public void testInvokeAny3() throws Exception { 1024 final CountDownLatch latch = new CountDownLatch(1); 1025 final ExecutorService e = new CustomExecutor(2); 1026 try (PoolCleaner cleaner = cleaner(e)) { 1027 List<Callable<String>> l = new ArrayList<>(); 1028 l.add(latchAwaitingStringTask(latch)); 1029 l.add(null); 1030 try { 1031 e.invokeAny(l); 1032 shouldThrow(); 1033 } catch (NullPointerException success) {} 1034 latch.countDown(); 1035 } 1036 } 1037 1038 /** 1039 * invokeAny(c) throws ExecutionException if no task completes 1040 */ testInvokeAny4()1041 public void testInvokeAny4() throws Exception { 1042 final ExecutorService e = new CustomExecutor(2); 1043 try (PoolCleaner cleaner = cleaner(e)) { 1044 List<Callable<String>> l = new ArrayList<>(); 1045 l.add(new NPETask()); 1046 try { 1047 e.invokeAny(l); 1048 shouldThrow(); 1049 } catch (ExecutionException success) { 1050 assertTrue(success.getCause() instanceof NullPointerException); 1051 } 1052 } 1053 } 1054 1055 /** 1056 * invokeAny(c) returns result of some task 1057 */ testInvokeAny5()1058 public void testInvokeAny5() throws Exception { 
1059 final ExecutorService e = new CustomExecutor(2); 1060 try (PoolCleaner cleaner = cleaner(e)) { 1061 List<Callable<String>> l = new ArrayList<>(); 1062 l.add(new StringTask()); 1063 l.add(new StringTask()); 1064 String result = e.invokeAny(l); 1065 assertSame(TEST_STRING, result); 1066 } 1067 } 1068 1069 /** 1070 * invokeAll(null) throws NPE 1071 */ testInvokeAll1()1072 public void testInvokeAll1() throws Exception { 1073 final ExecutorService e = new CustomExecutor(2); 1074 try (PoolCleaner cleaner = cleaner(e)) { 1075 try { 1076 e.invokeAll(null); 1077 shouldThrow(); 1078 } catch (NullPointerException success) {} 1079 } 1080 } 1081 1082 /** 1083 * invokeAll(empty collection) returns empty list 1084 */ testInvokeAll2()1085 public void testInvokeAll2() throws Exception { 1086 final ExecutorService e = new CustomExecutor(2); 1087 final Collection<Callable<String>> emptyCollection 1088 = Collections.emptyList(); 1089 try (PoolCleaner cleaner = cleaner(e)) { 1090 List<Future<String>> r = e.invokeAll(emptyCollection); 1091 assertTrue(r.isEmpty()); 1092 } 1093 } 1094 1095 /** 1096 * invokeAll(c) throws NPE if c has null elements 1097 */ testInvokeAll3()1098 public void testInvokeAll3() throws Exception { 1099 final ExecutorService e = new CustomExecutor(2); 1100 try (PoolCleaner cleaner = cleaner(e)) { 1101 List<Callable<String>> l = new ArrayList<>(); 1102 l.add(new StringTask()); 1103 l.add(null); 1104 try { 1105 e.invokeAll(l); 1106 shouldThrow(); 1107 } catch (NullPointerException success) {} 1108 } 1109 } 1110 1111 /** 1112 * get of invokeAll(c) throws exception on failed task 1113 */ testInvokeAll4()1114 public void testInvokeAll4() throws Exception { 1115 final ExecutorService e = new CustomExecutor(2); 1116 try (PoolCleaner cleaner = cleaner(e)) { 1117 List<Callable<String>> l = new ArrayList<>(); 1118 l.add(new NPETask()); 1119 List<Future<String>> futures = e.invokeAll(l); 1120 assertEquals(1, futures.size()); 1121 try { 1122 futures.get(0).get(); 1123 
shouldThrow(); 1124 } catch (ExecutionException success) { 1125 assertTrue(success.getCause() instanceof NullPointerException); 1126 } 1127 } 1128 } 1129 1130 /** 1131 * invokeAll(c) returns results of all completed tasks 1132 */ testInvokeAll5()1133 public void testInvokeAll5() throws Exception { 1134 final ExecutorService e = new CustomExecutor(2); 1135 try (PoolCleaner cleaner = cleaner(e)) { 1136 List<Callable<String>> l = new ArrayList<>(); 1137 l.add(new StringTask()); 1138 l.add(new StringTask()); 1139 List<Future<String>> futures = e.invokeAll(l); 1140 assertEquals(2, futures.size()); 1141 for (Future<String> future : futures) 1142 assertSame(TEST_STRING, future.get()); 1143 } 1144 } 1145 1146 /** 1147 * timed invokeAny(null) throws NPE 1148 */ testTimedInvokeAny1()1149 public void testTimedInvokeAny1() throws Exception { 1150 final ExecutorService e = new CustomExecutor(2); 1151 try (PoolCleaner cleaner = cleaner(e)) { 1152 try { 1153 e.invokeAny(null, randomTimeout(), randomTimeUnit()); 1154 shouldThrow(); 1155 } catch (NullPointerException success) {} 1156 } 1157 } 1158 1159 /** 1160 * timed invokeAny(,,null) throws NPE 1161 */ testTimedInvokeAnyNullTimeUnit()1162 public void testTimedInvokeAnyNullTimeUnit() throws Exception { 1163 final ExecutorService e = new CustomExecutor(2); 1164 try (PoolCleaner cleaner = cleaner(e)) { 1165 List<Callable<String>> l = new ArrayList<>(); 1166 l.add(new StringTask()); 1167 try { 1168 e.invokeAny(l, randomTimeout(), null); 1169 shouldThrow(); 1170 } catch (NullPointerException success) {} 1171 } 1172 } 1173 1174 /** 1175 * timed invokeAny(empty collection) throws IllegalArgumentException 1176 */ testTimedInvokeAny2()1177 public void testTimedInvokeAny2() throws Exception { 1178 final ExecutorService e = new CustomExecutor(2); 1179 final Collection<Callable<String>> emptyCollection 1180 = Collections.emptyList(); 1181 try (PoolCleaner cleaner = cleaner(e)) { 1182 try { 1183 e.invokeAny(emptyCollection, randomTimeout(), 
randomTimeUnit()); 1184 shouldThrow(); 1185 } catch (IllegalArgumentException success) {} 1186 } 1187 } 1188 1189 /** 1190 * timed invokeAny(c) throws NPE if c has null elements 1191 */ testTimedInvokeAny3()1192 public void testTimedInvokeAny3() throws Exception { 1193 CountDownLatch latch = new CountDownLatch(1); 1194 final ExecutorService e = new CustomExecutor(2); 1195 try (PoolCleaner cleaner = cleaner(e)) { 1196 List<Callable<String>> l = new ArrayList<>(); 1197 l.add(latchAwaitingStringTask(latch)); 1198 l.add(null); 1199 try { 1200 e.invokeAny(l, randomTimeout(), randomTimeUnit()); 1201 shouldThrow(); 1202 } catch (NullPointerException success) {} 1203 latch.countDown(); 1204 } 1205 } 1206 1207 /** 1208 * timed invokeAny(c) throws ExecutionException if no task completes 1209 */ testTimedInvokeAny4()1210 public void testTimedInvokeAny4() throws Exception { 1211 final ExecutorService e = new CustomExecutor(2); 1212 try (PoolCleaner cleaner = cleaner(e)) { 1213 long startTime = System.nanoTime(); 1214 List<Callable<String>> l = new ArrayList<>(); 1215 l.add(new NPETask()); 1216 try { 1217 e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1218 shouldThrow(); 1219 } catch (ExecutionException success) { 1220 assertTrue(success.getCause() instanceof NullPointerException); 1221 } 1222 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1223 } 1224 } 1225 1226 /** 1227 * timed invokeAny(c) returns result of some task 1228 */ 1229 public void testTimedInvokeAny5() throws Exception { 1230 final ExecutorService e = new CustomExecutor(2); 1231 try (PoolCleaner cleaner = cleaner(e)) { 1232 long startTime = System.nanoTime(); 1233 List<Callable<String>> l = new ArrayList<>(); 1234 l.add(new StringTask()); 1235 l.add(new StringTask()); 1236 String result = e.invokeAny(l, LONG_DELAY_MS, MILLISECONDS); 1237 assertSame(TEST_STRING, result); 1238 assertTrue(millisElapsedSince(startTime) < LONG_DELAY_MS); 1239 } 1240 } 1241 1242 /** 1243 * timed invokeAll(null) throws 
NullPointerException 1244 */ 1245 public void testTimedInvokeAll1() throws Exception { 1246 final ExecutorService e = new CustomExecutor(2); 1247 try (PoolCleaner cleaner = cleaner(e)) { 1248 try { 1249 e.invokeAll(null, randomTimeout(), randomTimeUnit()); 1250 shouldThrow(); 1251 } catch (NullPointerException success) {} 1252 } 1253 } 1254 1255 /** 1256 * timed invokeAll(,,null) throws NullPointerException 1257 */ 1258 public void testTimedInvokeAllNullTimeUnit() throws Exception { 1259 final ExecutorService e = new CustomExecutor(2); 1260 try (PoolCleaner cleaner = cleaner(e)) { 1261 List<Callable<String>> l = new ArrayList<>(); 1262 l.add(new StringTask()); 1263 try { 1264 e.invokeAll(l, randomTimeout(), null); 1265 shouldThrow(); 1266 } catch (NullPointerException success) {} 1267 } 1268 } 1269 1270 /** 1271 * timed invokeAll(empty collection) returns empty list 1272 */ 1273 public void testTimedInvokeAll2() throws Exception { 1274 final ExecutorService e = new CustomExecutor(2); 1275 final Collection<Callable<String>> emptyCollection 1276 = Collections.emptyList(); 1277 try (PoolCleaner cleaner = cleaner(e)) { 1278 List<Future<String>> r = 1279 e.invokeAll(emptyCollection, randomTimeout(), randomTimeUnit()); 1280 assertTrue(r.isEmpty()); 1281 } 1282 } 1283 1284 /** 1285 * timed invokeAll(c) throws NPE if c has null elements 1286 */ 1287 public void testTimedInvokeAll3() throws Exception { 1288 final ExecutorService e = new CustomExecutor(2); 1289 try (PoolCleaner cleaner = cleaner(e)) { 1290 List<Callable<String>> l = new ArrayList<>(); 1291 l.add(new StringTask()); 1292 l.add(null); 1293 try { 1294 e.invokeAll(l, randomTimeout(), randomTimeUnit()); 1295 shouldThrow(); 1296 } catch (NullPointerException success) {} 1297 } 1298 } 1299 1300 /** 1301 * get of element of invokeAll(c) throws exception on failed task 1302 */ 1303 public void testTimedInvokeAll4() throws Exception { 1304 final ExecutorService e = new CustomExecutor(2); 1305 final 
Collection<Callable<String>> c = new ArrayList<>(); 1306 c.add(new NPETask()); 1307 try (PoolCleaner cleaner = cleaner(e)) { 1308 List<Future<String>> futures = 1309 e.invokeAll(c, LONG_DELAY_MS, MILLISECONDS); 1310 assertEquals(1, futures.size()); 1311 try { 1312 futures.get(0).get(); 1313 shouldThrow(); 1314 } catch (ExecutionException success) { 1315 assertTrue(success.getCause() instanceof NullPointerException); 1316 } 1317 } 1318 } 1319 1320 /** 1321 * timed invokeAll(c) returns results of all completed tasks 1322 */ 1323 public void testTimedInvokeAll5() throws Exception { 1324 final ExecutorService e = new CustomExecutor(2); 1325 try (PoolCleaner cleaner = cleaner(e)) { 1326 List<Callable<String>> l = new ArrayList<>(); 1327 l.add(new StringTask()); 1328 l.add(new StringTask()); 1329 List<Future<String>> futures = 1330 e.invokeAll(l, LONG_DELAY_MS, MILLISECONDS); 1331 assertEquals(2, futures.size()); 1332 for (Future<String> future : futures) 1333 assertSame(TEST_STRING, future.get()); 1334 } 1335 } 1336 1337 /** 1338 * timed invokeAll(c) cancels tasks not completed by timeout 1339 */ 1340 public void testTimedInvokeAll6() throws Exception { 1341 for (long timeout = timeoutMillis();;) { 1342 final CountDownLatch done = new CountDownLatch(1); 1343 final Callable<String> waiter = new CheckedCallable<String>() { 1344 public String realCall() { 1345 try { done.await(LONG_DELAY_MS, MILLISECONDS); } 1346 catch (InterruptedException ok) {} 1347 return "1"; }}; 1348 final ExecutorService p = new CustomExecutor(2); 1349 try (PoolCleaner cleaner = cleaner(p, done)) { 1350 List<Callable<String>> tasks = new ArrayList<>(); 1351 tasks.add(new StringTask("0")); 1352 tasks.add(waiter); 1353 tasks.add(new StringTask("2")); 1354 long startTime = System.nanoTime(); 1355 List<Future<String>> futures = 1356 p.invokeAll(tasks, timeout, MILLISECONDS); 1357 assertEquals(tasks.size(), futures.size()); 1358 assertTrue(millisElapsedSince(startTime) >= timeout); 1359 for (Future 
future : futures) 1360 assertTrue(future.isDone()); 1361 assertTrue(futures.get(1).isCancelled()); 1362 try { 1363 assertEquals("0", futures.get(0).get()); 1364 assertEquals("2", futures.get(2).get()); 1365 break; 1366 } catch (CancellationException retryWithLongerTimeout) { 1367 timeout *= 2; 1368 if (timeout >= LONG_DELAY_MS / 2) 1369 fail("expected exactly one task to be cancelled"); 1370 } 1371 } 1372 } 1373 } 1374 1375 } 1376