1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.
7  *
8  * This code is distributed in the hope that it will be useful, but WITHOUT
9  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * version 2 for more details (a copy is included in the LICENSE file that
12  * accompanied this code).
13  *
14  * You should have received a copy of the GNU General Public License version
15  * 2 along with this work; if not, write to the Free Software Foundation,
16  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
17  *
18  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
19  * or visit www.oracle.com if you need additional information or have any
20  * questions.
21  */
22 
23 /*
24  * This file is available under and governed by the GNU General Public
25  * License version 2 only, as published by the Free Software Foundation.
26  * However, the following notice accompanied the original version of this
27  * file:
28  *
29  * Written by Doug Lea with assistance from members of JCP JSR-166
30  * Expert Group and released to the public domain, as explained at
31  * http://creativecommons.org/publicdomain/zero/1.0/
32  */
33 
34 /*
35  * @test
36  * @bug 4486658
37  * @summary  multiple producers and consumers using blocking queues
38  * @library /test/lib
39  */
40 
41 import static java.util.concurrent.TimeUnit.MILLISECONDS;
42 import static java.util.concurrent.TimeUnit.NANOSECONDS;
43 
44 import java.util.concurrent.ArrayBlockingQueue;
45 import java.util.concurrent.BlockingQueue;
46 import java.util.concurrent.CyclicBarrier;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.LinkedBlockingDeque;
50 import java.util.concurrent.LinkedBlockingQueue;
51 import java.util.concurrent.LinkedTransferQueue;
52 import java.util.concurrent.PriorityBlockingQueue;
53 import java.util.concurrent.SynchronousQueue;
54 import java.util.concurrent.atomic.AtomicInteger;
55 import jdk.test.lib.Utils;
56 
57 public class ProducerConsumerLoops {
58     static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000);
59     static ExecutorService pool;
60 
main(String[] args)61     public static void main(String[] args) throws Exception {
62         final int maxPairs = (args.length > 0)
63             ? Integer.parseInt(args[0])
64             : 5;
65         int iters = 10000;
66 
67         pool = Executors.newCachedThreadPool();
68         for (int i = 1; i <= maxPairs; i += (i+1) >>> 1) {
69             // Adjust iterations to limit typical single runs to <= 10 ms;
70             // Notably, fair queues get fewer iters.
71             // Unbounded queues can legitimately OOME if iterations
72             // high enough, but we have a sufficiently low limit here.
73             run(new ArrayBlockingQueue<Integer>(100), i, 500);
74             run(new LinkedBlockingQueue<Integer>(100), i, 1000);
75             run(new LinkedBlockingDeque<Integer>(100), i, 1000);
76             run(new LinkedTransferQueue<Integer>(), i, 1000);
77             run(new PriorityBlockingQueue<Integer>(), i, 1000);
78             run(new SynchronousQueue<Integer>(), i, 400);
79             run(new SynchronousQueue<Integer>(true), i, 300);
80             run(new ArrayBlockingQueue<Integer>(100, true), i, 100);
81         }
82         pool.shutdown();
83         if (! pool.awaitTermination(LONG_DELAY_MS, MILLISECONDS))
84             throw new Error();
85         pool = null;
86    }
87 
run(BlockingQueue<Integer> queue, int pairs, int iters)88     static void run(BlockingQueue<Integer> queue, int pairs, int iters) throws Exception {
89         new ProducerConsumerLoops(queue, pairs, iters).run();
90     }
91 
92     final BlockingQueue<Integer> queue;
93     final int pairs;
94     final int iters;
95     final LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer();
96     final CyclicBarrier barrier;
97     final AtomicInteger checksum = new AtomicInteger(0);
98     Throwable fail;
99 
ProducerConsumerLoops(BlockingQueue<Integer> queue, int pairs, int iters)100     ProducerConsumerLoops(BlockingQueue<Integer> queue, int pairs, int iters) {
101         this.queue = queue;
102         this.pairs = pairs;
103         this.iters = iters;
104         this.barrier = new CyclicBarrier(2 * pairs + 1, timer);
105     }
106 
run()107     void run() throws Exception {
108         for (int i = 0; i < pairs; i++) {
109             pool.execute(new Producer());
110             pool.execute(new Consumer());
111         }
112         barrier.await();
113         barrier.await();
114         System.out.printf("%s, pairs=%d:  %d ms%n",
115                           queue.getClass().getSimpleName(), pairs,
116                           NANOSECONDS.toMillis(timer.getTime()));
117         if (checksum.get() != 0) throw new AssertionError("checksum mismatch");
118         if (fail != null) throw new AssertionError(fail);
119     }
120 
121     abstract class CheckedRunnable implements Runnable {
realRun()122         abstract void realRun() throws Throwable;
run()123         public final void run() {
124             try {
125                 realRun();
126             } catch (Throwable t) {
127                 fail = t;
128                 t.printStackTrace();
129                 throw new AssertionError(t);
130             }
131         }
132     }
133 
134     class Producer extends CheckedRunnable {
realRun()135         void realRun() throws Throwable {
136             barrier.await();
137             int s = 0;
138             int l = hashCode();
139             for (int i = 0; i < iters; i++) {
140                 l = LoopHelpers.compute2(l);
141                 queue.put(new Integer(l));
142                 s += LoopHelpers.compute1(l);
143             }
144             checksum.getAndAdd(s);
145             barrier.await();
146         }
147     }
148 
149     class Consumer extends CheckedRunnable {
realRun()150         void realRun() throws Throwable {
151             barrier.await();
152             int l = 0;
153             int s = 0;
154             for (int i = 0; i < iters; i++) {
155                 l = LoopHelpers.compute1(queue.take().intValue());
156                 s += l;
157             }
158             checksum.getAndAdd(-s);
159             barrier.await();
160         }
161     }
162 }
163