/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each <tt>ThreadPoolExecutor</tt> also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios.
 * Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A <tt>ThreadPoolExecutor</tt> will automatically adjust the
 * pool size
 * (see {@link ThreadPoolExecutor#getPoolSize})
 * according to the bounds set by corePoolSize
 * (see {@link ThreadPoolExecutor#getCorePoolSize})
 * and
 * maximumPoolSize
 * (see {@link ThreadPoolExecutor#getMaximumPoolSize}).
 * When a new task is submitted in method {@link
 * ThreadPoolExecutor#execute}, and fewer than corePoolSize threads
 * are running, a new thread is created to handle the request, even if
 * other worker threads are idle.  If there are more than
 * corePoolSize but fewer than maximumPoolSize threads running, a new
 * thread will be created only if the queue is full.  By setting
 * corePoolSize and maximumPoolSize the same, you create a fixed-size
 * thread pool. By setting maximumPoolSize to an essentially unbounded
 * value such as <tt>Integer.MAX_VALUE</tt>, you allow the pool to
 * accommodate an arbitrary number of concurrent tasks. Most typically,
 * core and maximum pool sizes are set only upon construction, but they
 * may also be changed dynamically using {@link
 * ThreadPoolExecutor#setCorePoolSize} and {@link
 * ThreadPoolExecutor#setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd> By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link
 * ThreadPoolExecutor#prestartCoreThread} or
 * {@link ThreadPoolExecutor#prestartAllCoreThreads}.
 * You probably want to prestart threads if you construct the
 * pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link
 * java.util.concurrent.ThreadFactory}.
 * If not otherwise specified, a
 * {@link Executors#defaultThreadFactory} is used, which creates threads
 * that are all in the same {@link ThreadGroup} and have the same
 * <tt>NORM_PRIORITY</tt> priority and non-daemon status. By supplying
 * a different ThreadFactory, you can alter the thread's name, thread
 * group, priority, daemon status, etc. If a <tt>ThreadFactory</tt> fails to create
 * a thread when asked by returning null from <tt>newThread</tt>,
 * the executor will continue, but might
 * not be able to execute any tasks. </dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link
 * ThreadPoolExecutor#getKeepAliveTime}). This provides a means of
 * reducing resource consumption when the pool is not being actively
 * used. If the pool becomes more active later, new threads will be
 * constructed. This parameter can also be changed dynamically using
 * method {@link ThreadPoolExecutor#setKeepAliveTime}. Using a value
 * of <tt>Long.MAX_VALUE</tt> {@link TimeUnit#NANOSECONDS} effectively
 * prevents idle threads from ever terminating prior to shutdown. By
 * default, the keep-alive policy applies only when there are more
 * than corePoolSize threads. But method {@link
 * ThreadPoolExecutor#allowCoreThreadTimeOut} can be used to apply
 * this time-out policy to core threads as well, so long as
 * the keepAliveTime value is non-zero. </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.
 * The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed.  </li>
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)  This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.  </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control.  Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput.  If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.  </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd> New tasks submitted in method {@link
 * ThreadPoolExecutor#execute} will be <em>rejected</em> when the
 * Executor has been shut down, and also when the Executor uses finite
 * bounds for both maximum threads and work queue capacity, and is
 * saturated.  In either case, the <tt>execute</tt> method invokes the
 * {@link RejectedExecutionHandler#rejectedExecution} method of its
 * {@link RejectedExecutionHandler}.  Four predefined handler policies
 * are provided:
 *
 * <ol>
 *
 * <li> In the
 * default {@link ThreadPoolExecutor.AbortPolicy}, the handler throws a
 * runtime {@link RejectedExecutionException} upon rejection. </li>
 *
 * <li> In {@link
 * ThreadPoolExecutor.CallerRunsPolicy}, the thread that invokes
 * <tt>execute</tt> itself runs the task.
 * This provides a simple
 * feedback control mechanism that will slow down the rate at which new
 * tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy},
 * a task that cannot be executed is simply dropped.  </li>
 *
 * <li>In {@link
 * ThreadPoolExecutor.DiscardOldestPolicy}, if the executor is not
 * shut down, the task at the head of the work queue is dropped, and
 * then execution is retried (which can fail again, causing this to be
 * repeated). </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care,
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides <tt>protected</tt> overridable {@link
 * ThreadPoolExecutor#beforeExecute} and {@link
 * ThreadPoolExecutor#afterExecute} methods that are called before and
 * after execution of each task.  These can be used to manipulate the
 * execution environment; for example, reinitializing ThreadLocals,
 * gathering statistics, or adding log entries. Additionally, method
 * {@link ThreadPoolExecutor#terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 *
 * <p>If hook or callback methods throw
 * exceptions, internal worker threads may in turn fail and
 * abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd> Method {@link ThreadPoolExecutor#getQueue} allows access to
 * the work queue for purposes of monitoring and debugging.  Use of
 * this method for any other purpose is strongly discouraged.
 * Two
 * supplied methods, {@link ThreadPoolExecutor#remove} and {@link
 * ThreadPoolExecutor#purge}, are available to assist in storage
 * reclamation when large numbers of queued tasks become
 * cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd> A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be shut down
 * automatically. If you would like to ensure that unreferenced pools
 * are reclaimed even if users forget to call {@link
 * ThreadPoolExecutor#shutdown}, then you must arrange that unused
 * threads eventually die, by setting appropriate keep-alive times,
 * using a lower bound of zero core threads and/or setting {@link
 * ThreadPoolExecutor#allowCoreThreadTimeOut}.  </dd> </dl>
 *
 * <p> <b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre>
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...)
 *       { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }
 * </pre>
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Only used to force toArray() to produce a Runnable[].
     */
    private static final Runnable[] EMPTY_RUNNABLE_ARRAY = new Runnable[0];

    /**
     * Permission for checking shutdown
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Queue used for holding tasks and handing off to worker threads.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and
     * workers set.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Set containing all worker threads in pool.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout only when there are more than
     * corePoolSize present. Otherwise they wait forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default) core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size, updated only while holding mainLock,
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size, updated only while holding mainLock
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int maximumPoolSize;

    /**
     * Current pool size, updated only while holding mainLock
     * but volatile to allow concurrent readability even
     * during updates.
     */
    private volatile int poolSize;

    /**
     * Lifecycle state
     */
    volatile int runState;

    // Special values for runState
    /** Normal, not-shutdown mode */
    static final int RUNNING    = 0;
    /** Controlled shutdown mode */
    static final int SHUTDOWN   = 1;
    /** Immediate shutdown mode */
    static final int STOP       = 2;
    /** Final state */
    static final int TERMINATED = 3;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Factory for new threads.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Tracks largest attained pool size.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads.
     */
    private long completedTaskCount;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Invokes the rejected execution handler for the given command.
     */
    void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Creates and returns a new thread running firstTask as its first
     * task. Call only while holding mainLock.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return the new thread, or null if threadFactory fails to create thread
     */
    private Thread addThread(Runnable firstTask) {
        if (runState == TERMINATED) // Don't create thread if terminated
            return null;
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

    /**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than corePoolSize threads are running.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return true if successful.
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

    /**
     * Creates and starts a new thread only if fewer than maximumPoolSize
     * threads are running.  The new thread runs as its first task the
     * next task in queue, or if there is none, the given task.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return 0 if a new thread cannot be created, a positive number
     * if firstTask will be run in a new thread, or a negative number
     * if a new thread was created but is running some other task, in
     * which case the caller must try some other way to run firstTask
     * (perhaps by calling this method again).
     */
    private int addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        int status = 0;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize) {
                Runnable next = workQueue.poll();
                if (next == null) {
                    next = firstTask;
                    status = 1;
                } else
                    status = -1;
                t = addThread(next);
            }
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return 0;
        t.start();
        return status;
    }


    /**
     * Gets the next task for a worker thread to run.
     * @return the task
     */
    Runnable getTask() {
        for (;;) {
            try {
                switch (runState) {
                case RUNNING: {
                    // untimed wait if core and not allowing core timeout
                    if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
                        return workQueue.take();

                    long timeout = keepAliveTime;
                    if (timeout <= 0) // die immediately for 0 timeout
                        return null;
                    Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
                    if (r != null)
                        return r;
                    if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                        return null; // timed out
                    // Else, after timeout, the pool shrank. Retry
                    break;
                }

                case SHUTDOWN: {
                    // Help drain queue
                    Runnable r = workQueue.poll();
                    if (r != null)
                        return r;

                    // Check if can terminate
                    if (workQueue.isEmpty()) {
                        interruptIdleWorkers();
                        return null;
                    }

                    // Else there could still be delayed tasks in queue.
                    return workQueue.take();
                }

                case STOP:
                    return null;
                default:
                    assert false;
                }
            } catch (InterruptedException ie) {
                // On interruption, re-check runstate
            }
        }
    }

    /**
     * Wakes up all threads that might be waiting for tasks.
     */
    void interruptIdleWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfIdle();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs bookkeeping for a terminated worker thread.
     * @param w the worker
     */
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize > 0)
                return;

            // Else, this is the last thread. Deal with potential shutdown.

            int state = runState;
            assert state != TERMINATED;

            if (state != STOP) {
                // If there are queued tasks but no threads, create
                // replacement thread. We must create it initially
                // idle to avoid orphaned tasks in case addThread
                // fails.  This also handles case of delayed tasks
                // that will sometime later become runnable.
                if (!workQueue.isEmpty()) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    return;
                }

                // Otherwise, we can exit without replacement
                if (state == RUNNING)
                    return;
            }

            // Either state is STOP, or state is SHUTDOWN and there is
            // no work to do. So we can terminate.
            termination.signalAll();
            runState = TERMINATED;
            // fall through to call terminated() outside of lock.
        } finally {
            mainLock.unlock();
        }

        assert runState == TERMINATED;
        terminated();
    }

    /**
     * Worker threads
     */
    private class Worker implements Runnable {

        /**
         * The runLock is acquired and released surrounding each task
         * execution. It mainly protects against interrupts that are
         * intended to cancel the worker thread from instead
         * interrupting the task being run.
         */
        private final ReentrantLock runLock = new ReentrantLock();

        /**
         * Initial task to run before entering run loop
         */
        private Runnable firstTask;

        /**
         * Per thread completed task counter; accumulated
         * into completedTaskCount upon termination.
         */
        volatile long completedTasks;

        /**
         * Thread this worker is running in.  Acts as a final field,
         * but cannot be set until thread is created.
         */
        Thread thread;

        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }

        boolean isActive() {
            return runLock.isLocked();
        }

        /**
         * Interrupts thread if not running a task.
         */
        void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

        /**
         * Interrupts thread even if running a task.
         */
        void interruptNow() {
            thread.interrupt();
        }

        /**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                // If not shutting down then clear an outstanding interrupt.
                if (runState != STOP &&
                    Thread.interrupted() &&
                    runState == STOP) // Re-interrupt if stopped after clearing
                    thread.interrupt();
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    // Else the exception occurred within
                    // afterExecute itself in which case we don't
                    // want to call it again.
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null; // unnecessary but can help GC
                }
            } finally {
                workerDone(this);
            }
        }
    }

    // Public methods

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, or if maximumPoolSize is less
     * than or equal to zero, or if corePoolSize is greater than
     * maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt> is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, or if maximumPoolSize is less
     * than or equal to zero, or if corePoolSize is greater than
     * maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>threadFactory</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, or if maximumPoolSize is less
     * than or equal to zero, or if corePoolSize is greater than
     * maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @param handler the handler to use when execution is blocked
     * because the thread bounds and queue capacities are reached.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime is less than zero, or if maximumPoolSize is less
     * than or equal to zero, or if corePoolSize is greater than
     * maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>,
     * <tt>threadFactory</tt> or <tt>handler</tt> is null.
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }


    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shut down or because its capacity has been reached,
     * the task is handled by the current <tt>RejectedExecutionHandler</tt>.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     * <tt>RejectedExecutionHandler</tt>, if task cannot be accepted
     * for execution
     * @throws NullPointerException if command is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        for (;;) {
            if (runState != RUNNING) {
                reject(command);
                return;
            }
            if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                return;
            if (workQueue.offer(command))
                return;
            int status = addIfUnderMaximumPoolSize(command);
            if (status > 0)      // created new thread
                return;
            if (status == 0) {   // failed to create thread
                reject(command);
                return;
            }
            // Retry if created a new thread but it is busy with another task
        }
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be
     * accepted. Invocation has no additional effect if already shut
     * down.
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public void shutdown() {
        // Fail if caller doesn't have modifyThread permission.
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                // Check if caller can modify worker threads.  This
                // might not be true even if passed above check, if
                // the SecurityManager treats some threads specially.
                if (security != null) {
                    for (Worker w: workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state == RUNNING) // don't override shutdownNow
                    runState = SHUTDOWN;

                try {
                    for (Worker w: workers)
                        w.interruptIfIdle();
                } catch (SecurityException se) {
                    // If SecurityManager allows above checks, but
                    // then unexpectedly throws exception when
                    // interrupting threads (which it ought not do),
                    // back out as cleanly as we can. Some threads may
                    // have been killed but we remain in non-shutdown
                    // state.
                    runState = state;
                    throw se;
                }
            }
            else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
    }


    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks. This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     * shutting down this ExecutorService may manipulate threads that
     * the caller is not permitted to modify because it does not hold
     * {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
     * or the security manager's <tt>checkAccess</tt> method denies access.
     */
    public List<Runnable> shutdownNow() {
        // Almost the same code as shutdown()
        SecurityManager security = System.getSecurityManager();
        if (security != null)
            security.checkPermission(shutdownPerm);

        boolean fullyTerminated = false;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (workers.size() > 0) {
                if (security != null) {
                    for (Worker w: workers)
                        security.checkAccess(w.thread);
                }

                int state = runState;
                if (state != TERMINATED)
                    runState = STOP;
                try {
                    for (Worker w : workers)
                        w.interruptNow();
                } catch (SecurityException se) {
                    runState = state; // back out;
                    throw se;
                }
            }
            else { // If no workers, trigger full termination now
                fullyTerminated = true;
                runState = TERMINATED;
                termination.signalAll();
            }
        } finally {
            mainLock.unlock();
        }
        if (fullyTerminated)
            terminated();
        return Arrays.asList(workQueue.toArray(EMPTY_RUNNABLE_ARRAY));
    }

    public boolean isShutdown() {
        return runState != RUNNING;
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after <tt>shutdown</tt> or <tt>shutdownNow</tt> but has not
     * completely terminated. This method may be useful for
     * debugging. A return of <tt>true</tt> reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     * @return true if terminating but not yet terminated.
     */
    public boolean isTerminating() {
        return runState == STOP;
    }

    public boolean isTerminated() {
        return runState == TERMINATED;
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Invokes <tt>shutdown</tt> when this executor is no longer
     * referenced.
     */
    protected void finalize() {
        shutdown();
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use. Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p> This method may be useful as one part of a cancellation
     * scheme. It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using <tt>submit</tt> might be
     * converted into a form that maintains <tt>Future</tt> status.
     * However, in such cases, method {@link ThreadPoolExecutor#purge}
     * may be used to remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        return getQueue().remove(task);
    }


    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        // Fail if we encounter interference during traversal
        try {
            Iterator<Runnable> it = getQueue().iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?>) {
                    Future<?> c = (Future<?>)r;
                    if (c.isCancelled())
                        it.remove();
                }
            }
        }
        catch (ConcurrentModificationException ex) {
            return;
        }
    }

    /**
     * Sets the core number of threads. This overrides any value set
     * in the constructor. If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle. If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if <tt>corePoolSize</tt>
     * less than zero
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.corePoolSize - corePoolSize;
            this.corePoolSize = corePoolSize;
            if (extra < 0) {
                int n = workQueue.size();
                // We have to create initially-idle threads here
                // because we otherwise have no recourse about
                // what to do with a dequeued task if addThread fails.
                while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
                    Thread t = addThread(null);
                    if (t != null)
                        t.start();
                    else
                        break;
                }
            }
            else if (extra > 0 && poolSize > corePoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra-- > 0 &&
                       poolSize > corePoolSize &&
                       workQueue.remainingCapacity() == 0)
                    it.next().interruptIfIdle();
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return <tt>false</tt>
     * if all core threads have already been started.
     * @return true if a thread was started
     */
    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

    /**
     * Starts all core threads, causing them to idly wait for work.
     * This overrides the default policy of starting core threads only when
     * new tasks are executed.
     * @return the number of threads started.
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

    /**
     * Returns true if this pool allows core threads to time out and
     * terminate if no tasks arrive within the keepAlive time, being
     * replaced if needed when new tasks arrive. When true, the same
     * keep-alive policy applying to non-core threads applies also to
     * core threads. When false (the default), core threads are never
     * terminated due to lack of incoming tasks.
     * @return <tt>true</tt> if core threads are allowed to time out,
     * else <tt>false</tt>
     *
     * @since 1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting <tt>true</tt>. This method
     * should in general be called before the pool is actively used.
     * @param value <tt>true</tt> if should time out, else <tt>false</tt>
     * @throws IllegalArgumentException if value is <tt>true</tt>
     * and the current keep-alive time is not greater than zero.
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

        allowCoreThreadTimeOut = value;
    }

    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int extra = this.maximumPoolSize - maximumPoolSize;
            this.maximumPoolSize = maximumPoolSize;
            if (extra > 0 && poolSize > maximumPoolSize) {
                Iterator<Worker> it = workers.iterator();
                while (it.hasNext() &&
                       extra > 0 &&
                       poolSize > maximumPoolSize) {
                    it.next().interruptIfIdle();
                    --extra;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated.
     * If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated. This overrides any value set in the constructor.
     * @param time the time to wait. A time value of zero will cause
     * excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the time argument
     * @throws IllegalArgumentException if time less than zero or
     * if time is zero and allowsCoreThreadTimeOut
     * @see #getKeepAliveTime
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        this.keepAliveTime = unit.toNanos(time);
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * which threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     * @see #setKeepAliveTime
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        return poolSize;
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers) {
                if (w.isActive())
                    ++n;
            }
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation, but one that does not ever
     * decrease across successive calls.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isActive())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread. This method is invoked by thread <tt>t</tt> that
     * will execute task <tt>r</tt>, and may be used to re-initialize
     * ThreadLocals, or to perform logging.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke <tt>super.beforeExecute</tt> at the end of
     * this method.
     *
     * @param t the thread that will run task r.
     * @param r the task that will be executed.
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught <tt>RuntimeException</tt>
     * or <tt>Error</tt> that caused execution to terminate abruptly.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * <tt>submit</tt>, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke <tt>super.afterExecute</tt> at the
     * beginning of this method.
     *
     * @param r the runnable that has completed.
     * @param t the exception that caused termination, or null if
     * execution completed normally.
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * <tt>super.terminated</tt> within this method.
     */
    protected void terminated() { }

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the <tt>execute</tt> method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>CallerRunsPolicy</tt>.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * <tt>RejectedExecutionException</tt>.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an <tt>AbortPolicy</tt>.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException();
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardPolicy</tt>.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries <tt>execute</tt>, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a <tt>DiscardOldestPolicy</tt>.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}
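Usage sketch (not part of the class above). The listing shows the seven-argument constructor, execute, shutdown, awaitTermination and the CallerRunsPolicy handler; the following is a minimal illustration of how client code might combine them. The class name PoolSketch, the method runTasks, and the chosen pool sizes, queue capacity and timeouts are all hypothetical, picked only for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical client class; not part of java.util.concurrent.
public class PoolSketch {

    /** Submits taskCount trivial tasks and returns how many actually ran. */
    static int runTasks(int taskCount) {
        final AtomicInteger ran = new AtomicInteger();

        // 2 core threads, at most 4 threads, and a bounded queue of 8 slots.
        // When both the queue and the pool are full, CallerRunsPolicy runs
        // the task in the submitting thread instead of dropping it.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(8),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < taskCount; i++) {
            pool.execute(new Runnable() {
                public void run() { ran.incrementAndGet(); }
            });
        }

        // Orderly shutdown: queued tasks still run, new ones are rejected.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
                pool.shutdownNow(); // best-effort interrupt of stragglers
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runTasks(100));
    }
}
```

Because every execute call happens before shutdown, no task is discarded by the handler in this sketch; with AbortPolicy in its place, the loop would instead have to be prepared for a RejectedExecutionException once the queue and pool fill up.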