package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Global;
import org.jgroups.Message;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.stack.Protocol;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.TimeScheduler2;
import org.jgroups.util.Util;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


/**
 * Tests time for N threads to deliver M messages to UNICAST2.
 * <p>
 * A standalone {@link UNICAST2} instance is wired between a no-op down protocol and an up protocol that
 * records the seqno of every message it is handed. Sender threads then push messages into
 * {@link UNICAST2#up(Event)} concurrently, and the test checks that the recorded seqnos are exactly
 * 1..num_msgs (in order for regular messages, after sorting for OOB messages).
 * @author Bela Ban
 */
@Test(groups=Global.FUNCTIONAL, sequential=true)
public class UNICAST2_StressTest {
    static final int NUM_MSGS=1000000;
    static final int NUM_THREADS=50;
    static final int MAX_MSG_BATCH_SIZE=50000;

    static final short UNICAST_ID=ClassConfigurator.getProtocolId(UNICAST2.class);

    /** Supplies the timer implementations the stress tests are run against (delegates to Util.createTimer()). */
    @DataProvider(name="createTimer")
    Object[][] createTimer() {
        return Util.createTimer();
    }

    /** Runs the stress test with regular (non-OOB) messages. */
    @Test(dataProvider="createTimer")
    public static void stressTest(TimeScheduler timer) {
        start(NUM_THREADS, NUM_MSGS, false, MAX_MSG_BATCH_SIZE, timer);
    }

    /** Runs the stress test with messages flagged as OOB. */
    @Test(dataProvider="createTimer")
    public static void stressTestOOB(TimeScheduler timer) {
        start(NUM_THREADS, NUM_MSGS, true, MAX_MSG_BATCH_SIZE, timer);
    }

    /**
     * Pushes messages into a fresh UNICAST2 instance from num_threads threads, waits until num_msgs
     * messages have been passed up, prints the throughput and verifies the delivered seqnos.
     * @param num_threads number of concurrent sender threads
     * @param num_msgs number of messages the senders push in total (also the expected number of deliveries)
     * @param oob whether messages are flagged OOB; if true, the delivered seqnos are sorted before being checked
     * @param max_msg_batch_size value passed to UNICAST2.setMaxMessageBatchSize()
     * @param timer timer to be used by UNICAST2; if null, a TimeScheduler2 is created. The timer is stopped
     *              before this method returns
     */
    private static void start(final int num_threads, final int num_msgs, boolean oob, int max_msg_batch_size, TimeScheduler timer) {
        final UNICAST2 unicast=new UNICAST2();
        final AtomicInteger counter=new AtomicInteger(num_msgs);
        final AtomicLong seqno=new AtomicLong(1);
        final AtomicInteger delivered_msgs=new AtomicInteger(0);
        final Lock lock=new ReentrantLock();
        final Condition all_msgs_delivered=lock.newCondition();
        final ConcurrentLinkedQueue<Long> delivered_msg_list=new ConcurrentLinkedQueue<Long>();
        final Address local_addr=Util.createRandomAddress();
        final Address sender=Util.createRandomAddress();

        if(timer == null)
            timer=new TimeScheduler2();
        unicast.setTimer(timer);
        System.out.println("timer is a " + timer.getClass());

        // nothing below UNICAST2: everything sent down is discarded
        unicast.setDownProtocol(new Protocol() {
            @Override
            public Object down(Event evt) {return null;}
        });

        // records every delivered message and wakes up the main thread once enough have arrived
        unicast.setUpProtocol(new Protocol() {
            @Override
            public Object up(Event evt) {
                if(evt.getType() == Event.MSG) {
                    delivered_msgs.incrementAndGet();
                    UNICAST2.Unicast2Header hdr=(UNICAST2.Unicast2Header)((Message)evt.getArg()).getHeader(UNICAST_ID);
                    if(hdr != null)
                        delivered_msg_list.add(hdr.getSeqno());

                    if(delivered_msgs.get() >= num_msgs) {
                        lock.lock();
                        try {
                            all_msgs_delivered.signalAll();
                        }
                        finally {
                            lock.unlock();
                        }
                    }
                }
                return null;
            }
        });

        // set if the wait loop below is interrupted; the interrupt status is restored once we're done
        boolean interrupted=false;

        try {
            unicast.down(new Event(Event.SET_LOCAL_ADDRESS, local_addr));

            unicast.setMaxMessageBatchSize(max_msg_batch_size);
            unicast.setValue("max_bytes", 20000);

            // send the first message manually, to initialize the AckReceiverWindow tables
            // NOTE(review): the senders' seqno counter also starts at 1, so seqno 1 is passed up twice; the
            // size check further down presumably relies on UNICAST2 delivering it only once - confirm
            Message msg=createMessage(local_addr, sender, 1L, oob, true);
            unicast.up(new Event(Event.MSG, msg));
            Util.sleep(500);

            final CountDownLatch latch=new CountDownLatch(1);
            Sender[] adders=new Sender[num_threads];
            for(int i=0; i < adders.length; i++) {
                adders[i]=new Sender(unicast, latch, counter, seqno, oob, local_addr, sender);
                adders[i].start();
            }

            long start=System.currentTimeMillis();
            latch.countDown(); // starts all adders

            lock.lock();
            try {
                while(delivered_msgs.get() < num_msgs) {
                    try {
                        // timed wait, so progress is printed roughly once a second
                        all_msgs_delivered.await(1000, TimeUnit.MILLISECONDS);
                        System.out.println("received " + delivered_msgs.get() + " msgs");
                    }
                    catch(InterruptedException e) {
                        // keep waiting for the remaining messages, but remember the interrupt
                        interrupted=true;
                        e.printStackTrace();
                    }
                }
            }
            finally {
                lock.unlock();
            }

            long time=System.currentTimeMillis() - start;
            double requests_sec=num_msgs / (time / 1000.0);
            System.out.println("\nTime: " + time + " ms, " + Util.format(requests_sec) + " requests / sec\n");
            System.out.println("Delivered messages: " + delivered_msg_list.size());
            if(delivered_msg_list.size() < 100)
                System.out.println("Elements: " + delivered_msg_list);
        }
        finally {
            // always release the protocol and the timer, even if the run above failed
            try {
                unicast.stop();
            }
            finally {
                timer.stop();
                if(interrupted)
                    Thread.currentThread().interrupt();
            }
        }

        List<Long> results=new ArrayList<Long>(delivered_msg_list);

        // OOB messages may be delivered in any order, so only the set of seqnos is checked
        if(oob)
            Collections.sort(results);

        assert results.size() == num_msgs : "expected " + num_msgs + ", but got " + results.size();

        System.out.println("Checking results consistency");
        int i=1;
        for(Long num: results) {
            if(num.longValue() != i) {
                assert i == num : "expected " + i + " but got " + num;
                return;
            }
            i++;
        }
        System.out.println("OK");
    }

