/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A reusable synchronization barrier, similar in functionality to
 * {@link java.util.concurrent.CyclicBarrier CyclicBarrier} and
 * {@link java.util.concurrent.CountDownLatch CountDownLatch}
 * but supporting more flexible usage.
 *
 * <p><b>Registration.</b> Unlike the case for other barriers, the
 * number of parties <em>registered</em> to synchronize on a phaser
 * may vary over time. Tasks may be registered at any time (using
 * methods {@link #register}, {@link #bulkRegister}, or forms of
 * constructors establishing initial numbers of parties), and
 * optionally deregistered upon any arrival (using {@link
 * #arriveAndDeregister}). As is the case with most basic
 * synchronization constructs, registration and deregistration affect
 * only internal counts; they do not establish any further internal
 * bookkeeping, so tasks cannot query whether they are registered.
 * (However, you can introduce such bookkeeping by subclassing this
 * class.)
 *
 * <p><b>Synchronization.</b> Like a {@code CyclicBarrier}, a {@code
 * Phaser} may be repeatedly awaited. Method {@link
 * #arriveAndAwaitAdvance} has effect analogous to {@link
 * java.util.concurrent.CyclicBarrier#await CyclicBarrier.await}. Each
 * generation of a phaser has an associated phase number. The phase
 * number starts at zero, and advances when all parties arrive at the
 * phaser, wrapping around to zero after reaching {@code
 * Integer.MAX_VALUE}. The use of phase numbers enables independent
 * control of actions upon arrival at a phaser and upon awaiting
 * others, via two kinds of methods that may be invoked by any
 * registered party:
 *
 * <ul>
 *
 *   <li> <b>Arrival.</b> Methods {@link #arrive} and
 *       {@link #arriveAndDeregister} record arrival. These methods
 *       do not block, but return an associated <em>arrival phase
 *       number</em>; that is, the phase number of the phaser to which
 *       the arrival applied. When the final party for a given phase
 *       arrives, an optional action is performed and the phase
 *       advances. These actions are performed by the party
 *       triggering a phase advance, and are arranged by overriding
 *       method {@link #onAdvance(int, int)}, which also controls
 *       termination. Overriding this method is similar to, but more
 *       flexible than, providing a barrier action to a {@code
 *       CyclicBarrier}.
 *
 *   <li> <b>Waiting.</b> Method {@link #awaitAdvance} requires an
 *       argument indicating an arrival phase number, and returns when
 *       the phaser advances to (or is already at) a different phase.
 *       Unlike similar constructions using {@code CyclicBarrier},
 *       method {@code awaitAdvance} continues to wait even if the
 *       waiting thread is interrupted. Interruptible and timeout
 *       versions are also available, but exceptions encountered while
 *       tasks wait interruptibly or with timeout do not change the
 *       state of the phaser. If necessary, you can perform any
 *       associated recovery within handlers of those exceptions,
 *       often after invoking {@code forceTermination}. Phasers may
 *       also be used by tasks executing in a {@link ForkJoinPool},
 *       which will ensure sufficient parallelism to execute tasks
 *       when others are blocked waiting for a phase to advance.
 *
 * </ul>
 *
 * <p><b>Termination.</b> A phaser may enter a <em>termination</em>
 * state, that may be checked using method {@link #isTerminated}. Upon
 * termination, all synchronization methods immediately return without
 * waiting for advance, as indicated by a negative return value.
 * Similarly, attempts to register upon termination have no effect.
 * Termination is triggered when an invocation of {@code onAdvance}
 * returns {@code true}. The default implementation returns {@code
 * true} if a deregistration has caused the number of registered
 * parties to become zero. As illustrated below, when phasers control
 * actions with a fixed number of iterations, it is often convenient
 * to override this method to cause termination when the current phase
 * number reaches a threshold. Method {@link #forceTermination} is
 * also available to abruptly release waiting threads and allow them
 * to terminate.
 *
 * <p><b>Tiering.</b> Phasers may be <em>tiered</em> (i.e.,
 * constructed in tree structures) to reduce contention. Phasers with
 * large numbers of parties that would otherwise experience heavy
 * synchronization contention costs may instead be set up so that
 * groups of sub-phasers share a common parent. This may greatly
 * increase throughput even though it incurs greater per-operation
 * overhead.
 *
 * <p>In a tree of tiered phasers, registration and deregistration of
 * child phasers with their parent are managed automatically.
 * Whenever the number of registered parties of a child phaser becomes
 * non-zero (as established in the {@link #Phaser(Phaser,int)}
 * constructor, {@link #register}, or {@link #bulkRegister}), the
 * child phaser is registered with its parent. Whenever the number of
 * registered parties becomes zero as the result of an invocation of
 * {@link #arriveAndDeregister}, the child phaser is deregistered
 * from its parent.
 *
 * <p><b>Monitoring.</b> While synchronization methods may be invoked
 * only by registered parties, the current state of a phaser may be
 * monitored by any caller. At any given moment there are {@link
 * #getRegisteredParties} parties in total, of which {@link
 * #getArrivedParties} have arrived at the current phase ({@link
 * #getPhase}). When the remaining ({@link #getUnarrivedParties})
 * parties arrive, the phase advances. The values returned by these
 * methods may reflect transient states and so are not in general
 * useful for synchronization control. Method {@link #toString}
 * returns snapshots of these state queries in a form convenient for
 * informal monitoring.
 *
 * <p><b>Sample usages:</b>
 *
 * <p>A {@code Phaser} may be used instead of a {@code CountDownLatch}
 * to control a one-shot action serving a variable number of parties.
 * The typical idiom is for the method setting this up to first
 * register, then start the actions, then deregister, as in:
 *
 *  <pre> {@code
 * void runTasks(List<Runnable> tasks) {
 *   final Phaser phaser = new Phaser(1); // "1" to register self
 *   // create and start threads
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         phaser.arriveAndAwaitAdvance(); // await all creation
 *         task.run();
 *       }
 *     }.start();
 *   }
 *
 *   // allow threads to start and deregister self
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 * <p>One way to cause a set of threads to repeatedly perform actions
 * for a given number of iterations is to override {@code onAdvance}:
 *
 *  <pre> {@code
 * void startTasks(List<Runnable> tasks, final int iterations) {
 *   final Phaser phaser = new Phaser() {
 *     protected boolean onAdvance(int phase, int registeredParties) {
 *       return phase >= iterations || registeredParties == 0;
 *     }
 *   };
 *   phaser.register();
 *   for (final Runnable task : tasks) {
 *     phaser.register();
 *     new Thread() {
 *       public void run() {
 *         do {
 *           task.run();
 *           phaser.arriveAndAwaitAdvance();
 *         } while (!phaser.isTerminated());
 *       }
 *     }.start();
 *   }
 *   phaser.arriveAndDeregister(); // deregister self, don't wait
 * }}</pre>
 *
 * If the main task must later await termination, it
 * may re-register and then execute a similar loop:
 *  <pre> {@code
 *   // ...
 *   phaser.register();
 *   while (!phaser.isTerminated())
 *     phaser.arriveAndAwaitAdvance();}</pre>
 *
 * <p>Related constructions may be used to await particular phase numbers
 * in contexts where you are sure that the phase will never wrap around
 * {@code Integer.MAX_VALUE}. For example:
 *
 *  <pre> {@code
 * void awaitPhase(Phaser phaser, int phase) {
 *   int p = phaser.register(); // assumes caller not already registered
 *   while (p < phase) {
 *     if (phaser.isTerminated())
 *       // ... deal with unexpected termination
 *     else
 *       p = phaser.arriveAndAwaitAdvance();
 *   }
 *   phaser.arriveAndDeregister();
 * }}</pre>
 *
 *
 * <p>To create a set of {@code n} tasks using a tree of phasers, you
 * could use code of the following form, assuming a Task class with a
 * constructor accepting a {@code Phaser} that it registers with upon
 * construction.
After invocation of {@code build(new Task[n], 0, n, 230 * new Phaser())}, these tasks could then be started, for example by 231 * submitting to a pool: 232 * 233 * <pre> {@code 234 * void build(Task[] tasks, int lo, int hi, Phaser ph) { 235 * if (hi - lo > TASKS_PER_PHASER) { 236 * for (int i = lo; i < hi; i += TASKS_PER_PHASER) { 237 * int j = Math.min(i + TASKS_PER_PHASER, hi); 238 * build(tasks, i, j, new Phaser(ph)); 239 * } 240 * } else { 241 * for (int i = lo; i < hi; ++i) 242 * tasks[i] = new Task(ph); 243 * // assumes new Task(ph) performs ph.register() 244 * } 245 * }}</pre> 246 * 247 * The best value of {@code TASKS_PER_PHASER} depends mainly on 248 * expected synchronization rates. A value as low as four may 249 * be appropriate for extremely small per-phase task bodies (thus 250 * high rates), or up to hundreds for extremely large ones. 251 * 252 * <p><b>Implementation notes</b>: This implementation restricts the 253 * maximum number of parties to 65535. Attempts to register additional 254 * parties result in {@code IllegalStateException}. However, you can and 255 * should create tiered phasers to accommodate arbitrarily large sets 256 * of participants. 257 * 258 * @since 1.7 259 * @author Doug Lea 260 */ 261 public class Phaser { 262 /* 263 * This class implements an extension of X10 "clocks". Thanks to 264 * Vijay Saraswat for the idea, and to Vivek Sarkar for 265 * enhancements to extend functionality. 266 */ 267 268 /** 269 * Primary state representation, holding four bit-fields: 270 * 271 * unarrived -- the number of parties yet to hit barrier (bits 0-15) 272 * parties -- the number of parties to wait (bits 16-31) 273 * phase -- the generation of the barrier (bits 32-62) 274 * terminated -- set if barrier is terminated (bit 63 / sign) 275 * 276 * Except that a phaser with no registered parties is 277 * distinguished by the otherwise illegal state of having zero 278 * parties and one unarrived parties (encoded as EMPTY below). 
279 * 280 * To efficiently maintain atomicity, these values are packed into 281 * a single (atomic) long. Good performance relies on keeping 282 * state decoding and encoding simple, and keeping race windows 283 * short. 284 * 285 * All state updates are performed via CAS except initial 286 * registration of a sub-phaser (i.e., one with a non-null 287 * parent). In this (relatively rare) case, we use built-in 288 * synchronization to lock while first registering with its 289 * parent. 290 * 291 * The phase of a subphaser is allowed to lag that of its 292 * ancestors until it is actually accessed -- see method 293 * reconcileState. 294 */ 295 private volatile long state; 296 297 private static final int MAX_PARTIES = 0xffff; 298 private static final int MAX_PHASE = Integer.MAX_VALUE; 299 private static final int PARTIES_SHIFT = 16; 300 private static final int PHASE_SHIFT = 32; 301 private static final int UNARRIVED_MASK = 0xffff; // to mask ints 302 private static final long PARTIES_MASK = 0xffff0000L; // to mask longs 303 private static final long COUNTS_MASK = 0xffffffffL; 304 private static final long TERMINATION_BIT = 1L << 63; 305 306 // some special values 307 private static final int ONE_ARRIVAL = 1; 308 private static final int ONE_PARTY = 1 << PARTIES_SHIFT; 309 private static final int ONE_DEREGISTER = ONE_ARRIVAL|ONE_PARTY; 310 private static final int EMPTY = 1; 311 312 // The following unpacking methods are usually manually inlined 313 unarrivedOf(long s)314 private static int unarrivedOf(long s) { 315 int counts = (int)s; 316 return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); 317 } 318 partiesOf(long s)319 private static int partiesOf(long s) { 320 return (int)s >>> PARTIES_SHIFT; 321 } 322 phaseOf(long s)323 private static int phaseOf(long s) { 324 return (int)(s >>> PHASE_SHIFT); 325 } 326 arrivedOf(long s)327 private static int arrivedOf(long s) { 328 int counts = (int)s; 329 return (counts == EMPTY) ? 
0 : 330 (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK); 331 } 332 333 /** 334 * The parent of this phaser, or null if none 335 */ 336 private final Phaser parent; 337 338 /** 339 * The root of phaser tree. Equals this if not in a tree. 340 */ 341 private final Phaser root; 342 343 /** 344 * Heads of Treiber stacks for waiting threads. To eliminate 345 * contention when releasing some threads while adding others, we 346 * use two of them, alternating across even and odd phases. 347 * Subphasers share queues with root to speed up releases. 348 */ 349 private final AtomicReference<QNode> evenQ; 350 private final AtomicReference<QNode> oddQ; 351 queueFor(int phase)352 private AtomicReference<QNode> queueFor(int phase) { 353 return ((phase & 1) == 0) ? evenQ : oddQ; 354 } 355 356 /** 357 * Returns message string for bounds exceptions on arrival. 358 */ badArrive(long s)359 private String badArrive(long s) { 360 return "Attempted arrival of unregistered party for " + 361 stateToString(s); 362 } 363 364 /** 365 * Returns message string for bounds exceptions on registration. 366 */ badRegister(long s)367 private String badRegister(long s) { 368 return "Attempt to register more than " + 369 MAX_PARTIES + " parties for " + stateToString(s); 370 } 371 372 /** 373 * Main implementation for methods arrive and arriveAndDeregister. 374 * Manually tuned to speed up and minimize race windows for the 375 * common case of just decrementing unarrived field. 376 * 377 * @param adjust value to subtract from state; 378 * ONE_ARRIVAL for arrive, 379 * ONE_DEREGISTER for arriveAndDeregister 380 */ doArrive(int adjust)381 private int doArrive(int adjust) { 382 final Phaser root = this.root; 383 for (;;) { 384 long s = (root == this) ? state : reconcileState(); 385 int phase = (int)(s >>> PHASE_SHIFT); 386 if (phase < 0) 387 return phase; 388 int counts = (int)s; 389 int unarrived = (counts == EMPTY) ? 
0 : (counts & UNARRIVED_MASK); 390 if (unarrived <= 0) 391 throw new IllegalStateException(badArrive(s)); 392 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { 393 if (unarrived == 1) { 394 long n = s & PARTIES_MASK; // base of next state 395 int nextUnarrived = (int)n >>> PARTIES_SHIFT; 396 if (root == this) { 397 if (onAdvance(phase, nextUnarrived)) 398 n |= TERMINATION_BIT; 399 else if (nextUnarrived == 0) 400 n |= EMPTY; 401 else 402 n |= nextUnarrived; 403 int nextPhase = (phase + 1) & MAX_PHASE; 404 n |= (long)nextPhase << PHASE_SHIFT; 405 UNSAFE.compareAndSwapLong(this, stateOffset, s, n); 406 releaseWaiters(phase); 407 } 408 else if (nextUnarrived == 0) { // propagate deregistration 409 phase = parent.doArrive(ONE_DEREGISTER); 410 UNSAFE.compareAndSwapLong(this, stateOffset, 411 s, s | EMPTY); 412 } 413 else 414 phase = parent.doArrive(ONE_ARRIVAL); 415 } 416 return phase; 417 } 418 } 419 } 420 421 /** 422 * Implementation of register, bulkRegister 423 * 424 * @param registrations number to add to both parties and 425 * unarrived fields. Must be greater than zero. 426 */ doRegister(int registrations)427 private int doRegister(int registrations) { 428 // adjustment to state 429 long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; 430 final Phaser parent = this.parent; 431 int phase; 432 for (;;) { 433 long s = (parent == null) ? 
state : reconcileState(); 434 int counts = (int)s; 435 int parties = counts >>> PARTIES_SHIFT; 436 int unarrived = counts & UNARRIVED_MASK; 437 if (registrations > MAX_PARTIES - parties) 438 throw new IllegalStateException(badRegister(s)); 439 phase = (int)(s >>> PHASE_SHIFT); 440 if (phase < 0) 441 break; 442 if (counts != EMPTY) { // not 1st registration 443 if (parent == null || reconcileState() == s) { 444 if (unarrived == 0) // wait out advance 445 root.internalAwaitAdvance(phase, null); 446 else if (UNSAFE.compareAndSwapLong(this, stateOffset, 447 s, s + adjust)) 448 break; 449 } 450 } 451 else if (parent == null) { // 1st root registration 452 long next = ((long)phase << PHASE_SHIFT) | adjust; 453 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) 454 break; 455 } 456 else { 457 synchronized (this) { // 1st sub registration 458 if (state == s) { // recheck under lock 459 phase = parent.doRegister(1); 460 if (phase < 0) 461 break; 462 // finish registration whenever parent registration 463 // succeeded, even when racing with termination, 464 // since these are part of the same "transaction". 465 while (!UNSAFE.compareAndSwapLong 466 (this, stateOffset, s, 467 ((long)phase << PHASE_SHIFT) | adjust)) { 468 s = state; 469 phase = (int)(root.state >>> PHASE_SHIFT); 470 // assert (int)s == EMPTY; 471 } 472 break; 473 } 474 } 475 } 476 } 477 return phase; 478 } 479 480 /** 481 * Resolves lagged phase propagation from root if necessary. 482 * Reconciliation normally occurs when root has advanced but 483 * subphasers have not yet done so, in which case they must finish 484 * their own advance by setting unarrived to parties (or if 485 * parties is zero, resetting to unregistered EMPTY state). 
486 * 487 * @return reconciled state 488 */ reconcileState()489 private long reconcileState() { 490 final Phaser root = this.root; 491 long s = state; 492 if (root != this) { 493 int phase, p; 494 // CAS to root phase with current parties, tripping unarrived 495 while ((phase = (int)(root.state >>> PHASE_SHIFT)) != 496 (int)(s >>> PHASE_SHIFT) && 497 !UNSAFE.compareAndSwapLong 498 (this, stateOffset, s, 499 s = (((long)phase << PHASE_SHIFT) | 500 ((phase < 0) ? (s & COUNTS_MASK) : 501 (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY : 502 ((s & PARTIES_MASK) | p)))))) 503 s = state; 504 } 505 return s; 506 } 507 508 /** 509 * Creates a new phaser with no initially registered parties, no 510 * parent, and initial phase number 0. Any thread using this 511 * phaser will need to first register for it. 512 */ Phaser()513 public Phaser() { 514 this(null, 0); 515 } 516 517 /** 518 * Creates a new phaser with the given number of registered 519 * unarrived parties, no parent, and initial phase number 0. 520 * 521 * @param parties the number of parties required to advance to the 522 * next phase 523 * @throws IllegalArgumentException if parties less than zero 524 * or greater than the maximum number of parties supported 525 */ Phaser(int parties)526 public Phaser(int parties) { 527 this(null, parties); 528 } 529 530 /** 531 * Equivalent to {@link #Phaser(Phaser, int) Phaser(parent, 0)}. 532 * 533 * @param parent the parent phaser 534 */ Phaser(Phaser parent)535 public Phaser(Phaser parent) { 536 this(parent, 0); 537 } 538 539 /** 540 * Creates a new phaser with the given parent and number of 541 * registered unarrived parties. When the given parent is non-null 542 * and the given number of parties is greater than zero, this 543 * child phaser is registered with its parent. 
544 * 545 * @param parent the parent phaser 546 * @param parties the number of parties required to advance to the 547 * next phase 548 * @throws IllegalArgumentException if parties less than zero 549 * or greater than the maximum number of parties supported 550 */ Phaser(Phaser parent, int parties)551 public Phaser(Phaser parent, int parties) { 552 if (parties >>> PARTIES_SHIFT != 0) 553 throw new IllegalArgumentException("Illegal number of parties"); 554 int phase = 0; 555 this.parent = parent; 556 if (parent != null) { 557 final Phaser root = parent.root; 558 this.root = root; 559 this.evenQ = root.evenQ; 560 this.oddQ = root.oddQ; 561 if (parties != 0) 562 phase = parent.doRegister(1); 563 } 564 else { 565 this.root = this; 566 this.evenQ = new AtomicReference<QNode>(); 567 this.oddQ = new AtomicReference<QNode>(); 568 } 569 this.state = (parties == 0) ? (long)EMPTY : 570 ((long)phase << PHASE_SHIFT) | 571 ((long)parties << PARTIES_SHIFT) | 572 ((long)parties); 573 } 574 575 /** 576 * Adds a new unarrived party to this phaser. If an ongoing 577 * invocation of {@link #onAdvance} is in progress, this method 578 * may await its completion before returning. If this phaser has 579 * a parent, and this phaser previously had no registered parties, 580 * this child phaser is also registered with its parent. If 581 * this phaser is terminated, the attempt to register has 582 * no effect, and a negative value is returned. 583 * 584 * @return the arrival phase number to which this registration 585 * applied. If this value is negative, then this phaser has 586 * terminated, in which case registration has no effect. 587 * @throws IllegalStateException if attempting to register more 588 * than the maximum supported number of parties 589 */ register()590 public int register() { 591 return doRegister(1); 592 } 593 594 /** 595 * Adds the given number of new unarrived parties to this phaser. 
596 * If an ongoing invocation of {@link #onAdvance} is in progress, 597 * this method may await its completion before returning. If this 598 * phaser has a parent, and the given number of parties is greater 599 * than zero, and this phaser previously had no registered 600 * parties, this child phaser is also registered with its parent. 601 * If this phaser is terminated, the attempt to register has no 602 * effect, and a negative value is returned. 603 * 604 * @param parties the number of additional parties required to 605 * advance to the next phase 606 * @return the arrival phase number to which this registration 607 * applied. If this value is negative, then this phaser has 608 * terminated, in which case registration has no effect. 609 * @throws IllegalStateException if attempting to register more 610 * than the maximum supported number of parties 611 * @throws IllegalArgumentException if {@code parties < 0} 612 */ bulkRegister(int parties)613 public int bulkRegister(int parties) { 614 if (parties < 0) 615 throw new IllegalArgumentException(); 616 if (parties == 0) 617 return getPhase(); 618 return doRegister(parties); 619 } 620 621 /** 622 * Arrives at this phaser, without waiting for others to arrive. 623 * 624 * <p>It is a usage error for an unregistered party to invoke this 625 * method. However, this error may result in an {@code 626 * IllegalStateException} only upon some subsequent operation on 627 * this phaser, if ever. 628 * 629 * @return the arrival phase number, or a negative value if terminated 630 * @throws IllegalStateException if not terminated and the number 631 * of unarrived parties would become negative 632 */ arrive()633 public int arrive() { 634 return doArrive(ONE_ARRIVAL); 635 } 636 637 /** 638 * Arrives at this phaser and deregisters from it without waiting 639 * for others to arrive. Deregistration reduces the number of 640 * parties required to advance in future phases. 
If this phaser 641 * has a parent, and deregistration causes this phaser to have 642 * zero parties, this phaser is also deregistered from its parent. 643 * 644 * <p>It is a usage error for an unregistered party to invoke this 645 * method. However, this error may result in an {@code 646 * IllegalStateException} only upon some subsequent operation on 647 * this phaser, if ever. 648 * 649 * @return the arrival phase number, or a negative value if terminated 650 * @throws IllegalStateException if not terminated and the number 651 * of registered or unarrived parties would become negative 652 */ arriveAndDeregister()653 public int arriveAndDeregister() { 654 return doArrive(ONE_DEREGISTER); 655 } 656 657 /** 658 * Arrives at this phaser and awaits others. Equivalent in effect 659 * to {@code awaitAdvance(arrive())}. If you need to await with 660 * interruption or timeout, you can arrange this with an analogous 661 * construction using one of the other forms of the {@code 662 * awaitAdvance} method. If instead you need to deregister upon 663 * arrival, use {@code awaitAdvance(arriveAndDeregister())}. 664 * 665 * <p>It is a usage error for an unregistered party to invoke this 666 * method. However, this error may result in an {@code 667 * IllegalStateException} only upon some subsequent operation on 668 * this phaser, if ever. 669 * 670 * @return the arrival phase number, or the (negative) 671 * {@linkplain #getPhase() current phase} if terminated 672 * @throws IllegalStateException if not terminated and the number 673 * of unarrived parties would become negative 674 */ arriveAndAwaitAdvance()675 public int arriveAndAwaitAdvance() { 676 // Specialization of doArrive+awaitAdvance eliminating some reads/paths 677 final Phaser root = this.root; 678 for (;;) { 679 long s = (root == this) ? state : reconcileState(); 680 int phase = (int)(s >>> PHASE_SHIFT); 681 if (phase < 0) 682 return phase; 683 int counts = (int)s; 684 int unarrived = (counts == EMPTY) ? 
0 : (counts & UNARRIVED_MASK); 685 if (unarrived <= 0) 686 throw new IllegalStateException(badArrive(s)); 687 if (UNSAFE.compareAndSwapLong(this, stateOffset, s, 688 s -= ONE_ARRIVAL)) { 689 if (unarrived > 1) 690 return root.internalAwaitAdvance(phase, null); 691 if (root != this) 692 return parent.arriveAndAwaitAdvance(); 693 long n = s & PARTIES_MASK; // base of next state 694 int nextUnarrived = (int)n >>> PARTIES_SHIFT; 695 if (onAdvance(phase, nextUnarrived)) 696 n |= TERMINATION_BIT; 697 else if (nextUnarrived == 0) 698 n |= EMPTY; 699 else 700 n |= nextUnarrived; 701 int nextPhase = (phase + 1) & MAX_PHASE; 702 n |= (long)nextPhase << PHASE_SHIFT; 703 if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) 704 return (int)(state >>> PHASE_SHIFT); // terminated 705 releaseWaiters(phase); 706 return nextPhase; 707 } 708 } 709 } 710 711 /** 712 * Awaits the phase of this phaser to advance from the given phase 713 * value, returning immediately if the current phase is not equal 714 * to the given phase value or this phaser is terminated. 715 * 716 * @param phase an arrival phase number, or negative value if 717 * terminated; this argument is normally the value returned by a 718 * previous call to {@code arrive} or {@code arriveAndDeregister}. 719 * @return the next arrival phase number, or the argument if it is 720 * negative, or the (negative) {@linkplain #getPhase() current phase} 721 * if terminated 722 */ awaitAdvance(int phase)723 public int awaitAdvance(int phase) { 724 final Phaser root = this.root; 725 long s = (root == this) ? 
state : reconcileState(); 726 int p = (int)(s >>> PHASE_SHIFT); 727 if (phase < 0) 728 return phase; 729 if (p == phase) 730 return root.internalAwaitAdvance(phase, null); 731 return p; 732 } 733 734 /** 735 * Awaits the phase of this phaser to advance from the given phase 736 * value, throwing {@code InterruptedException} if interrupted 737 * while waiting, or returning immediately if the current phase is 738 * not equal to the given phase value or this phaser is 739 * terminated. 740 * 741 * @param phase an arrival phase number, or negative value if 742 * terminated; this argument is normally the value returned by a 743 * previous call to {@code arrive} or {@code arriveAndDeregister}. 744 * @return the next arrival phase number, or the argument if it is 745 * negative, or the (negative) {@linkplain #getPhase() current phase} 746 * if terminated 747 * @throws InterruptedException if thread interrupted while waiting 748 */ awaitAdvanceInterruptibly(int phase)749 public int awaitAdvanceInterruptibly(int phase) 750 throws InterruptedException { 751 final Phaser root = this.root; 752 long s = (root == this) ? state : reconcileState(); 753 int p = (int)(s >>> PHASE_SHIFT); 754 if (phase < 0) 755 return phase; 756 if (p == phase) { 757 QNode node = new QNode(this, phase, true, false, 0L); 758 p = root.internalAwaitAdvance(phase, node); 759 if (node.wasInterrupted) 760 throw new InterruptedException(); 761 } 762 return p; 763 } 764 765 /** 766 * Awaits the phase of this phaser to advance from the given phase 767 * value or the given timeout to elapse, throwing {@code 768 * InterruptedException} if interrupted while waiting, or 769 * returning immediately if the current phase is not equal to the 770 * given phase value or this phaser is terminated. 771 * 772 * @param phase an arrival phase number, or negative value if 773 * terminated; this argument is normally the value returned by a 774 * previous call to {@code arrive} or {@code arriveAndDeregister}. 
775 * @param timeout how long to wait before giving up, in units of 776 * {@code unit} 777 * @param unit a {@code TimeUnit} determining how to interpret the 778 * {@code timeout} parameter 779 * @return the next arrival phase number, or the argument if it is 780 * negative, or the (negative) {@linkplain #getPhase() current phase} 781 * if terminated 782 * @throws InterruptedException if thread interrupted while waiting 783 * @throws TimeoutException if timed out while waiting 784 */ awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)785 public int awaitAdvanceInterruptibly(int phase, 786 long timeout, TimeUnit unit) 787 throws InterruptedException, TimeoutException { 788 long nanos = unit.toNanos(timeout); 789 final Phaser root = this.root; 790 long s = (root == this) ? state : reconcileState(); 791 int p = (int)(s >>> PHASE_SHIFT); 792 if (phase < 0) 793 return phase; 794 if (p == phase) { 795 QNode node = new QNode(this, phase, true, true, nanos); 796 p = root.internalAwaitAdvance(phase, node); 797 if (node.wasInterrupted) 798 throw new InterruptedException(); 799 else if (p == phase) 800 throw new TimeoutException(); 801 } 802 return p; 803 } 804 805 /** 806 * Forces this phaser to enter termination state. Counts of 807 * registered parties are unaffected. If this phaser is a member 808 * of a tiered set of phasers, then all of the phasers in the set 809 * are terminated. If this phaser is already terminated, this 810 * method has no effect. This method may be useful for 811 * coordinating recovery after one or more tasks encounter 812 * unexpected exceptions. 
813 */ forceTermination()814 public void forceTermination() { 815 // Only need to change root state 816 final Phaser root = this.root; 817 long s; 818 while ((s = root.state) >= 0) { 819 if (UNSAFE.compareAndSwapLong(root, stateOffset, 820 s, s | TERMINATION_BIT)) { 821 // signal all threads 822 releaseWaiters(0); // Waiters on evenQ 823 releaseWaiters(1); // Waiters on oddQ 824 return; 825 } 826 } 827 } 828 829 /** 830 * Returns the current phase number. The maximum phase number is 831 * {@code Integer.MAX_VALUE}, after which it restarts at 832 * zero. Upon termination, the phase number is negative, 833 * in which case the prevailing phase prior to termination 834 * may be obtained via {@code getPhase() + Integer.MIN_VALUE}. 835 * 836 * @return the phase number, or a negative value if terminated 837 */ getPhase()838 public final int getPhase() { 839 return (int)(root.state >>> PHASE_SHIFT); 840 } 841 842 /** 843 * Returns the number of parties registered at this phaser. 844 * 845 * @return the number of parties 846 */ getRegisteredParties()847 public int getRegisteredParties() { 848 return partiesOf(state); 849 } 850 851 /** 852 * Returns the number of registered parties that have arrived at 853 * the current phase of this phaser. If this phaser has terminated, 854 * the returned value is meaningless and arbitrary. 855 * 856 * @return the number of arrived parties 857 */ getArrivedParties()858 public int getArrivedParties() { 859 return arrivedOf(reconcileState()); 860 } 861 862 /** 863 * Returns the number of registered parties that have not yet 864 * arrived at the current phase of this phaser. If this phaser has 865 * terminated, the returned value is meaningless and arbitrary. 866 * 867 * @return the number of unarrived parties 868 */ getUnarrivedParties()869 public int getUnarrivedParties() { 870 return unarrivedOf(reconcileState()); 871 } 872 873 /** 874 * Returns the parent of this phaser, or {@code null} if none. 
875 * 876 * @return the parent of this phaser, or {@code null} if none 877 */ getParent()878 public Phaser getParent() { 879 return parent; 880 } 881 882 /** 883 * Returns the root ancestor of this phaser, which is the same as 884 * this phaser if it has no parent. 885 * 886 * @return the root ancestor of this phaser 887 */ getRoot()888 public Phaser getRoot() { 889 return root; 890 } 891 892 /** 893 * Returns {@code true} if this phaser has been terminated. 894 * 895 * @return {@code true} if this phaser has been terminated 896 */ isTerminated()897 public boolean isTerminated() { 898 return root.state < 0L; 899 } 900 901 /** 902 * Overridable method to perform an action upon impending phase 903 * advance, and to control termination. This method is invoked 904 * upon arrival of the party advancing this phaser (when all other 905 * waiting parties are dormant). If this method returns {@code 906 * true}, this phaser will be set to a final termination state 907 * upon advance, and subsequent calls to {@link #isTerminated} 908 * will return true. Any (unchecked) Exception or Error thrown by 909 * an invocation of this method is propagated to the party 910 * attempting to advance this phaser, in which case no advance 911 * occurs. 912 * 913 * <p>The arguments to this method provide the state of the phaser 914 * prevailing for the current transition. The effects of invoking 915 * arrival, registration, and waiting methods on this phaser from 916 * within {@code onAdvance} are unspecified and should not be 917 * relied on. 918 * 919 * <p>If this phaser is a member of a tiered set of phasers, then 920 * {@code onAdvance} is invoked only for its root phaser on each 921 * advance. 922 * 923 * <p>To support the most common use cases, the default 924 * implementation of this method returns {@code true} when the 925 * number of registered parties has become zero as the result of a 926 * party invoking {@code arriveAndDeregister}. 
You can disable 927 * this behavior, thus enabling continuation upon future 928 * registrations, by overriding this method to always return 929 * {@code false}: 930 * 931 * <pre> {@code 932 * Phaser phaser = new Phaser() { 933 * protected boolean onAdvance(int phase, int parties) { return false; } 934 * }}</pre> 935 * 936 * @param phase the current phase number on entry to this method, 937 * before this phaser is advanced 938 * @param registeredParties the current number of registered parties 939 * @return {@code true} if this phaser should terminate 940 */ onAdvance(int phase, int registeredParties)941 protected boolean onAdvance(int phase, int registeredParties) { 942 return registeredParties == 0; 943 } 944 945 /** 946 * Returns a string identifying this phaser, as well as its 947 * state. The state, in brackets, includes the String {@code 948 * "phase = "} followed by the phase number, {@code "parties = "} 949 * followed by the number of registered parties, and {@code 950 * "arrived = "} followed by the number of arrived parties. 951 * 952 * @return a string identifying this phaser, as well as its state 953 */ toString()954 public String toString() { 955 return stateToString(reconcileState()); 956 } 957 958 /** 959 * Implementation of toString and string-based error messages 960 */ stateToString(long s)961 private String stateToString(long s) { 962 return super.toString() + 963 "[phase = " + phaseOf(s) + 964 " parties = " + partiesOf(s) + 965 " arrived = " + arrivedOf(s) + "]"; 966 } 967 968 // Waiting mechanics 969 970 /** 971 * Removes and signals threads from queue for phase. 972 */ releaseWaiters(int phase)973 private void releaseWaiters(int phase) { 974 QNode q; // first element of queue 975 Thread t; // its thread 976 AtomicReference<QNode> head = (phase & 1) == 0 ? 
evenQ : oddQ; 977 while ((q = head.get()) != null && 978 q.phase != (int)(root.state >>> PHASE_SHIFT)) { 979 if (head.compareAndSet(q, q.next) && 980 (t = q.thread) != null) { 981 q.thread = null; 982 LockSupport.unpark(t); 983 } 984 } 985 } 986 987 /** 988 * Variant of releaseWaiters that additionally tries to remove any 989 * nodes no longer waiting for advance due to timeout or 990 * interrupt. Currently, nodes are removed only if they are at 991 * head of queue, which suffices to reduce memory footprint in 992 * most usages. 993 * 994 * @return current phase on exit 995 */ abortWait(int phase)996 private int abortWait(int phase) { 997 AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ; 998 for (;;) { 999 Thread t; 1000 QNode q = head.get(); 1001 int p = (int)(root.state >>> PHASE_SHIFT); 1002 if (q == null || ((t = q.thread) != null && q.phase == p)) 1003 return p; 1004 if (head.compareAndSet(q, q.next) && t != null) { 1005 q.thread = null; 1006 LockSupport.unpark(t); 1007 } 1008 } 1009 } 1010 1011 /** The number of CPUs, for spin control */ 1012 private static final int NCPU = Runtime.getRuntime().availableProcessors(); 1013 1014 /** 1015 * The number of times to spin before blocking while waiting for 1016 * advance, per arrival while waiting. On multiprocessors, fully 1017 * blocking and waking up a large number of threads all at once is 1018 * usually a very slow process, so we use rechargeable spins to 1019 * avoid it when threads regularly arrive: When a thread in 1020 * internalAwaitAdvance notices another arrival before blocking, 1021 * and there appear to be enough CPUs available, it spins 1022 * SPINS_PER_ARRIVAL more times before blocking. The value trades 1023 * off good-citizenship vs big unnecessary slowdowns. 1024 */ 1025 static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8; 1026 1027 /** 1028 * Possibly blocks and waits for phase to advance unless aborted. 1029 * Call only on root phaser. 
1030 * 1031 * @param phase current phase 1032 * @param node if non-null, the wait node to track interrupt and timeout; 1033 * if null, denotes noninterruptible wait 1034 * @return current phase 1035 */ internalAwaitAdvance(int phase, QNode node)1036 private int internalAwaitAdvance(int phase, QNode node) { 1037 // assert root == this; 1038 releaseWaiters(phase-1); // ensure old queue clean 1039 boolean queued = false; // true when node is enqueued 1040 int lastUnarrived = 0; // to increase spins upon change 1041 int spins = SPINS_PER_ARRIVAL; 1042 long s; 1043 int p; 1044 while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { 1045 if (node == null) { // spinning in noninterruptible mode 1046 int unarrived = (int)s & UNARRIVED_MASK; 1047 if (unarrived != lastUnarrived && 1048 (lastUnarrived = unarrived) < NCPU) 1049 spins += SPINS_PER_ARRIVAL; 1050 boolean interrupted = Thread.interrupted(); 1051 if (interrupted || --spins < 0) { // need node to record intr 1052 node = new QNode(this, phase, false, false, 0L); 1053 node.wasInterrupted = interrupted; 1054 } 1055 } 1056 else if (node.isReleasable()) // done or aborted 1057 break; 1058 else if (!queued) { // push onto queue 1059 AtomicReference<QNode> head = (phase & 1) == 0 ? 
evenQ : oddQ; 1060 QNode q = node.next = head.get(); 1061 if ((q == null || q.phase == phase) && 1062 (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq 1063 queued = head.compareAndSet(q, node); 1064 } 1065 else { 1066 try { 1067 ForkJoinPool.managedBlock(node); 1068 } catch (InterruptedException ie) { 1069 node.wasInterrupted = true; 1070 } 1071 } 1072 } 1073 1074 if (node != null) { 1075 if (node.thread != null) 1076 node.thread = null; // avoid need for unpark() 1077 if (node.wasInterrupted && !node.interruptible) 1078 Thread.currentThread().interrupt(); 1079 if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase) 1080 return abortWait(phase); // possibly clean up on abort 1081 } 1082 releaseWaiters(phase); 1083 return p; 1084 } 1085 1086 /** 1087 * Wait nodes for Treiber stack representing wait queue 1088 */ 1089 static final class QNode implements ForkJoinPool.ManagedBlocker { 1090 final Phaser phaser; 1091 final int phase; 1092 final boolean interruptible; 1093 final boolean timed; 1094 boolean wasInterrupted; 1095 long nanos; 1096 final long deadline; 1097 volatile Thread thread; // nulled to cancel wait 1098 QNode next; 1099 QNode(Phaser phaser, int phase, boolean interruptible, boolean timed, long nanos)1100 QNode(Phaser phaser, int phase, boolean interruptible, 1101 boolean timed, long nanos) { 1102 this.phaser = phaser; 1103 this.phase = phase; 1104 this.interruptible = interruptible; 1105 this.nanos = nanos; 1106 this.timed = timed; 1107 this.deadline = timed ? 
System.nanoTime() + nanos : 0L; 1108 thread = Thread.currentThread(); 1109 } 1110 isReleasable()1111 public boolean isReleasable() { 1112 if (thread == null) 1113 return true; 1114 if (phaser.getPhase() != phase) { 1115 thread = null; 1116 return true; 1117 } 1118 if (Thread.interrupted()) 1119 wasInterrupted = true; 1120 if (wasInterrupted && interruptible) { 1121 thread = null; 1122 return true; 1123 } 1124 if (timed) { 1125 if (nanos > 0L) { 1126 nanos = deadline - System.nanoTime(); 1127 } 1128 if (nanos <= 0L) { 1129 thread = null; 1130 return true; 1131 } 1132 } 1133 return false; 1134 } 1135 block()1136 public boolean block() { 1137 if (isReleasable()) 1138 return true; 1139 else if (!timed) 1140 LockSupport.park(this); 1141 else if (nanos > 0L) 1142 LockSupport.parkNanos(this, nanos); 1143 return isReleasable(); 1144 } 1145 } 1146 1147 // Unsafe mechanics 1148 1149 private static final sun.misc.Unsafe UNSAFE; 1150 private static final long stateOffset; 1151 static { 1152 try { 1153 UNSAFE = sun.misc.Unsafe.getUnsafe(); 1154 Class<?> k = Phaser.class; 1155 stateOffset = UNSAFE.objectFieldOffset 1156 (k.getDeclaredField("state")); 1157 } catch (Exception e) { 1158 throw new Error(e); 1159 } 1160 } 1161 } 1162