1 package org.jgroups.protocols;
2 
3 import org.jgroups.Address;
4 import org.jgroups.Event;
5 import org.jgroups.Global;
6 import org.jgroups.Message;
7 import org.jgroups.conf.ClassConfigurator;
8 import org.jgroups.stack.Protocol;
9 import org.jgroups.util.TimeScheduler;
10 import org.jgroups.util.TimeScheduler2;
11 import org.jgroups.util.Util;
12 import org.testng.annotations.DataProvider;
13 import org.testng.annotations.Test;
14 
15 import java.util.ArrayList;
16 import java.util.Collections;
17 import java.util.List;
18 import java.util.concurrent.ConcurrentLinkedQueue;
19 import java.util.concurrent.CountDownLatch;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 
27 
28 /**
29  * Tests time for N threads to deliver M messages to UNICAST2
30  * @author Bela Ban
31  */
32 @Test(groups=Global.FUNCTIONAL, sequential=true)
33 public class UNICAST2_StressTest {
34     static final int NUM_MSGS=1000000;
35     static final int NUM_THREADS=50;
36     static final int MAX_MSG_BATCH_SIZE=50000;
37 
38     static final short UNICAST_ID=ClassConfigurator.getProtocolId(UNICAST2.class);
39 
40     @DataProvider(name="createTimer")
createTimer()41     Object[][] createTimer() {
42         return Util.createTimer();
43     }
44 
45     @Test(dataProvider="createTimer")
stressTest(TimeScheduler timer)46     public static void stressTest(TimeScheduler timer) {
47         start(NUM_THREADS, NUM_MSGS, false, MAX_MSG_BATCH_SIZE, timer);
48     }
49 
50     @Test(dataProvider="createTimer")
stressTestOOB(TimeScheduler timer)51     public static void stressTestOOB(TimeScheduler timer) {
52         start(NUM_THREADS, NUM_MSGS, true, MAX_MSG_BATCH_SIZE, timer);
53     }
54 
start(final int num_threads, final int num_msgs, boolean oob, int max_msg_batch_size, TimeScheduler timer)55     private static void start(final int num_threads, final int num_msgs, boolean oob, int max_msg_batch_size, TimeScheduler timer) {
56         final UNICAST2 unicast=new UNICAST2();
57         final AtomicInteger counter=new AtomicInteger(num_msgs);
58         final AtomicLong seqno=new AtomicLong(1);
59         final AtomicInteger delivered_msgs=new AtomicInteger(0);
60         final Lock lock=new ReentrantLock();
61         final Condition all_msgs_delivered=lock.newCondition();
62         final ConcurrentLinkedQueue<Long> delivered_msg_list=new ConcurrentLinkedQueue<Long>();
63         final Address local_addr=Util.createRandomAddress();
64         final Address sender=Util.createRandomAddress();
65 
66         if(timer == null)
67             timer=new TimeScheduler2();
68         unicast.setTimer(timer);
69         System.out.println("timer is a " + timer.getClass());
70 
71 
72         unicast.setDownProtocol(new Protocol() {
73             public Object down(Event evt) {return null;}
74         });
75 
76         unicast.setUpProtocol(new Protocol() {
77             public Object up(Event evt) {
78                 if(evt.getType() == Event.MSG) {
79                     delivered_msgs.incrementAndGet();
80                     UNICAST2.Unicast2Header hdr=(UNICAST2.Unicast2Header)((Message)evt.getArg()).getHeader(UNICAST_ID);
81                     if(hdr != null)
82                         delivered_msg_list.add(hdr.getSeqno());
83 
84                     if(delivered_msgs.get() >= num_msgs) {
85                         lock.lock();
86                         try {
87                             all_msgs_delivered.signalAll();
88                         }
89                         finally {
90                             lock.unlock();
91                         }
92                     }
93                 }
94                 return null;
95             }
96         });
97 
98         unicast.down(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
99 
100         unicast.setMaxMessageBatchSize(max_msg_batch_size);
101         unicast.setValue("max_bytes", 20000);
102 
103         // send the first message manually, to initialize the AckReceiverWindow tables
104         Message msg=createMessage(local_addr, sender, 1L, oob, true);
105         unicast.up(new Event(Event.MSG, msg));
106         Util.sleep(500);
107 
108 
109         final CountDownLatch latch=new CountDownLatch(1);
110         Sender[] adders=new Sender[num_threads];
111         for(int i=0; i < adders.length; i++) {
112             adders[i]=new Sender(unicast, latch, counter, seqno, oob, local_addr, sender);
113             adders[i].start();
114         }
115 
116         long start=System.currentTimeMillis();
117         latch.countDown(); // starts all adders
118 
119         lock.lock();
120         try {
121             while(delivered_msgs.get() < num_msgs) {
122                 try {
123                     all_msgs_delivered.await(1000, TimeUnit.MILLISECONDS);
124                     System.out.println("received " + delivered_msgs.get() + " msgs");
125                 }
126                 catch(InterruptedException e) {
127                     e.printStackTrace();
128                 }
129             }
130         }
131         finally {
132             lock.unlock();
133         }
134 
135         long time=System.currentTimeMillis() - start;
136         double requests_sec=num_msgs / (time / 1000.0);
137         System.out.println("\nTime: " + time + " ms, " + Util.format(requests_sec) + " requests / sec\n");
138         System.out.println("Delivered messages: " + delivered_msg_list.size());
139         if(delivered_msg_list.size() < 100)
140             System.out.println("Elements: " + delivered_msg_list);
141 
142         unicast.stop();
143         timer.stop();
144 
145         List<Long> results=new ArrayList<Long>(delivered_msg_list);
146 
147         if(oob)
148             Collections.sort(results);
149 
150         assert results.size() == num_msgs : "expected " + num_msgs + ", but got " + results.size();
151 
152         System.out.println("Checking results consistency");
153         int i=1;
154         for(Long num: results) {
155             if(num.longValue() != i) {
156                 assert i == num : "expected " + i + " but got " + num;
157                 return;
158             }
159             i++;
160         }
161         System.out.println("OK");
162     }
163 
createMessage(Address dest, Address src, long seqno, boolean oob, boolean first)164     private static Message createMessage(Address dest, Address src, long seqno, boolean oob, boolean first) {
165         Message msg=new Message(dest, src, "hello world");
166         UNICAST2.Unicast2Header hdr=UNICAST2.Unicast2Header.createDataHeader(seqno, (short)1, first);
167         msg.putHeader(UNICAST_ID, hdr);
168         if(oob)
169             msg.setFlag(Message.OOB);
170         return msg;
171     }
172 
173 
174     static class Sender extends Thread {
175         final UNICAST2 unicast;
176         final CountDownLatch latch;
177         final AtomicInteger num_msgs;
178         final AtomicLong current_seqno;
179         final boolean oob;
180         final Address dest;
181         final Address sender;
182 
Sender(UNICAST2 unicast, CountDownLatch latch, AtomicInteger num_msgs, AtomicLong current_seqno, boolean oob, final Address dest, final Address sender)183         public Sender(UNICAST2 unicast, CountDownLatch latch, AtomicInteger num_msgs, AtomicLong current_seqno,
184                       boolean oob, final Address dest, final Address sender) {
185             this.unicast=unicast;
186             this.latch=latch;
187             this.num_msgs=num_msgs;
188             this.current_seqno=current_seqno;
189             this.oob=oob;
190             this.dest=dest;
191             this.sender=sender;
192             setName("Adder");
193         }
194 
195 
run()196         public void run() {
197             try {
198                 latch.await();
199             }
200             catch(InterruptedException e) {
201                 e.printStackTrace();
202                 return;
203             }
204 
205             while(num_msgs.getAndDecrement() > 0) {
206                 long seqno=current_seqno.getAndIncrement();
207                 Message msg=createMessage(dest, sender, seqno, oob, false);
208                 unicast.up(new Event(Event.MSG, msg));
209             }
210         }
211     }
212 
213 
214     @Test(enabled=false)
main(String[] args)215     public static void main(String[] args) {
216         int num_threads=10;
217         int num_msgs=1000000;
218         int max=20000;
219         boolean oob=false;
220 
221         for(int i=0; i < args.length; i++) {
222             if(args[i].equals("-num_threads")) {
223                 num_threads=Integer.parseInt(args[++i]);
224                 continue;
225             }
226             if(args[i].equals("-num_msgs")) {
227                 num_msgs=Integer.parseInt(args[++i]);
228                 continue;
229             }
230             if(args[i].equals("-oob")) {
231                 oob=Boolean.parseBoolean(args[++i]);
232                 continue;
233             }
234             if(args[i].equals("-max")) {
235                 max=Integer.parseInt(args[++i]);
236                 continue;
237             }
238             System.out.println("UNICAST2_StressTest [-num_msgs msgs] [-num_threads threads] " +
239                     "[-oob <true | false>] [-max <batch size>]");
240             return;
241         }
242         start(num_threads, num_msgs, oob, max, null);
243     }
244 }