    /**
     * Creates a message with payload "hello world" carrying a UNICAST2 data header.
     * @param dest destination address of the message
     * @param src source address of the message
     * @param seqno sequence number placed in the header
     * @param oob if true, the message is flagged as OOB
     * @param first passed to Unicast2Header.createDataHeader() as the 'first' argument
     * @return the new message
     */
    private static Message createMessage(Address dest, Address src, long seqno, boolean oob, boolean first) {
        Message msg=new Message(dest, src, "hello world");
        UNICAST2.Unicast2Header hdr=UNICAST2.Unicast2Header.createDataHeader(seqno, (short)1, first);
        msg.putHeader(UNICAST_ID, hdr);
        if(oob)
            msg.setFlag(Message.OOB);
        return msg;
    }


    /**
     * Thread which waits for the start latch and then passes messages up to UNICAST2 until the shared
     * message budget is used up. Seqnos are taken from a counter shared by all senders.
     */
    static class Sender extends Thread {
        final UNICAST2 unicast;
        final CountDownLatch latch;
        final AtomicInteger num_msgs;
        final AtomicLong current_seqno;
        final boolean oob;
        final Address dest;
        final Address sender;

        public Sender(UNICAST2 unicast, CountDownLatch latch, AtomicInteger num_msgs, AtomicLong current_seqno,
                      boolean oob, final Address dest, final Address sender) {
            this.unicast=unicast;
            this.latch=latch;
            this.num_msgs=num_msgs;
            this.current_seqno=current_seqno;
            this.oob=oob;
            this.dest=dest;
            this.sender=sender;
            setName("Adder");
        }


        @Override
        public void run() {
            try {
                latch.await();
            }
            catch(InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // don't lose the interrupt
                return;
            }

            // num_msgs is the remaining budget shared by all senders
            while(num_msgs.getAndDecrement() > 0) {
                long seqno=current_seqno.getAndIncrement();
                Message msg=createMessage(dest, sender, seqno, oob, false);
                unicast.up(new Event(Event.MSG, msg));
            }
        }
    }


    /**
     * Command line entry point. Prints the usage line and returns on an unknown option, an option
     * without a value or a value which is not a number.
     */
    @Test(enabled=false)
    public static void main(String[] args) {
        final String usage="UNICAST2_StressTest [-num_msgs msgs] [-num_threads threads] " +
          "[-oob <true | false>] [-max <batch size>]";
        int num_threads=10;
        int num_msgs=1000000;
        int max=20000;
        boolean oob=false;

        try {
            for(int i=0; i < args.length; i++) {
                // every option takes a value; an option in last position falls through to the usage line
                boolean has_value=i + 1 < args.length;
                if(has_value && args[i].equals("-num_threads")) {
                    num_threads=Integer.parseInt(args[++i]);
                    continue;
                }
                if(has_value && args[i].equals("-num_msgs")) {
                    num_msgs=Integer.parseInt(args[++i]);
                    continue;
                }
                if(has_value && args[i].equals("-oob")) {
                    oob=Boolean.parseBoolean(args[++i]);
                    continue;
                }
                if(has_value && args[i].equals("-max")) {
                    max=Integer.parseInt(args[++i]);
                    continue;
                }
                System.out.println(usage);
                return;
            }
        }
        catch(NumberFormatException e) {
            System.out.println(usage);
            return;
        }
        start(num_threads, num_msgs, oob, max, null);
    }
}