package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.NakReceiverWindow;
import org.jgroups.protocols.pbcast.NAKACK;
import org.jgroups.protocols.pbcast.NakAckHeader;
import org.jgroups.util.*;
import org.jgroups.util.UUID;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.*;
import java.util.concurrent.*;

/**
 * Tests whether a mix of OOB and regular messages (with duplicates), sent by multiple threads, is delivered
 * correctly. Correct delivery means:
 * <ul>
 * <li>All messages are received exactly once (no duplicates and no missing messages)
 * <li>For regular messages only: all messages are received in the order in which they were sent (order of seqnos)
 * </ul>
 * Note: the test currently verifies the message count and the absence of duplicates per sender; it does not
 * verify the ordering of regular messages.
 * @author Bela Ban
 */
@Test(groups=Global.FUNCTIONAL)
public class NAKACK_Delivery_Test {
    /** The protocol under test; messages are injected into it via {@link #send}. */
    private NAKACK nak;

    /** The two simulated senders. {@code c1} is also set as the local address of {@link #nak}. */
    private Address c1, c2;

    /** Protocol ID under which the NAKACK header is attached to test messages. */
    static final short NAKACK_ID=ClassConfigurator.getProtocolId(NAKACK.class);

    /** Sits above {@link #nak} and records every message passed up, keyed by sender. */
    MyReceiver receiver=new MyReceiver();

    /** Runs the {@link Sender} tasks so that messages are injected from multiple threads. */
    Executor pool;

    /** Number of distinct seqnos (1..NUM_MSGS) used per sender. */
    final static int NUM_MSGS=50;

    /** The single timer handed out by the stub transport; created in setUp() and stopped in tearDown(). */
    private TimeScheduler timer;


    /**
     * Builds a minimal stack (stub transport - NAKACK - receiver), starts NAKACK, installs the local address,
     * a digest and a view containing {@code c1} and {@code c2}, and creates the sender thread pool.
     * @throws Exception if starting NAKACK fails
     */
    @BeforeMethod
    protected void setUp() throws Exception {
        c1=Util.createRandomAddress("C1");
        c2=Util.createRandomAddress("C2");
        nak=new NAKACK();

        // One scheduler per test method; previously every getTimer() call created a new scheduler that
        // was never stopped
        timer=new DefaultTimeScheduler(1);

        // Stub transport: sends nothing and swallows all down events
        TP transport=new TP() {
            public boolean supportsMulticasting() {return false;}
            public void sendMulticast(byte[] data, int offset, int length) throws Exception {}
            public void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception {}
            public String getInfo() {return null;}
            public Object down(Event evt) {return null;}
            protected PhysicalAddress getPhysicalAddress() {return null;}
            // Qualified access so the outer field is used even if TP declares a member with the same name
            public TimeScheduler getTimer() {return NAKACK_Delivery_Test.this.timer;}
        };

        transport.setId((short)100);

        nak.setDownProtocol(transport);

        receiver.init(c1, c2);
        nak.setUpProtocol(receiver);

        nak.start();

        Vector<Address> members=new Vector<Address>(2);
        members.add(c1); members.add(c2);
        View view=new View(c1, 1, members);

        // set the local address
        nak.down(new Event(Event.SET_LOCAL_ADDRESS, c1));

        // set a dummy digest
        MutableDigest digest=new MutableDigest(2);
        digest.add(c1, 0, 0, 0);
        digest.add(c2, 0, 0, 0);
        nak.down(new Event(Event.SET_DIGEST, digest));

        // set dummy view
        nak.down(new Event(Event.VIEW_CHANGE, view));

        // SynchronousQueue has no capacity: once all 100 threads are busy, CallerRunsPolicy makes the
        // submitting thread run the task itself instead of rejecting it
        ThreadPoolExecutor executor=new ThreadPoolExecutor(1, 100, 1000, TimeUnit.MILLISECONDS,
                                                           new SynchronousQueue<Runnable>());
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool=executor;
    }


    /**
     * Stops NAKACK, the sender pool and the timer. The pool and timer are released even if stopping
     * NAKACK throws.
     */
    @AfterMethod
    protected void tearDown() {
        try {
            if(nak != null)
                nak.stop();
        }
        finally {
            if(pool instanceof ThreadPoolExecutor)
                ((ThreadPoolExecutor)pool).shutdownNow();
            if(timer != null) {
                timer.stop();
                timer=null;
            }
        }
    }


