1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. 7 * 8 * This code is distributed in the hope that it will be useful, but WITHOUT 9 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 10 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 11 * version 2 for more details (a copy is included in the LICENSE file that 12 * accompanied this code). 13 * 14 * You should have received a copy of the GNU General Public License version 15 * 2 along with this work; if not, write to the Free Software Foundation, 16 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 17 * 18 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 19 * or visit www.oracle.com if you need additional information or have any 20 * questions. 21 */ 22 23 /* 24 * This file is available under and governed by the GNU General Public 25 * License version 2 only, as published by the Free Software Foundation. 26 * However, the following notice accompanied the original version of this 27 * file: 28 * 29 * Written by Doug Lea with assistance from members of JCP JSR-166 30 * Expert Group and released to the public domain, as explained at 31 * http://creativecommons.org/publicdomain/zero/1.0/ 32 * Other contributors include Andrew Wright, Jeffrey Hayes, 33 * Pat Fisher, Mike Judd. 34 */ 35 36 import static java.util.concurrent.TimeUnit.MILLISECONDS; 37 38 import java.util.concurrent.BrokenBarrierException; 39 import java.util.concurrent.CountDownLatch; 40 import java.util.concurrent.CyclicBarrier; 41 import java.util.concurrent.ExecutorService; 42 import java.util.concurrent.Executors; 43 import java.util.concurrent.ThreadLocalRandom; 44 import java.util.concurrent.TimeoutException; 45 import java.util.concurrent.atomic.AtomicInteger; 46 47 import junit.framework.Test; 48 import junit.framework.TestSuite; 49 50 public class CyclicBarrierTest extends JSR166TestCase { main(String[] args)51 public static void main(String[] args) { 52 main(suite(), args); 53 } suite()54 public static Test suite() { 55 return new TestSuite(CyclicBarrierTest.class); 56 } 57 58 /** 59 * Spin-waits till the number of waiters == numberOfWaiters. 60 */ awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters)61 void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) { 62 long startTime = System.nanoTime(); 63 while (barrier.getNumberWaiting() != numberOfWaiters) { 64 if (millisElapsedSince(startTime) > LONG_DELAY_MS) 65 fail("timed out"); 66 Thread.yield(); 67 } 68 } 69 70 /** 71 * Creating with negative parties throws IllegalArgumentException 72 */ testConstructor1()73 public void testConstructor1() { 74 try { 75 new CyclicBarrier(-1, (Runnable)null); 76 shouldThrow(); 77 } catch (IllegalArgumentException success) {} 78 } 79 80 /** 81 * Creating with negative parties and no action throws 82 * IllegalArgumentException 83 */ testConstructor2()84 public void testConstructor2() { 85 try { 86 new CyclicBarrier(-1); 87 shouldThrow(); 88 } catch (IllegalArgumentException success) {} 89 } 90 91 /** 92 * getParties returns the number of parties given in constructor 93 */ testGetParties()94 public void testGetParties() { 95 CyclicBarrier b = new CyclicBarrier(2); 96 assertEquals(2, b.getParties()); 97 assertEquals(0, b.getNumberWaiting()); 98 } 99 100 /** 101 * A 1-party barrier triggers after single await 102 */ testSingleParty()103 public void testSingleParty() throws Exception { 104 CyclicBarrier b = new CyclicBarrier(1); 105 assertEquals(1, b.getParties()); 106 assertEquals(0, b.getNumberWaiting()); 107 b.await(); 108 b.await(); 109 assertEquals(0, b.getNumberWaiting()); 110 } 111 112 /** 113 * The supplied barrier action is run at barrier 114 */ testBarrierAction()115 public void testBarrierAction() throws Exception { 116 final AtomicInteger count = new AtomicInteger(0); 117 final Runnable incCount = new Runnable() { public void run() { 118 count.getAndIncrement(); }}; 119 CyclicBarrier b = new CyclicBarrier(1, incCount); 120 assertEquals(1, b.getParties()); 121 assertEquals(0, b.getNumberWaiting()); 122 b.await(); 123 b.await(); 124 assertEquals(0, b.getNumberWaiting()); 125 assertEquals(2, count.get()); 126 } 127 128 /** 129 * A 2-party/thread barrier triggers after both threads invoke await 130 */ testTwoParties()131 public void testTwoParties() throws Exception { 132 final CyclicBarrier b = new CyclicBarrier(2); 133 Thread t = newStartedThread(new CheckedRunnable() { 134 public void realRun() throws Exception { 135 b.await(); 136 b.await(); 137 b.await(); 138 b.await(); 139 }}); 140 141 b.await(); 142 b.await(); 143 b.await(); 144 b.await(); 145 awaitTermination(t); 146 } 147 148 /** 149 * An interruption in one party causes others waiting in await to 150 * throw BrokenBarrierException 151 */ testAwait1_Interrupted_BrokenBarrier()152 public void testAwait1_Interrupted_BrokenBarrier() { 153 final CyclicBarrier c = new CyclicBarrier(3); 154 final CountDownLatch pleaseInterrupt = new CountDownLatch(2); 155 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 156 public void realRun() throws Exception { 157 pleaseInterrupt.countDown(); 158 c.await(); 159 }}; 160 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 161 public void realRun() throws Exception { 162 pleaseInterrupt.countDown(); 163 c.await(); 164 }}; 165 166 t1.start(); 167 t2.start(); 168 await(pleaseInterrupt); 169 t1.interrupt(); 170 awaitTermination(t1); 171 awaitTermination(t2); 172 } 173 174 /** 175 * An interruption in one party causes others waiting in timed await to 176 * throw BrokenBarrierException 177 */ testAwait2_Interrupted_BrokenBarrier()178 public void testAwait2_Interrupted_BrokenBarrier() throws Exception { 179 final CyclicBarrier c = new CyclicBarrier(3); 180 final CountDownLatch pleaseInterrupt = new CountDownLatch(2); 181 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 182 public void realRun() throws Exception { 183 pleaseInterrupt.countDown(); 184 c.await(LONG_DELAY_MS, MILLISECONDS); 185 }}; 186 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 187 public void realRun() throws Exception { 188 pleaseInterrupt.countDown(); 189 c.await(LONG_DELAY_MS, MILLISECONDS); 190 }}; 191 192 t1.start(); 193 t2.start(); 194 await(pleaseInterrupt); 195 t1.interrupt(); 196 awaitTermination(t1); 197 awaitTermination(t2); 198 } 199 200 /** 201 * A timeout in timed await throws TimeoutException 202 */ testAwait3_TimeoutException()203 public void testAwait3_TimeoutException() throws InterruptedException { 204 final CyclicBarrier c = new CyclicBarrier(2); 205 Thread t = newStartedThread(new CheckedRunnable() { 206 public void realRun() throws Exception { 207 long startTime = System.nanoTime(); 208 try { 209 c.await(timeoutMillis(), MILLISECONDS); 210 shouldThrow(); 211 } catch (TimeoutException success) {} 212 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 213 }}); 214 215 awaitTermination(t); 216 } 217 218 /** 219 * A timeout in one party causes others waiting in timed await to 220 * throw BrokenBarrierException 221 */ testAwait4_Timeout_BrokenBarrier()222 public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException { 223 final CyclicBarrier c = new CyclicBarrier(3); 224 Thread t1 = newStartedThread(new CheckedRunnable() { 225 public void realRun() throws Exception { 226 try { 227 c.await(LONG_DELAY_MS, MILLISECONDS); 228 shouldThrow(); 229 } catch (BrokenBarrierException success) {} 230 }}); 231 Thread t2 = newStartedThread(new CheckedRunnable() { 232 public void realRun() throws Exception { 233 awaitNumberWaiting(c, 1); 234 long startTime = System.nanoTime(); 235 try { 236 c.await(timeoutMillis(), MILLISECONDS); 237 shouldThrow(); 238 } catch (TimeoutException success) {} 239 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 240 }}); 241 242 awaitTermination(t1); 243 awaitTermination(t2); 244 } 245 246 /** 247 * A timeout in one party causes others waiting in await to 248 * throw BrokenBarrierException 249 */ testAwait5_Timeout_BrokenBarrier()250 public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException { 251 final CyclicBarrier c = new CyclicBarrier(3); 252 Thread t1 = newStartedThread(new CheckedRunnable() { 253 public void realRun() throws Exception { 254 try { 255 c.await(); 256 shouldThrow(); 257 } catch (BrokenBarrierException success) {} 258 }}); 259 Thread t2 = newStartedThread(new CheckedRunnable() { 260 public void realRun() throws Exception { 261 awaitNumberWaiting(c, 1); 262 long startTime = System.nanoTime(); 263 try { 264 c.await(timeoutMillis(), MILLISECONDS); 265 shouldThrow(); 266 } catch (TimeoutException success) {} 267 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 268 }}); 269 270 awaitTermination(t1); 271 awaitTermination(t2); 272 } 273 274 /** 275 * A reset of an active barrier causes waiting threads to throw 276 * BrokenBarrierException 277 */ testReset_BrokenBarrier()278 public void testReset_BrokenBarrier() throws InterruptedException { 279 final CyclicBarrier c = new CyclicBarrier(3); 280 final CountDownLatch pleaseReset = new CountDownLatch(2); 281 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) { 282 public void realRun() throws Exception { 283 pleaseReset.countDown(); 284 c.await(); 285 }}; 286 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 287 public void realRun() throws Exception { 288 pleaseReset.countDown(); 289 c.await(); 290 }}; 291 292 t1.start(); 293 t2.start(); 294 await(pleaseReset); 295 296 awaitNumberWaiting(c, 2); 297 c.reset(); 298 awaitTermination(t1); 299 awaitTermination(t2); 300 } 301 302 /** 303 * A reset before threads enter barrier does not throw 304 * BrokenBarrierException 305 */ testReset_NoBrokenBarrier()306 public void testReset_NoBrokenBarrier() throws Exception { 307 final CyclicBarrier c = new CyclicBarrier(3); 308 c.reset(); 309 310 Thread t1 = newStartedThread(new CheckedRunnable() { 311 public void realRun() throws Exception { 312 c.await(); 313 }}); 314 Thread t2 = newStartedThread(new CheckedRunnable() { 315 public void realRun() throws Exception { 316 c.await(); 317 }}); 318 319 c.await(); 320 awaitTermination(t1); 321 awaitTermination(t2); 322 } 323 324 /** 325 * Reset of a non-broken barrier does not break barrier 326 */ testResetWithoutBreakage()327 public void testResetWithoutBreakage() throws Exception { 328 final CyclicBarrier barrier = new CyclicBarrier(3); 329 for (int i = 0; i < 3; i++) { 330 final CyclicBarrier start = new CyclicBarrier(3); 331 Thread t1 = newStartedThread(new CheckedRunnable() { 332 public void realRun() throws Exception { 333 start.await(); 334 barrier.await(); 335 }}); 336 337 Thread t2 = newStartedThread(new CheckedRunnable() { 338 public void realRun() throws Exception { 339 start.await(); 340 barrier.await(); 341 }}); 342 343 start.await(); 344 barrier.await(); 345 awaitTermination(t1); 346 awaitTermination(t2); 347 assertFalse(barrier.isBroken()); 348 assertEquals(0, barrier.getNumberWaiting()); 349 if (i == 1) barrier.reset(); 350 assertFalse(barrier.isBroken()); 351 assertEquals(0, barrier.getNumberWaiting()); 352 } 353 } 354 355 /** 356 * Reset of a barrier after interruption reinitializes it. 357 */ testResetAfterInterrupt()358 public void testResetAfterInterrupt() throws Exception { 359 final CyclicBarrier barrier = new CyclicBarrier(3); 360 for (int i = 0; i < 2; i++) { 361 final CyclicBarrier start = new CyclicBarrier(3); 362 Thread t1 = new ThreadShouldThrow(InterruptedException.class) { 363 public void realRun() throws Exception { 364 start.await(); 365 barrier.await(); 366 }}; 367 368 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 369 public void realRun() throws Exception { 370 start.await(); 371 barrier.await(); 372 }}; 373 374 t1.start(); 375 t2.start(); 376 start.await(); 377 t1.interrupt(); 378 awaitTermination(t1); 379 awaitTermination(t2); 380 assertTrue(barrier.isBroken()); 381 assertEquals(0, barrier.getNumberWaiting()); 382 barrier.reset(); 383 assertFalse(barrier.isBroken()); 384 assertEquals(0, barrier.getNumberWaiting()); 385 } 386 } 387 388 /** 389 * Reset of a barrier after timeout reinitializes it. 390 */ testResetAfterTimeout()391 public void testResetAfterTimeout() throws Exception { 392 final CyclicBarrier barrier = new CyclicBarrier(3); 393 for (int i = 0; i < 2; i++) { 394 assertEquals(0, barrier.getNumberWaiting()); 395 Thread t1 = newStartedThread(new CheckedRunnable() { 396 public void realRun() throws Exception { 397 try { 398 barrier.await(); 399 shouldThrow(); 400 } catch (BrokenBarrierException success) {} 401 }}); 402 Thread t2 = newStartedThread(new CheckedRunnable() { 403 public void realRun() throws Exception { 404 awaitNumberWaiting(barrier, 1); 405 long startTime = System.nanoTime(); 406 try { 407 barrier.await(timeoutMillis(), MILLISECONDS); 408 shouldThrow(); 409 } catch (TimeoutException success) {} 410 assertTrue(millisElapsedSince(startTime) >= timeoutMillis()); 411 }}); 412 413 awaitTermination(t1); 414 awaitTermination(t2); 415 assertEquals(0, barrier.getNumberWaiting()); 416 assertTrue(barrier.isBroken()); 417 assertEquals(0, barrier.getNumberWaiting()); 418 barrier.reset(); 419 assertFalse(barrier.isBroken()); 420 assertEquals(0, barrier.getNumberWaiting()); 421 } 422 } 423 424 /** 425 * Reset of a barrier after a failed command reinitializes it. 426 */ testResetAfterCommandException()427 public void testResetAfterCommandException() throws Exception { 428 final CyclicBarrier barrier = 429 new CyclicBarrier(3, new Runnable() { 430 public void run() { 431 throw new NullPointerException(); }}); 432 for (int i = 0; i < 2; i++) { 433 final CyclicBarrier start = new CyclicBarrier(3); 434 Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) { 435 public void realRun() throws Exception { 436 start.await(); 437 barrier.await(); 438 }}; 439 440 Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) { 441 public void realRun() throws Exception { 442 start.await(); 443 barrier.await(); 444 }}; 445 446 t1.start(); 447 t2.start(); 448 start.await(); 449 awaitNumberWaiting(barrier, 2); 450 try { 451 barrier.await(); 452 shouldThrow(); 453 } catch (NullPointerException success) {} 454 awaitTermination(t1); 455 awaitTermination(t2); 456 assertTrue(barrier.isBroken()); 457 assertEquals(0, barrier.getNumberWaiting()); 458 barrier.reset(); 459 assertFalse(barrier.isBroken()); 460 assertEquals(0, barrier.getNumberWaiting()); 461 } 462 } 463 464 /** 465 * There can be more threads calling await() than parties, as long as each 466 * task only calls await once and the task count is a multiple of parties. 467 */ testMoreTasksThanParties()468 public void testMoreTasksThanParties() throws Exception { 469 final ThreadLocalRandom rnd = ThreadLocalRandom.current(); 470 final int parties = rnd.nextInt(1, 5); 471 final int nTasks = rnd.nextInt(1, 5) * parties; 472 final AtomicInteger tripCount = new AtomicInteger(0); 473 final AtomicInteger awaitCount = new AtomicInteger(0); 474 final CyclicBarrier barrier = 475 new CyclicBarrier(parties, () -> tripCount.getAndIncrement()); 476 final ExecutorService e = Executors.newFixedThreadPool(nTasks); 477 final Runnable awaiter = () -> { 478 try { 479 if (randomBoolean()) 480 barrier.await(); 481 else 482 barrier.await(LONG_DELAY_MS, MILLISECONDS); 483 awaitCount.getAndIncrement(); 484 } catch (Throwable fail) { threadUnexpectedException(fail); }}; 485 try (PoolCleaner cleaner = cleaner(e)) { 486 for (int i = nTasks; i--> 0; ) 487 e.execute(awaiter); 488 } 489 assertEquals(nTasks / parties, tripCount.get()); 490 assertEquals(nTasks, awaitCount.get()); 491 assertEquals(0, barrier.getNumberWaiting()); 492 } 493 } 494