1 package org.jgroups.protocols;
2 
3 import org.jgroups.*;
4 import org.jgroups.conf.ClassConfigurator;
5 import org.jgroups.stack.Protocol;
6 import org.jgroups.stack.NakReceiverWindow;
7 import org.jgroups.protocols.pbcast.NAKACK;
8 import org.jgroups.protocols.pbcast.NakAckHeader;
9 import org.jgroups.util.*;
10 import org.jgroups.util.UUID;
11 import org.testng.annotations.AfterMethod;
12 import org.testng.annotations.BeforeMethod;
13 import org.testng.annotations.Test;
14 
15 import java.util.*;
16 import java.util.concurrent.*;
17 
18 /**
19  * Tests whether a mix of OOB and regular messages (with duplicates), sent my multiple threads, are delivery
20  * correctly. Correct delivery means:
21  * <ul>
22  * <li>All messages are received exactly once (no duplicates and no missing messages)
23  * <li>For regular messages only: all messages are received in the order in which they were sent (order of seqnos)
24  * </ul>
25  * @author Bela Ban
26  */
27 @Test(groups=Global.FUNCTIONAL)
28 public class NAKACK_Delivery_Test {
29     private NAKACK nak;
30     private Address c1, c2;
31     static final short NAKACK_ID=ClassConfigurator.getProtocolId(NAKACK.class);
32     MyReceiver receiver=new MyReceiver();
33     Executor pool;
34     final static int NUM_MSGS=50;
35 
36     @BeforeMethod
setUp()37     protected void setUp() throws Exception {
38         c1=Util.createRandomAddress("C1");
39         c2=Util.createRandomAddress("C2");
40         nak=new NAKACK();
41 
42         TP transport=new TP() {
43             public boolean supportsMulticasting() {return false;}
44             public void sendMulticast(byte[] data, int offset, int length) throws Exception {}
45             public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {}
46             public String getInfo() {return null;}
47             public Object down(Event evt) {return null;}
48             protected PhysicalAddress getPhysicalAddress() {return null;}
49             public TimeScheduler getTimer() {return new DefaultTimeScheduler(1);}
50         };
51 
52         transport.setId((short)100);
53 
54         nak.setDownProtocol(transport);
55 
56         receiver.init(c1, c2);
57         nak.setUpProtocol(receiver);
58 
59         nak.start();
60 
61         Vector<Address> members=new Vector<Address>(2);
62         members.add(c1); members.add(c2);
63         View view=new View(c1, 1, members);
64 
65         // set the local address
66         nak.down(new Event(Event.SET_LOCAL_ADDRESS, c1));
67 
68         // set a dummy digest
69         MutableDigest digest=new MutableDigest(2);
70         digest.add(c1, 0, 0, 0);
71         digest.add(c2, 0, 0, 0);
72         nak.down(new Event(Event.SET_DIGEST, digest));
73 
74         // set dummy view
75         nak.down(new Event(Event.VIEW_CHANGE, view));
76 
77         pool=new ThreadPoolExecutor(1, 100, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
78         // pool=new DirectExecutor();
79         // if(pool instanceof ThreadPoolExecutor)
80         ((ThreadPoolExecutor)pool).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
81     }
82 
83 
84     @AfterMethod
tearDown()85     protected void tearDown() {
86         nak.stop();
87         if(pool instanceof ThreadPoolExecutor)
88             ((ThreadPoolExecutor)pool).shutdownNow();
89     }
90 
91 
92     /**
93      * Sends NUM_MSGS (regular or OOB) multicasts on c1 and c2, checks that both c1 and c2 received NUM_MSGS messages.
94      * This test doesn't use a transport, but injects messages directly into NAKACK.
95      */
testSendingOfRandomMessages()96     public void testSendingOfRandomMessages() {
97         List<Integer> seqnos=generateRandomNumbers(1, NUM_MSGS);
98         seqnos.addAll(generateRandomNumbers(1, NUM_MSGS));
99         seqnos.addAll(generateRandomNumbers(Math.min(15, NUM_MSGS / 2), NUM_MSGS / 2));
100         seqnos.addAll(generateRandomNumbers(2, NUM_MSGS));
101         seqnos.addAll(generateRandomNumbers(5, Math.max(5, NUM_MSGS - 10)));
102 
103         Set<Integer> no_duplicates=new HashSet<Integer>(seqnos);
104 
105         System.out.println("sending " + seqnos.size() + " msgs (including duplicates); size excluding duplicates=" +
106                 no_duplicates.size());
107 
108         // we need to add our own messages (nak is for C1), or else they will get discarded by NAKACK.handleMessage()
109         NakReceiverWindow win=nak.getWindow(c1);
110         for(int i=1; i <= NUM_MSGS; i++)
111             win.add(i, msg(c1, i, i, true));
112 
113         for(int i: seqnos) {
114             boolean oob=Util.tossWeightedCoin(0.5);
115             pool.execute(new Sender(c2, i, i, oob));
116             pool.execute(new Sender(c1, i, i, oob));
117         }
118 
119         ConcurrentMap<Address, Collection<Message>> msgs=receiver.getMsgs();
120         Collection<Message> c1_list=msgs.get(c1);
121         Collection<Message> c2_list=msgs.get(c2);
122 
123         long end_time=System.currentTimeMillis() + 10000;
124         while(System.currentTimeMillis() < end_time) {
125             int size_c1=c1_list.size();
126             int size_c2=c2_list.size();
127             System.out.println("size C1 = " + size_c1 + ", size C2=" + size_c2);
128             if(size_c1 == NUM_MSGS && size_c2 == NUM_MSGS)
129                 break;
130             Util.sleep(1000);
131         }
132 
133         assert c1_list.size() == NUM_MSGS : "[C1] expected " + NUM_MSGS + " messages, but got " + c1_list.size();
134         assert c2_list.size() == NUM_MSGS : "[C2] expected " + NUM_MSGS + " messages, but got " + c2_list.size();
135     }
136 
generateRandomNumbers(int from, int to)137     private static List<Integer> generateRandomNumbers(int from, int to) {
138         List<Integer> retval=new ArrayList<Integer>(20);
139         for(int i=from; i <= to; i++)
140             retval.add(i);
141         Collections.shuffle(retval);
142         return retval;
143     }
144 
145 
send(Address sender, long seqno, int number, boolean oob)146     private void send(Address sender, long seqno, int number, boolean oob) {
147         assert sender != null;
148         nak.up(new Event(Event.MSG, msg(sender, seqno, number, oob)));
149     }
150 
151 
msg(Address sender, long seqno, int number, boolean oob)152     private static Message msg(Address sender, long seqno, int number, boolean oob) {
153         Message msg=new Message(null, sender, number);
154         if(oob)
155             msg.setFlag(Message.OOB);
156         if(seqno != -1)
157             msg.putHeader(NAKACK_ID, NakAckHeader.createMessageHeader(seqno));
158         return msg;
159     }
160 
161 
162     static class MyReceiver extends Protocol {
163         final ConcurrentMap<Address, Collection<Message>> msgs=new ConcurrentHashMap<Address,Collection<Message>>();
164 
getMsgs()165         public ConcurrentMap<Address, Collection<Message>> getMsgs() {
166             return msgs;
167         }
init(Address .... mbrs)168         public void init(Address ... mbrs) {
169             for(Address mbr: mbrs) {
170                 msgs.putIfAbsent(mbr, new ConcurrentLinkedQueue<Message>());
171             }
172         }
173 
up(Event evt)174         public Object up(Event evt) {
175             if(evt.getType() == Event.MSG) {
176                 Message msg=(Message)evt.getArg();
177                 Address sender=msg.getSrc();
178                 Collection<Message> list=msgs.get(sender);
179                 if(list == null) {
180                     list=new ConcurrentLinkedQueue<Message>();
181                     Collection<Message> tmp=msgs.putIfAbsent(sender, list);
182                     if(tmp != null)
183                         list=tmp;
184                 }
185                 list.add(msg);
186             }
187             return null;
188         }
189     }
190 
191     class Sender implements Runnable {
192         final Address sender;
193         final long seqno;
194         final int number;
195         final boolean oob;
196 
Sender(Address sender, long seqno, int number, boolean oob)197         public Sender(Address sender, long seqno, int number, boolean oob) {
198             this.sender=sender;
199             this.seqno=seqno;
200             this.number=number;
201             this.oob=oob;
202         }
203 
run()204         public void run() {
205             send(sender, seqno, number, oob);
206         }
207     }
208 
209 }