/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each {@code ThreadPoolExecutor} also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios.
 * Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
 *
 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle. If there are more than corePoolSize but fewer than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full. By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd>By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}. You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link ThreadFactory}.
 * If not otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, which creates threads that are all in the same {@link
 * ThreadGroup} and have the same {@code NORM_PRIORITY} priority and
 * non-daemon status. By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc. If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks. Threads
 * should possess the "modifyThread" {@code RuntimePermission}. If
 * worker threads or other threads using the pool do not possess this
 * permission, service may be degraded: configuration changes may not
 * take effect in a timely manner, and a shutdown pool may remain in a
 * state in which termination is possible but not completed.</dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 * This provides a means of reducing resource consumption when the
 * pool is not being actively used. If the pool becomes more active
 * later, new threads will be constructed. This parameter can also be
 * changed dynamically using method {@link #setKeepAliveTime(long,
 * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 * terminating prior to shut down. By default, the keep-alive policy
 * applies only when there are more than corePoolSize threads. But
 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 * apply this time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero.
 * </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks. The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed. </li>
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)
 * This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed. </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control. Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput. If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd>New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated. In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}.
 * Four predefined handler policies are provided:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task. This provides a
 * simple feedback control mechanism that will slow down the rate at
 * which new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped. </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated). </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides {@code protected} overridable
 * {@link #beforeExecute(Thread, Runnable)} and
 * {@link #afterExecute(Runnable, Throwable)} methods that are called
 * before and after execution of each task. These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log entries.
 * Additionally, method {@link #terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 *
 * <p>If hook or callback methods throw exceptions, internal worker
 * threads may in turn fail and abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd>Method {@link #getQueue()} allows access to the work queue
 * for purposes of monitoring and debugging. Use of this method for
 * any other purpose is strongly discouraged. Two supplied methods,
 * {@link #remove(Runnable)} and {@link #purge} are available to
 * assist in storage reclamation when large numbers of queued tasks
 * become cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd>A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times, using a lower bound of zero core threads and/or
 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
 *
 * </dl>
 *
 * <p><b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...) { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop. The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
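     *
     * As a worked illustration of the packing (a sketch only, derived
     * from the shift and mask constants declared with ctl and assuming
     * Integer.SIZE is 32): COUNT_BITS is 29, so the low 29 bits hold
     * workerCount and the high 3 bits hold runState:
     *
     *   CAPACITY   = 0x1FFFFFFF  (low 29 bits set)
     *   RUNNING    = 0xE0000000  (high bits 111, negative as an int)
     *   SHUTDOWN   = 0x00000000  (high bits 000)
     *   STOP       = 0x20000000  (high bits 001)
     *   TIDYING    = 0x40000000  (high bits 010)
     *   TERMINATED = 0x60000000  (high bits 011)
     *
     * For example, ctlOf(RUNNING, 5) is 0xE0000005; masking with
     * ~CAPACITY recovers RUNNING and masking with CAPACITY recovers 5.
     * Because RUNNING is the only negative value, a signed comparison
     * such as c < SHUTDOWN tests for RUNNING without unpacking ctl.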
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempts to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit).
     * Other decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads. We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING). This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet been interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size.
     * Accessed only under mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker). All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads. Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads. Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up. There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /* The context to be used when executing the finalizer, or null. */
    private final AccessControlContext acc;

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution. This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run. We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize. Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty). If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /*
     * Methods for controlling interrupts to worker threads.
     */

    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread.
     * This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers. In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /*
     * Misc utilities, most of which are also exported to
     * ScheduledThreadPoolExecutor
     */

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown.  A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     */
    void onShutdown() {
    }

    /**
     * State check needed by ScheduledThreadPoolExecutor to
     * enable running tasks during shutdown.
     *
     * @param shutdownOK true if should return true if SHUTDOWN
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    /**
     * Drains the task queue into a new list, normally using
     * drainTo.
     * But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !
                        workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    /**
     * Main worker run loop.
     * Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Public constructors and methods

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shut down or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!
                    isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after {@link #shutdown} or {@link #shutdownNow} but has not
     * completely terminated.  This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     *
     * @return {@code true} if terminating but not yet terminated
     */
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Invokes {@code shutdown} when this executor is no longer
     * referenced and it has no threads.
     */
    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory(ThreadFactory)
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.
     * If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return {@code false}
     * if all core threads have already been started.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

    /**
     * Same as prestartCoreThread except arranges that at least one
     * thread is started even if corePoolSize is 0.
     */
    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    /**
     * Returns true if this pool allows core threads to time out and
     * terminate if no tasks arrive within the keepAlive time, being
     * replaced if needed when new tasks arrive. When true, the same
     * keep-alive policy applying to non-core threads applies also to
     * core threads. When false (the default), core threads are never
     * terminated due to lack of incoming tasks.
     *
     * @return {@code true} if core threads are allowed to time out,
     *         else {@code false}
     *
     * @since 1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks.
     * When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting {@code true}. This method
     * should in general be called before the pool is actively used.
     *
     * @param value {@code true} if should time out, else {@code false}
     * @throws IllegalArgumentException if value is {@code true}
     *         and the current keep-alive time is not greater than zero
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated. If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated. This overrides any value set in the constructor.
     *
     * @param time the time to wait. A time value of zero will cause
     *        excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the {@code time} argument
     * @throws IllegalArgumentException if {@code time} is less than zero or
     *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
     * @see #getKeepAliveTime(TimeUnit)
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * that threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     * @see #setKeepAliveTime(long, TimeUnit)
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* User-level queue utilities */

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use. Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p>This method may be useful as one part of a cancellation
     * scheme. It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return {@code true} if the task was removed
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality.
     * Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state and estimated worker and
     * task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));
        return super.toString() +
            "[" + rs +
            ", pool size = " + nworkers +
            ", active threads = " + nactive +
            ", queued tasks = " + workQueue.size() +
            ", completed tasks = " + ncompleted +
            "]";
    }

    /* Extension hooks */

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread. This method is invoked by thread {@code t} that
     * will execute task {@code r}, and may be used to re-initialize
     * ThreadLocals, or to perform logging.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.beforeExecute} at the end of
     * this method.
     *
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught {@code RuntimeException}
     * or {@code Error} that caused execution to terminate abruptly.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.afterExecute} at the
     * beginning of this method.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * {@code submit}, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method. If you would like to trap both kinds of
     * failures in this method, you can further probe for such cases,
     * as in this sample subclass that prints either the direct cause
     * or the underlying exception if a task has been aborted:
     *
     * <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
     *   protected void afterExecute(Runnable r, Throwable t) {
     *     super.afterExecute(r, t);
     *     if (t == null && r instanceof Future<?>) {
     *       try {
     *         Object result = ((Future<?>) r).get();
     *       } catch (CancellationException ce) {
     *         t = ce;
     *       } catch (ExecutionException ee) {
     *         t = ee.getCause();
     *       } catch (InterruptedException ie) {
     *         Thread.currentThread().interrupt(); // ignore/reset
     *       }
     *     }
     *     if (t != null)
     *       System.out.println(t);
     *   }
     * }}</pre>
     *
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if
     *        execution completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     */
    protected void terminated() { }

    /* Predefined RejectedExecutionHandlers */

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy}.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}