1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.ipc; 20 21 import static org.junit.Assert.assertEquals; 22 23 import java.util.ArrayList; 24 import java.util.HashMap; 25 import java.util.concurrent.BlockingQueue; 26 import java.util.concurrent.LinkedBlockingQueue; 27 28 import org.junit.Test; 29 30 public class TestCallQueueManager { 31 private CallQueueManager<FakeCall> manager; 32 33 public class FakeCall { 34 public final int tag; // Can be used for unique identification 35 FakeCall(int tag)36 public FakeCall(int tag) { 37 this.tag = tag; 38 } 39 } 40 41 /** 42 * Putter produces FakeCalls 43 */ 44 public class Putter implements Runnable { 45 private final CallQueueManager<FakeCall> cq; 46 47 public final int tag; 48 public volatile int callsAdded = 0; // How many calls we added, accurate unless interrupted 49 private final int maxCalls; 50 51 private volatile boolean isRunning = true; 52 Putter(CallQueueManager<FakeCall> aCq, int maxCalls, int tag)53 public Putter(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) { 54 this.maxCalls = maxCalls; 55 this.cq = aCq; 56 this.tag = tag; 57 } 58 run()59 public void run() { 60 try { 61 // Fill up to max (which is infinite if maxCalls < 0) 62 while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) { 63 cq.put(new FakeCall(this.tag)); 64 callsAdded++; 65 } 66 } catch (InterruptedException e) { 67 return; 68 } 69 } 70 stop()71 public void stop() { 72 this.isRunning = false; 73 } 74 } 75 76 /** 77 * Taker consumes FakeCalls 78 */ 79 public class Taker implements Runnable { 80 private final CallQueueManager<FakeCall> cq; 81 82 public final int tag; // if >= 0 means we will only take the matching tag, and put back 83 // anything else 84 public volatile int callsTaken = 0; // total calls taken, accurate if we aren't interrupted 85 public volatile FakeCall lastResult = null; // the last thing we took 86 private final int maxCalls; // maximum calls to take 87 Taker(CallQueueManager<FakeCall> aCq, int maxCalls, int tag)88 public Taker(CallQueueManager<FakeCall> aCq, int maxCalls, int tag) { 89 this.maxCalls = maxCalls; 90 this.cq = aCq; 91 this.tag = tag; 92 } 93 run()94 public void run() { 95 try { 96 // Take while we don't exceed maxCalls, or if maxCalls is undefined (< 0) 97 while (callsTaken < maxCalls || maxCalls < 0) { 98 FakeCall res = cq.take(); 99 100 if (tag >= 0 && res.tag != this.tag) { 101 // This call does not match our tag, we should put it back and try again 102 cq.put(res); 103 } else { 104 callsTaken++; 105 lastResult = res; 106 } 107 } 108 } catch (InterruptedException e) { 109 return; 110 } 111 } 112 } 113 114 // Assert we can take exactly the numberOfTakes assertCanTake(CallQueueManager<FakeCall> cq, int numberOfTakes, int takeAttempts)115 public void assertCanTake(CallQueueManager<FakeCall> cq, int numberOfTakes, 116 int takeAttempts) throws InterruptedException { 117 118 Taker taker = new Taker(cq, takeAttempts, -1); 119 Thread t = new Thread(taker); 120 t.start(); 121 t.join(100); 122 123 assertEquals(taker.callsTaken, numberOfTakes); 124 t.interrupt(); 125 } 126 127 // Assert we can put exactly the numberOfPuts assertCanPut(CallQueueManager<FakeCall> cq, int numberOfPuts, int putAttempts)128 public void assertCanPut(CallQueueManager<FakeCall> cq, int numberOfPuts, 129 int putAttempts) throws InterruptedException { 130 131 Putter putter = new Putter(cq, putAttempts, -1); 132 Thread t = new Thread(putter); 133 t.start(); 134 t.join(100); 135 136 assertEquals(putter.callsAdded, numberOfPuts); 137 t.interrupt(); 138 } 139 140 141 private static final Class<? extends BlockingQueue<FakeCall>> queueClass 142 = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class); 143 144 @Test testCallQueueCapacity()145 public void testCallQueueCapacity() throws InterruptedException { 146 manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); 147 148 assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity 149 } 150 151 @Test testEmptyConsume()152 public void testEmptyConsume() throws InterruptedException { 153 manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null); 154 155 assertCanTake(manager, 0, 1); // Fails since it's empty 156 } 157 158 @Test(timeout=60000) testSwapUnderContention()159 public void testSwapUnderContention() throws InterruptedException { 160 manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null); 161 162 ArrayList<Putter> producers = new ArrayList<Putter>(); 163 ArrayList<Taker> consumers = new ArrayList<Taker>(); 164 165 HashMap<Runnable, Thread> threads = new HashMap<Runnable, Thread>(); 166 167 // Create putters and takers 168 for (int i=0; i < 50; i++) { 169 Putter p = new Putter(manager, -1, -1); 170 Thread pt = new Thread(p); 171 producers.add(p); 172 threads.put(p, pt); 173 174 pt.start(); 175 } 176 177 for (int i=0; i < 20; i++) { 178 Taker t = new Taker(manager, -1, -1); 179 Thread tt = new Thread(t); 180 consumers.add(t); 181 threads.put(t, tt); 182 183 tt.start(); 184 } 185 186 Thread.sleep(10); 187 188 for (int i=0; i < 5; i++) { 189 manager.swapQueue(queueClass, 5000, "", null); 190 } 191 192 // Stop the producers 193 for (Putter p : producers) { 194 p.stop(); 195 } 196 197 // Wait for consumers to wake up, then consume 198 Thread.sleep(2000); 199 assertEquals(0, manager.size()); 200 201 // Ensure no calls were dropped 202 long totalCallsCreated = 0; 203 for (Putter p : producers) { 204 threads.get(p).interrupt(); 205 } 206 for (Putter p : producers) { 207 threads.get(p).join(); 208 totalCallsCreated += p.callsAdded; 209 } 210 211 long totalCallsConsumed = 0; 212 for (Taker t : consumers) { 213 threads.get(t).interrupt(); 214 } 215 for (Taker t : consumers) { 216 threads.get(t).join(); 217 totalCallsConsumed += t.callsTaken; 218 } 219 220 assertEquals(totalCallsConsumed, totalCallsCreated); 221 } 222 }