1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.
7  *
8  * This code is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * version 2 for more details (a copy is included in the LICENSE file that
12  * accompanied this code).
13  *
14  * You should have received a copy of the GNU General Public License version
15  * 2 along with this work; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17  *
18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19  * or visit www.oracle.com if you need additional information or have any
20  * questions.
21  */
22 
23 /*
24  * This file is available under and governed by the GNU General Public
25  * License version 2 only, as published by the Free Software Foundation.
26  * However, the following notice accompanied the original version of this
27  * file:
28  *
29  * Written by Doug Lea with assistance from members of JCP JSR-166
30  * Expert Group and released to the public domain, as explained at
31  * http://creativecommons.org/publicdomain/zero/1.0/
32  * Other contributors include Andrew Wright, Jeffrey Hayes,
33  * Pat Fisher, Mike Judd.
34  */
35 
36 import static java.util.concurrent.TimeUnit.MILLISECONDS;
37 
38 import java.util.concurrent.BrokenBarrierException;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.CyclicBarrier;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.concurrent.ThreadLocalRandom;
44 import java.util.concurrent.TimeoutException;
45 import java.util.concurrent.atomic.AtomicInteger;
46 
47 import junit.framework.Test;
48 import junit.framework.TestSuite;
49 
50 public class CyclicBarrierTest extends JSR166TestCase {
main(String[] args)51     public static void main(String[] args) {
52         main(suite(), args);
53     }
suite()54     public static Test suite() {
55         return new TestSuite(CyclicBarrierTest.class);
56     }
57 
58     /**
59      * Spin-waits till the number of waiters == numberOfWaiters.
60      */
awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters)61     void awaitNumberWaiting(CyclicBarrier barrier, int numberOfWaiters) {
62         long startTime = System.nanoTime();
63         while (barrier.getNumberWaiting() != numberOfWaiters) {
64             if (millisElapsedSince(startTime) > LONG_DELAY_MS)
65                 fail("timed out");
66             Thread.yield();
67         }
68     }
69 
70     /**
71      * Creating with negative parties throws IllegalArgumentException
72      */
testConstructor1()73     public void testConstructor1() {
74         try {
75             new CyclicBarrier(-1, (Runnable)null);
76             shouldThrow();
77         } catch (IllegalArgumentException success) {}
78     }
79 
80     /**
81      * Creating with negative parties and no action throws
82      * IllegalArgumentException
83      */
testConstructor2()84     public void testConstructor2() {
85         try {
86             new CyclicBarrier(-1);
87             shouldThrow();
88         } catch (IllegalArgumentException success) {}
89     }
90 
91     /**
92      * getParties returns the number of parties given in constructor
93      */
testGetParties()94     public void testGetParties() {
95         CyclicBarrier b = new CyclicBarrier(2);
96         assertEquals(2, b.getParties());
97         assertEquals(0, b.getNumberWaiting());
98     }
99 
100     /**
101      * A 1-party barrier triggers after single await
102      */
testSingleParty()103     public void testSingleParty() throws Exception {
104         CyclicBarrier b = new CyclicBarrier(1);
105         assertEquals(1, b.getParties());
106         assertEquals(0, b.getNumberWaiting());
107         b.await();
108         b.await();
109         assertEquals(0, b.getNumberWaiting());
110     }
111 
112     /**
113      * The supplied barrier action is run at barrier
114      */
testBarrierAction()115     public void testBarrierAction() throws Exception {
116         final AtomicInteger count = new AtomicInteger(0);
117         final Runnable incCount = new Runnable() { public void run() {
118             count.getAndIncrement(); }};
119         CyclicBarrier b = new CyclicBarrier(1, incCount);
120         assertEquals(1, b.getParties());
121         assertEquals(0, b.getNumberWaiting());
122         b.await();
123         b.await();
124         assertEquals(0, b.getNumberWaiting());
125         assertEquals(2, count.get());
126     }
127 
128     /**
129      * A 2-party/thread barrier triggers after both threads invoke await
130      */
testTwoParties()131     public void testTwoParties() throws Exception {
132         final CyclicBarrier b = new CyclicBarrier(2);
133         Thread t = newStartedThread(new CheckedRunnable() {
134             public void realRun() throws Exception {
135                 b.await();
136                 b.await();
137                 b.await();
138                 b.await();
139             }});
140 
141         b.await();
142         b.await();
143         b.await();
144         b.await();
145         awaitTermination(t);
146     }
147 
148     /**
149      * An interruption in one party causes others waiting in await to
150      * throw BrokenBarrierException
151      */
testAwait1_Interrupted_BrokenBarrier()152     public void testAwait1_Interrupted_BrokenBarrier() {
153         final CyclicBarrier c = new CyclicBarrier(3);
154         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
155         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
156             public void realRun() throws Exception {
157                 pleaseInterrupt.countDown();
158                 c.await();
159             }};
160         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
161             public void realRun() throws Exception {
162                 pleaseInterrupt.countDown();
163                 c.await();
164             }};
165 
166         t1.start();
167         t2.start();
168         await(pleaseInterrupt);
169         t1.interrupt();
170         awaitTermination(t1);
171         awaitTermination(t2);
172     }
173 
174     /**
175      * An interruption in one party causes others waiting in timed await to
176      * throw BrokenBarrierException
177      */
testAwait2_Interrupted_BrokenBarrier()178     public void testAwait2_Interrupted_BrokenBarrier() throws Exception {
179         final CyclicBarrier c = new CyclicBarrier(3);
180         final CountDownLatch pleaseInterrupt = new CountDownLatch(2);
181         Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
182             public void realRun() throws Exception {
183                 pleaseInterrupt.countDown();
184                 c.await(LONG_DELAY_MS, MILLISECONDS);
185             }};
186         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
187             public void realRun() throws Exception {
188                 pleaseInterrupt.countDown();
189                 c.await(LONG_DELAY_MS, MILLISECONDS);
190             }};
191 
192         t1.start();
193         t2.start();
194         await(pleaseInterrupt);
195         t1.interrupt();
196         awaitTermination(t1);
197         awaitTermination(t2);
198     }
199 
200     /**
201      * A timeout in timed await throws TimeoutException
202      */
testAwait3_TimeoutException()203     public void testAwait3_TimeoutException() throws InterruptedException {
204         final CyclicBarrier c = new CyclicBarrier(2);
205         Thread t = newStartedThread(new CheckedRunnable() {
206             public void realRun() throws Exception {
207                 long startTime = System.nanoTime();
208                 try {
209                     c.await(timeoutMillis(), MILLISECONDS);
210                     shouldThrow();
211                 } catch (TimeoutException success) {}
212                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
213             }});
214 
215         awaitTermination(t);
216     }
217 
218     /**
219      * A timeout in one party causes others waiting in timed await to
220      * throw BrokenBarrierException
221      */
testAwait4_Timeout_BrokenBarrier()222     public void testAwait4_Timeout_BrokenBarrier() throws InterruptedException {
223         final CyclicBarrier c = new CyclicBarrier(3);
224         Thread t1 = newStartedThread(new CheckedRunnable() {
225             public void realRun() throws Exception {
226                 try {
227                     c.await(LONG_DELAY_MS, MILLISECONDS);
228                     shouldThrow();
229                 } catch (BrokenBarrierException success) {}
230             }});
231         Thread t2 = newStartedThread(new CheckedRunnable() {
232             public void realRun() throws Exception {
233                 awaitNumberWaiting(c, 1);
234                 long startTime = System.nanoTime();
235                 try {
236                     c.await(timeoutMillis(), MILLISECONDS);
237                     shouldThrow();
238                 } catch (TimeoutException success) {}
239                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
240             }});
241 
242         awaitTermination(t1);
243         awaitTermination(t2);
244     }
245 
246     /**
247      * A timeout in one party causes others waiting in await to
248      * throw BrokenBarrierException
249      */
testAwait5_Timeout_BrokenBarrier()250     public void testAwait5_Timeout_BrokenBarrier() throws InterruptedException {
251         final CyclicBarrier c = new CyclicBarrier(3);
252         Thread t1 = newStartedThread(new CheckedRunnable() {
253             public void realRun() throws Exception {
254                 try {
255                     c.await();
256                     shouldThrow();
257                 } catch (BrokenBarrierException success) {}
258             }});
259         Thread t2 = newStartedThread(new CheckedRunnable() {
260             public void realRun() throws Exception {
261                 awaitNumberWaiting(c, 1);
262                 long startTime = System.nanoTime();
263                 try {
264                     c.await(timeoutMillis(), MILLISECONDS);
265                     shouldThrow();
266                 } catch (TimeoutException success) {}
267                 assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
268             }});
269 
270         awaitTermination(t1);
271         awaitTermination(t2);
272     }
273 
274     /**
275      * A reset of an active barrier causes waiting threads to throw
276      * BrokenBarrierException
277      */
testReset_BrokenBarrier()278     public void testReset_BrokenBarrier() throws InterruptedException {
279         final CyclicBarrier c = new CyclicBarrier(3);
280         final CountDownLatch pleaseReset = new CountDownLatch(2);
281         Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
282             public void realRun() throws Exception {
283                 pleaseReset.countDown();
284                 c.await();
285             }};
286         Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
287             public void realRun() throws Exception {
288                 pleaseReset.countDown();
289                 c.await();
290             }};
291 
292         t1.start();
293         t2.start();
294         await(pleaseReset);
295 
296         awaitNumberWaiting(c, 2);
297         c.reset();
298         awaitTermination(t1);
299         awaitTermination(t2);
300     }
301 
302     /**
303      * A reset before threads enter barrier does not throw
304      * BrokenBarrierException
305      */
testReset_NoBrokenBarrier()306     public void testReset_NoBrokenBarrier() throws Exception {
307         final CyclicBarrier c = new CyclicBarrier(3);
308         c.reset();
309 
310         Thread t1 = newStartedThread(new CheckedRunnable() {
311             public void realRun() throws Exception {
312                 c.await();
313             }});
314         Thread t2 = newStartedThread(new CheckedRunnable() {
315             public void realRun() throws Exception {
316                 c.await();
317             }});
318 
319         c.await();
320         awaitTermination(t1);
321         awaitTermination(t2);
322     }
323 
324     /**
325      * Reset of a non-broken barrier does not break barrier
326      */
testResetWithoutBreakage()327     public void testResetWithoutBreakage() throws Exception {
328         final CyclicBarrier barrier = new CyclicBarrier(3);
329         for (int i = 0; i < 3; i++) {
330             final CyclicBarrier start = new CyclicBarrier(3);
331             Thread t1 = newStartedThread(new CheckedRunnable() {
332                 public void realRun() throws Exception {
333                     start.await();
334                     barrier.await();
335                 }});
336 
337             Thread t2 = newStartedThread(new CheckedRunnable() {
338                 public void realRun() throws Exception {
339                     start.await();
340                     barrier.await();
341                 }});
342 
343             start.await();
344             barrier.await();
345             awaitTermination(t1);
346             awaitTermination(t2);
347             assertFalse(barrier.isBroken());
348             assertEquals(0, barrier.getNumberWaiting());
349             if (i == 1) barrier.reset();
350             assertFalse(barrier.isBroken());
351             assertEquals(0, barrier.getNumberWaiting());
352         }
353     }
354 
355     /**
356      * Reset of a barrier after interruption reinitializes it.
357      */
testResetAfterInterrupt()358     public void testResetAfterInterrupt() throws Exception {
359         final CyclicBarrier barrier = new CyclicBarrier(3);
360         for (int i = 0; i < 2; i++) {
361             final CyclicBarrier start = new CyclicBarrier(3);
362             Thread t1 = new ThreadShouldThrow(InterruptedException.class) {
363                 public void realRun() throws Exception {
364                     start.await();
365                     barrier.await();
366                 }};
367 
368             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
369                 public void realRun() throws Exception {
370                     start.await();
371                     barrier.await();
372                 }};
373 
374             t1.start();
375             t2.start();
376             start.await();
377             t1.interrupt();
378             awaitTermination(t1);
379             awaitTermination(t2);
380             assertTrue(barrier.isBroken());
381             assertEquals(0, barrier.getNumberWaiting());
382             barrier.reset();
383             assertFalse(barrier.isBroken());
384             assertEquals(0, barrier.getNumberWaiting());
385         }
386     }
387 
388     /**
389      * Reset of a barrier after timeout reinitializes it.
390      */
testResetAfterTimeout()391     public void testResetAfterTimeout() throws Exception {
392         final CyclicBarrier barrier = new CyclicBarrier(3);
393         for (int i = 0; i < 2; i++) {
394             assertEquals(0, barrier.getNumberWaiting());
395             Thread t1 = newStartedThread(new CheckedRunnable() {
396                 public void realRun() throws Exception {
397                     try {
398                         barrier.await();
399                         shouldThrow();
400                     } catch (BrokenBarrierException success) {}
401                 }});
402             Thread t2 = newStartedThread(new CheckedRunnable() {
403                 public void realRun() throws Exception {
404                     awaitNumberWaiting(barrier, 1);
405                     long startTime = System.nanoTime();
406                     try {
407                         barrier.await(timeoutMillis(), MILLISECONDS);
408                         shouldThrow();
409                     } catch (TimeoutException success) {}
410                     assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
411                 }});
412 
413             awaitTermination(t1);
414             awaitTermination(t2);
415             assertEquals(0, barrier.getNumberWaiting());
416             assertTrue(barrier.isBroken());
417             assertEquals(0, barrier.getNumberWaiting());
418             barrier.reset();
419             assertFalse(barrier.isBroken());
420             assertEquals(0, barrier.getNumberWaiting());
421         }
422     }
423 
424     /**
425      * Reset of a barrier after a failed command reinitializes it.
426      */
testResetAfterCommandException()427     public void testResetAfterCommandException() throws Exception {
428         final CyclicBarrier barrier =
429             new CyclicBarrier(3, new Runnable() {
430                     public void run() {
431                         throw new NullPointerException(); }});
432         for (int i = 0; i < 2; i++) {
433             final CyclicBarrier start = new CyclicBarrier(3);
434             Thread t1 = new ThreadShouldThrow(BrokenBarrierException.class) {
435                 public void realRun() throws Exception {
436                     start.await();
437                     barrier.await();
438                 }};
439 
440             Thread t2 = new ThreadShouldThrow(BrokenBarrierException.class) {
441                 public void realRun() throws Exception {
442                     start.await();
443                     barrier.await();
444                 }};
445 
446             t1.start();
447             t2.start();
448             start.await();
449             awaitNumberWaiting(barrier, 2);
450             try {
451                 barrier.await();
452                 shouldThrow();
453             } catch (NullPointerException success) {}
454             awaitTermination(t1);
455             awaitTermination(t2);
456             assertTrue(barrier.isBroken());
457             assertEquals(0, barrier.getNumberWaiting());
458             barrier.reset();
459             assertFalse(barrier.isBroken());
460             assertEquals(0, barrier.getNumberWaiting());
461         }
462     }
463 
464     /**
465      * There can be more threads calling await() than parties, as long as each
466      * task only calls await once and the task count is a multiple of parties.
467      */
testMoreTasksThanParties()468     public void testMoreTasksThanParties() throws Exception {
469         final ThreadLocalRandom rnd = ThreadLocalRandom.current();
470         final int parties = rnd.nextInt(1, 5);
471         final int nTasks = rnd.nextInt(1, 5) * parties;
472         final AtomicInteger tripCount = new AtomicInteger(0);
473         final AtomicInteger awaitCount = new AtomicInteger(0);
474         final CyclicBarrier barrier =
475             new CyclicBarrier(parties, () -> tripCount.getAndIncrement());
476         final ExecutorService e = Executors.newFixedThreadPool(nTasks);
477         final Runnable awaiter = () -> {
478             try {
479                 if (randomBoolean())
480                     barrier.await();
481                 else
482                     barrier.await(LONG_DELAY_MS, MILLISECONDS);
483                 awaitCount.getAndIncrement();
484             } catch (Throwable fail) { threadUnexpectedException(fail); }};
485         try (PoolCleaner cleaner = cleaner(e)) {
486             for (int i = nTasks; i--> 0; )
487                 e.execute(awaiter);
488         }
489         assertEquals(nTasks / parties, tripCount.get());
490         assertEquals(nTasks, awaitCount.get());
491         assertEquals(0, barrier.getNumberWaiting());
492     }
493 }
494