1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.ipc;
20 
21 import static org.junit.Assert.assertEquals;
22 
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.LinkedBlockingQueue;
27 
28 import org.junit.Test;
29 
30 public class TestCallQueueManager {
31   private CallQueueManager<FakeCall> manager;
32 
33   public class FakeCall {
34     public final int tag; // Can be used for unique identification
35 
FakeCall(int tag)36     public FakeCall(int tag) {
37       this.tag = tag;
38     }
39   }
40 
41   /**
42    * Putter produces FakeCalls
43    */
44   public class Putter implements Runnable {
45     private final CallQueueManager<FakeCall> cq;
46 
47     public final int tag;
48     public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted
49     private final int maxCalls;
50 
51     private volatile boolean isRunning = true;
52 
Putter(CallQueueManager<FakeCall> aCq, int maxCalls, int tag)53     public Putter(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) {
54       this.maxCalls = maxCalls;
55       this.cq = aCq;
56       this.tag = tag;
57     }
58 
run()59     public void run() {
60       try {
61         // Fill up to max (which is infinite if maxCalls < 0)
62         while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) {
63           cq.put(new FakeCall(this.tag));
64           callsAdded++;
65         }
66       } catch (InterruptedException e) {
67         return;
68       }
69     }
70 
stop()71     public void stop() {
72       this.isRunning = false;
73     }
74   }
75 
76   /**
77    * Taker consumes FakeCalls
78    */
79   public class Taker implements Runnable {
80     private final CallQueueManager<FakeCall> cq;
81 
82     public final int tag; // if >= 0 means we will only take the matching tag, and put back
83                           // anything else
84     public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted
85     public volatile FakeCall lastResult = null; // the last thing we took
86     private final int maxCalls; // maximum calls to take
87 
Taker(CallQueueManager<FakeCall> aCq, int maxCalls, int tag)88     public Taker(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) {
89       this.maxCalls = maxCalls;
90       this.cq = aCq;
91       this.tag = tag;
92     }
93 
run()94     public void run() {
95       try {
96         // Take while we don't exceed maxCalls, or if maxCalls is undefined (< 0)
97         while (callsTaken < maxCalls || maxCalls < 0) {
98           FakeCall res = cq.take();
99 
100           if (tag >= 0 && res.tag != this.tag) {
101             // This call does not match our tag, we should put it back and try again
102             cq.put(res);
103           } else {
104             callsTaken++;
105             lastResult = res;
106           }
107         }
108       } catch (InterruptedException e) {
109         return;
110       }
111     }
112   }
113 
114   // Assert we can take exactly the numberOfTakes
assertCanTake(CallQueueManager<FakeCall> cq, int numberOfTakes, int takeAttempts)115   public void assertCanTake(CallQueueManager<FakeCall> cq, int numberOfTakes,
116     int takeAttempts) throws InterruptedException {
117 
118     Taker taker = new Taker(cq, takeAttempts, -1);
119     Thread t = new Thread(taker);
120     t.start();
121     t.join(100);
122 
123     assertEquals(taker.callsTaken, numberOfTakes);
124     t.interrupt();
125   }
126 
127   // Assert we can put exactly the numberOfPuts
assertCanPut(CallQueueManager<FakeCall> cq, int numberOfPuts, int putAttempts)128   public void assertCanPut(CallQueueManager<FakeCall> cq, int numberOfPuts,
129     int putAttempts) throws InterruptedException {
130 
131     Putter putter = new Putter(cq, putAttempts, -1);
132     Thread t = new Thread(putter);
133     t.start();
134     t.join(100);
135 
136     assertEquals(putter.callsAdded, numberOfPuts);
137     t.interrupt();
138   }
139 
140 
141   private static final Class<? extends BlockingQueue<FakeCall>> queueClass
142       = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
143 
144   @Test
testCallQueueCapacity()145   public void testCallQueueCapacity() throws InterruptedException {
146     manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
147 
148     assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
149   }
150 
151   @Test
testEmptyConsume()152   public void testEmptyConsume() throws InterruptedException {
153     manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
154 
155     assertCanTake(manager, 0, 1); // Fails since it's empty
156   }
157 
158   @Test(timeout=60000)
testSwapUnderContention()159   public void testSwapUnderContention() throws InterruptedException {
160     manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
161 
162     ArrayList<Putter> producers = new ArrayList<Putter>();
163     ArrayList<Taker> consumers = new ArrayList<Taker>();
164 
165     HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>();
166 
167     // Create putters and takers
168     for (int i=0; i < 50; i++) {
169       Putter p = new Putter(manager, -1, -1);
170       Thread pt = new Thread(p);
171       producers.add(p);
172       threads.put(p, pt);
173 
174       pt.start();
175     }
176 
177     for (int i=0; i < 20; i++) {
178       Taker t = new Taker(manager, -1, -1);
179       Thread tt = new Thread(t);
180       consumers.add(t);
181       threads.put(t, tt);
182 
183       tt.start();
184     }
185 
186     Thread.sleep(10);
187 
188     for (int i=0; i < 5; i++) {
189       manager.swapQueue(queueClass, 5000, "", null);
190     }
191 
192     // Stop the producers
193     for (Putter p : producers) {
194       p.stop();
195     }
196 
197     // Wait for consumers to wake up, then consume
198     Thread.sleep(2000);
199     assertEquals(0, manager.size());
200 
201     // Ensure no calls were dropped
202     long totalCallsCreated = 0;
203     for (Putter p : producers) {
204       threads.get(p).interrupt();
205     }
206     for (Putter p : producers) {
207       threads.get(p).join();
208       totalCallsCreated += p.callsAdded;
209     }
210 
211     long totalCallsConsumed = 0;
212     for (Taker t : consumers) {
213       threads.get(t).interrupt();
214     }
215     for (Taker t : consumers) {
216       threads.get(t).join();
217       totalCallsConsumed += t.callsTaken;
218     }
219 
220     assertEquals(totalCallsConsumed, totalCallsCreated);
221   }
222 }