    /**
     * Sends NUM_MSGS (regular or OOB) multicasts on c1 and c2, checks that both c1 and c2 received NUM_MSGS messages
     * and that no message was delivered more than once.
     * This test doesn't use a transport, but injects messages directly into NAKACK.
     */
    public void testSendingOfRandomMessages() {
        // Several shuffled, overlapping ranges so that every seqno in 1..NUM_MSGS occurs, most of them repeatedly
        List<Integer> seqnos=generateRandomNumbers(1, NUM_MSGS);
        seqnos.addAll(generateRandomNumbers(1, NUM_MSGS));
        seqnos.addAll(generateRandomNumbers(Math.min(15, NUM_MSGS / 2), NUM_MSGS / 2));
        seqnos.addAll(generateRandomNumbers(2, NUM_MSGS));
        seqnos.addAll(generateRandomNumbers(5, Math.max(5, NUM_MSGS - 10)));

        Set<Integer> no_duplicates=new HashSet<Integer>(seqnos);

        System.out.println("sending " + seqnos.size() + " msgs (including duplicates); size excluding duplicates=" +
                no_duplicates.size());

        // we need to add our own messages (nak is for C1), or else they will get discarded by NAKACK.handleMessage()
        NakReceiverWindow win=nak.getWindow(c1);
        for(int i=1; i <= NUM_MSGS; i++)
            win.add(i, msg(c1, i, i, true));

        for(int i: seqnos) {
            boolean oob=Util.tossWeightedCoin(0.5);
            pool.execute(new Sender(c2, i, i, oob));
            pool.execute(new Sender(c1, i, i, oob));
        }

        ConcurrentMap<Address, Collection<Message>> msgs=receiver.getMsgs();
        Collection<Message> c1_list=msgs.get(c1);
        Collection<Message> c2_list=msgs.get(c2);

        // Poll for up to 10 seconds until both senders' lists have reached the expected size
        long end_time=System.currentTimeMillis() + 10000;
        while(System.currentTimeMillis() < end_time) {
            int size_c1=c1_list.size();
            int size_c2=c2_list.size();
            System.out.println("size C1 = " + size_c1 + ", size C2=" + size_c2);
            if(size_c1 == NUM_MSGS && size_c2 == NUM_MSGS)
                break;
            Util.sleep(1000);
        }

        assert c1_list.size() == NUM_MSGS : "[C1] expected " + NUM_MSGS + " messages, but got " + c1_list.size();
        assert c2_list.size() == NUM_MSGS : "[C2] expected " + NUM_MSGS + " messages, but got " + c2_list.size();

        // Size alone cannot tell "one duplicate plus one missing" from a correct run
        assertNoDuplicates("C1", c1_list);
        assertNoDuplicates("C2", c2_list);
    }


    /**
     * Asserts that every message in {@code list} carries a distinct payload. Test messages are created by
     * {@link #msg} with the message number as payload, so a repeated payload means a repeated delivery.
     * @param name label of the sender, used in the failure message
     * @param list the messages received from that sender
     */
    private static void assertNoDuplicates(String name, Collection<Message> list) {
        Set<Object> payloads=new HashSet<Object>();
        for(Message received: list) {
            Object payload=received.getObject();
            boolean first_time=payloads.add(payload);
            assert first_time : "[" + name + "] message " + payload + " was delivered more than once";
        }
    }


    /**
     * Returns the integers {@code from..to} (both inclusive) in random order.
     * @param from first number of the range
     * @param to last number of the range; if smaller than {@code from}, the result is empty
     * @return a new, mutable, shuffled list
     */
    private static List<Integer> generateRandomNumbers(int from, int to) {
        List<Integer> retval=new ArrayList<Integer>(Math.max(0, to - from + 1));
        for(int i=from; i <= to; i++)
            retval.add(i);
        Collections.shuffle(retval);
        return retval;
    }


    /**
     * Injects a single message into NAKACK as if it had been received from {@code sender}.
     * @param sender the source address of the message; must not be null
     * @param seqno the NAKACK sequence number, or -1 for no NAKACK header
     * @param number the payload of the message
     * @param oob whether the message is flagged OOB
     */
    private void send(Address sender, long seqno, int number, boolean oob) {
        assert sender != null;
        nak.up(new Event(Event.MSG, msg(sender, seqno, number, oob)));
    }


    /**
     * Creates a multicast message (null destination) from {@code sender} with {@code number} as payload.
     * @param sender the source address
     * @param seqno the NAKACK sequence number; if -1, no NAKACK header is added
     * @param number the payload
     * @param oob if true, the OOB flag is set
     * @return the new message
     */
    private static Message msg(Address sender, long seqno, int number, boolean oob) {
        Message msg=new Message(null, sender, number);
        if(oob)
            msg.setFlag(Message.OOB);
        if(seqno != -1)
            msg.putHeader(NAKACK_ID, NakAckHeader.createMessageHeader(seqno));
        return msg;
    }


    /** Top-of-stack protocol which records every message passed up, grouped by the message's source address. */
    static class MyReceiver extends Protocol {
        final ConcurrentMap<Address, Collection<Message>> msgs=new ConcurrentHashMap<Address,Collection<Message>>();

        /** @return the live map of sender to received messages (not a copy) */
        public ConcurrentMap<Address, Collection<Message>> getMsgs() {
            return msgs;
        }

        /**
         * Registers an empty message list for each member which doesn't have one yet.
         * @param mbrs the members to register
         */
        public void init(Address ... mbrs) {
            for(Address mbr: mbrs) {
                msgs.putIfAbsent(mbr, new ConcurrentLinkedQueue<Message>());
            }
        }

        /**
         * Records MSG events under the message's source address; all other events are ignored.
         * @return always null
         */
        public Object up(Event evt) {
            if(evt.getType() == Event.MSG) {
                Message msg=(Message)evt.getArg();
                Address sender=msg.getSrc();
                Collection<Message> list=msgs.get(sender);
                if(list == null) {
                    list=new ConcurrentLinkedQueue<Message>();
                    // another thread may have registered a list in the meantime: use theirs
                    Collection<Message> tmp=msgs.putIfAbsent(sender, list);
                    if(tmp != null)
                        list=tmp;
                }
                list.add(msg);
            }
            return null;
        }
    }

    /** Task which injects one message into NAKACK; inner (non-static) because it calls {@link #send}. */
    class Sender implements Runnable {
        final Address sender;
        final long seqno;
        final int number;
        final boolean oob;

        public Sender(Address sender, long seqno, int number, boolean oob) {
            this.sender=sender;
            this.seqno=seqno;
            this.number=number;
            this.oob=oob;
        }

        public void run() {
            send(sender, seqno, number, oob);
        }
    }

}