1 package org.jgroups.tests;
2 
3 import org.jgroups.*;
4 import org.jgroups.conf.ConfiguratorFactory;
5 import org.jgroups.conf.ProtocolConfiguration;
6 import org.jgroups.conf.ProtocolStackConfigurator;
7 import org.jgroups.protocols.BasicTCP;
8 import org.jgroups.protocols.TCPPING;
9 import org.jgroups.protocols.TP;
10 import org.jgroups.protocols.UDP;
11 import org.jgroups.stack.IpAddress;
12 import org.jgroups.stack.Protocol;
13 import org.jgroups.stack.ProtocolStack;
14 import org.jgroups.util.ResourceManager;
15 import org.jgroups.util.TimeScheduler;
16 import org.jgroups.util.Util;
17 import org.testng.AssertJUnit;
18 import org.testng.annotations.AfterMethod;
19 import org.testng.annotations.Test;
20 
21 import java.net.InetAddress;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.TimeUnit;
26 
27 
28 /**
29  * Tests which test the shared transport
30  * @author Bela Ban
31  */
32 @Test(groups=Global.STACK_DEPENDENT,sequential=true)
33 public class SharedTransportTest extends ChannelTestBase {
34     private JChannel a, b, c;
35     private MyReceiver r1, r2, r3;
36     static final String SINGLETON_1="singleton-1", SINGLETON_2="singleton-2";
37 
38 
39     @AfterMethod
tearDown()40     protected void tearDown() throws Exception {
41         Util.close(c,b,a);
42         r1=r2=r3=null;
43     }
44 
45 
testCreationNonSharedTransport()46     public void testCreationNonSharedTransport() throws Exception {
47         a=createChannel(true);
48         a.connect("SharedTransportTest.testCreationNonSharedTransport");
49         View view=a.getView();
50         System.out.println("view = " + view);
51         assert view.size() == 1;
52     }
53 
54 
testCreationOfDuplicateCluster()55     public void testCreationOfDuplicateCluster() throws Exception {
56         a=createSharedChannel(SINGLETON_1);
57         // makeUnique(a, 2);
58         b=createSharedChannel(SINGLETON_1);
59         a.connect("x");
60         try {
61             b.connect("x");
62             assert false : "b should not be able to join cluster 'x' as a has already joined it";
63         }
64         catch(Exception ex) {
65             System.out.println("b was not able to join the same cluster (\"x\") as expected");
66         }
67     }
68 
69 
testView()70     public void testView() throws Exception {
71         a=createSharedChannel(SINGLETON_1);
72         b=createSharedChannel(SINGLETON_2);
73         a.setReceiver(new MyReceiver(SINGLETON_1));
74         b.setReceiver(new MyReceiver(SINGLETON_2));
75 
76         a.connect("x");
77         b.connect("x");
78 
79         View view=a.getView();
80         assert view.size() == 2;
81         view=b.getView();
82         assert view.size() == 2;
83     }
84 
85 
testView2()86     public void testView2() throws Exception {
87         a=createSharedChannel(SINGLETON_1);
88         b=createSharedChannel(SINGLETON_1);
89         a.setReceiver(new MyReceiver("first-channel"));
90         b.setReceiver(new MyReceiver("second-channel"));
91 
92         a.connect("x");
93         b.connect("y");
94 
95         View view=a.getView();
96         assert view.size() == 1;
97         view=b.getView();
98         assert view.size() == 1;
99     }
100 
101     /**
102      * Tests http://jira.jboss.com/jira/browse/JGRP-689: with TCP or UDP.ip_mcast=false, the transport iterates through
103      * the 'members' instance variable to send a group message. However, the 'members' var is the value of the last
104      * view change received. If we receive multiple view changes, this leads to incorrect membership.
105      * @throws Exception
106      */
107 
testView3()108     public void testView3() throws Exception {
109         a=createSharedChannel(SINGLETON_1);
110         b=createSharedChannel(SINGLETON_1);
111         c=createSharedChannel(SINGLETON_2);
112         r1=new MyReceiver("A::" + SINGLETON_1);
113         r2=new MyReceiver("B::" + SINGLETON_1);
114         r3=new MyReceiver("C::" + SINGLETON_2);
115         a.setReceiver(r1);
116         b.setReceiver(r2);
117         c.setReceiver(r3);
118 
119         a.connect("cluster-1");
120         c.connect("cluster-1");
121 
122         View view=a.getView();
123         assert view.size() == 2;
124         view=c.getView();
125         assert view.size() == 2;
126 
127         a.send(new Message(null, null, "msg-1"));
128         c.send(new Message(null, null, "msg-2"));
129 
130         Util.sleep(1000); // async sending - wait a little
131         List<Message> list=r1.getList();
132         assert list.size() == 2;
133         list=r3.getList();
134         assert list.size() == 2;
135 
136         r1.clear();
137         r2.clear();
138         r3.clear();
139         b.connect("cluster-2");
140 
141         a.send(new Message(null, null, "msg-3"));
142         b.send(new Message(null, null, "msg-4"));
143         c.send(new Message(null, null, "msg-5"));
144         Util.sleep(1000); // async sending - wait a little
145 
146         // printLists(r1, r2, r3);
147         list=r1.getList();
148         assert list.size() == 2;
149         list=r2.getList();
150         assert list.size() == 1;
151         list=r3.getList();
152         assert list.size() == 2;
153     }
154 
155 
testView4()156     public void testView4() throws Exception {
157         a=createSharedChannel(SINGLETON_1);
158         r1=new MyReceiver("A::" + SINGLETON_1);
159         a.setReceiver(r1);
160 
161         a.connect("cluster-X");
162         a.send(new Message(null, null, "msg-1"));
163 
164         Util.sleep(1000); // async sending - wait a little
165         List<Message> list=r1.getList();
166         assert list.size() == 1;
167 
168         a.send(new Message(null, null, "msg-2"));
169         a.send(new Message(null, null, "msg-3"));
170         a.send(new Message(null, null, "msg-4"));
171         Util.sleep(1000); // async sending - wait a little
172 
173         list=r1.getList();
174         assert list.size() == 4;
175     }
176 
177 
testSharedTransportAndNonsharedTransport()178     public void testSharedTransportAndNonsharedTransport() throws Exception {
179         a=createSharedChannel(SINGLETON_1);
180         b=createChannel();
181         a.setReceiver(new MyReceiver("first-channel"));
182         b.setReceiver(new MyReceiver("second-channel"));
183 
184         a.connect("x");
185         b.connect("x");
186 
187         View view=a.getView();
188         assert view.size() == 2;
189         view=b.getView();
190         assert view.size() == 2;
191     }
192 
193 
testCreationOfDifferentCluster()194     public void testCreationOfDifferentCluster() throws Exception {
195         a=createSharedChannel(SINGLETON_1);
196         b=createSharedChannel(SINGLETON_2);
197         a.connect("x");
198         b.connect("x");
199         View view=b.getView();
200         System.out.println("b's view is " + view);
201         assert view.size() == 2;
202     }
203 
204 
205 
testReferenceCounting()206     public void testReferenceCounting() throws ChannelException {
207         a=createSharedChannel(SINGLETON_1);
208         r1=new MyReceiver("a");
209         a.setReceiver(r1);
210 
211         b=createSharedChannel(SINGLETON_1);
212         r2=new MyReceiver("b");
213         b.setReceiver(r2);
214 
215         c=createSharedChannel(SINGLETON_1);
216         r3=new MyReceiver("c");
217         c.setReceiver(r3);
218 
219         a.connect("A");
220         b.connect("B");
221         c.connect("C");
222 
223         a.send(null, null, "message from a");
224         b.send(null, null, "message from b");
225         c.send(null, null, "message from c");
226         Util.sleep(500);
227         assert r1.size() == 1;
228         assert r2.size() == 1;
229         assert r3.size() == 1;
230         r1.clear();
231         r2.clear();
232         r3.clear();
233 
234         b.disconnect();
235         System.out.println("\n");
236         a.send(null, null, "message from a");
237         c.send(null, null, "message from c");
238         Util.sleep(500);
239         assert r1.size() == 1 : "size should be 1 but is " + r1.size();
240         assert r3.size() == 1 : "size should be 1 but is " + r3.size();
241         r1.clear();
242         r3.clear();
243 
244         c.disconnect();
245         System.out.println("\n");
246         a.send(null, null, "message from a");
247         Util.sleep(500);
248         assert r1.size() == 1;
249     }
250 
251     /**
252      * Tests that a second channel with the same group name can be
253      * created and connected once the first channel is disconnected.
254      * @throws Exception
255      */
256 
testSimpleReCreation()257     public void testSimpleReCreation() throws Exception {
258         a=createSharedChannel(SINGLETON_1);
259         a.setReceiver(new MyReceiver("A"));
260         a.connect("A");
261         a.disconnect();
262         b=createSharedChannel(SINGLETON_1);
263         b.setReceiver(new MyReceiver("A'"));
264         b.connect("A");
265     }
266 
267     /**
268      * Tests that a second channel with the same group name can be
269      * created and connected once the first channel is disconnected even
270      * if 3rd channel with a different group name is still using the shared
271      * transport.
272      * @throws Exception
273      */
274 
testCreationFollowedByDeletion()275     public void testCreationFollowedByDeletion() throws Exception {
276         a=createSharedChannel(SINGLETON_1);
277         a.setReceiver(new MyReceiver("A"));
278         a.connect("A");
279 
280         b=createSharedChannel(SINGLETON_1);
281         b.setReceiver(new MyReceiver("B"));
282         b.connect("B");
283 
284         b.close();
285         a.close();
286     }
287 
288 
289 
test2ChannelsCreationFollowedByDeletion()290     public void test2ChannelsCreationFollowedByDeletion() throws Exception {
291         a=createSharedChannel(SINGLETON_1);
292         a.setReceiver(new MyReceiver("A"));
293         a.connect("A");
294 
295         b=createSharedChannel(SINGLETON_2);
296         b.setReceiver(new MyReceiver("B"));
297         b.connect("A");
298 
299         c=createSharedChannel(SINGLETON_2);
300         c.setReceiver(new MyReceiver("C"));
301         c.connect("B");
302 
303         c.send(null, null, "hello world from C");
304     }
305 
306 
307 
testReCreationWithSurvivingChannel()308     public void testReCreationWithSurvivingChannel() throws Exception {
309 
310         // Create 2 channels sharing a transport
311         System.out.println("-- creating A");
312         a=createSharedChannel(SINGLETON_1);
313         a.setReceiver(new MyReceiver("A"));
314         a.connect("A");
315 
316         System.out.println("-- creating B");
317         b=createSharedChannel(SINGLETON_1);
318         b.setReceiver(new MyReceiver("B"));
319         b.connect("B");
320 
321         System.out.println("-- disconnecting A");
322         a.disconnect();
323 
324         // a is disconnected so we should be able to create a new channel with group "A"
325         System.out.println("-- creating A'");
326         c=createSharedChannel(SINGLETON_1);
327         c.setReceiver(new MyReceiver("A'"));
328         c.connect("A");
329     }
330 
331     /**
332      * Tests http://jira.jboss.com/jira/browse/JGRP-737
333      * @throws Exception
334      */
testShutdownOfTimer()335     public void testShutdownOfTimer() throws Exception {
336         a=createSharedChannel(SINGLETON_1);
337         b=createSharedChannel(SINGLETON_1);
338         a.connect("x");
339         b.connect("y");
340         TimeScheduler timer1=a.getProtocolStack().getTransport().getTimer();
341         TimeScheduler timer2=b.getProtocolStack().getTransport().getTimer();
342 
343         assert timer1 == timer2;
344 
345         assert !timer1.isShutdown();
346         assert !timer2.isShutdown();
347 
348         Util.sleep(500);
349         b.close();
350 
351         assert !timer2.isShutdown();
352         assert !timer1.isShutdown();
353 
354         a.close(); // now, reference counting reaches 0, so the timer thread pool is stopped
355         assert timer2.isShutdown();
356         assert timer1.isShutdown();
357     }
358 
359 
360     /** Create channels A, B and C. Close A. This will close the timer and transports threads (!), so B will
361      * not be able to send messages anymore, so C will not receive any messages
362      * Tests http://jira.jboss.com/jira/browse/JGRP-737 */
testSendingOfMessagesAfterChannelClose()363     public void testSendingOfMessagesAfterChannelClose() throws ChannelException {
364         MyReceiver rec_a=new MyReceiver("A"), rec_b=new MyReceiver("B"), rec_c=new MyReceiver("C");
365         System.out.println("-- creating A");
366         a=createSharedChannel(SINGLETON_1);
367         a.setReceiver(rec_a);
368         a.connect("A");
369 
370         System.out.println("-- creating B");
371         b=createSharedChannel(SINGLETON_1);
372         b.setReceiver(rec_b);
373         b.connect("B");
374 
375         System.out.println("-- creating C");
376         c=createSharedChannel(SINGLETON_2);
377         c.setReceiver(rec_c);
378         c.connect("B");
379 
380         b.send(null, null, "first");
381         Util.sleep(500); // msg delivery is asynchronous, so give members some time to receive the msg (incl retransmission)
382         assertSize(1, rec_b, rec_c);
383         assertSize(0, rec_a);
384         a.close();
385 
386         b.send(null, null, "second");
387         Util.sleep(500);
388         assertSize(0, rec_a);
389         assertSize(2, rec_b, rec_c);
390     }
391 
392     /**
393      * Use a CountDownLatch to concurrently connect 3 channels; confirms
394      * the channels connect
395      *
396      * @throws ChannelException
397      * @throws InterruptedException
398      */
testConcurrentCreation()399     public void testConcurrentCreation() throws ChannelException, InterruptedException
400     {
401        a=createSharedChannel(SINGLETON_1);
402        r1=new MyReceiver("a");
403        a.setReceiver(r1);
404 
405        b=createSharedChannel(SINGLETON_1);
406        r2=new MyReceiver("b");
407        b.setReceiver(r2);
408 
409        c=createSharedChannel(SINGLETON_1);
410        r3=new MyReceiver("c");
411        c.setReceiver(r3);
412 
413        CountDownLatch startLatch = new CountDownLatch(1);
414        CountDownLatch finishLatch = new CountDownLatch(3);
415 
416        ConnectTask connectA = new ConnectTask(a, "a", startLatch, finishLatch);
417        Thread threadA = new Thread(connectA);
418        threadA.setDaemon(true);
419        threadA.start();
420 
421        ConnectTask connectB = new ConnectTask(b, "b", startLatch, finishLatch);
422        Thread threadB = new Thread(connectB);
423        threadB.setDaemon(true);
424        threadB.start();
425 
426        ConnectTask connectC = new ConnectTask(c, "c", startLatch, finishLatch);
427        Thread threadC = new Thread(connectC);
428        threadC.setDaemon(true);
429        threadC.start();
430 
431        startLatch.countDown();
432 
433        try
434        {
435           boolean finished = finishLatch.await(20, TimeUnit.SECONDS);
436 
437           if (connectA.exception != null)
438           {
439              AssertJUnit.fail("connectA threw exception " + connectA.exception);
440           }
441           if (connectB.exception != null)
442           {
443              AssertJUnit.fail("connectB threw exception " + connectB.exception);
444           }
445           if (connectC.exception != null)
446           {
447              AssertJUnit.fail("connectC threw exception " + connectC.exception);
448           }
449 
450           if (!finished) {
451              if (threadA.isAlive())
452                 AssertJUnit.fail("threadA did not finish");
453              if (threadB.isAlive())
454                 AssertJUnit.fail("threadB did not finish");
455              if (threadC.isAlive())
456                 AssertJUnit.fail("threadC did not finish");
457           }
458        }
459        finally
460        {
461           if (threadA.isAlive())
462              threadA.interrupt();
463           if (threadB.isAlive())
464              threadB.interrupt();
465           if (threadC.isAlive())
466              threadC.interrupt();
467        }
468     }
469 
assertSize(int expected, MyReceiver... receivers)470     private static void assertSize(int expected, MyReceiver... receivers) {
471         for(MyReceiver recv: receivers) {
472             assertEquals(expected, recv.size());
473         }
474     }
475 
createSharedChannel(String singleton_name)476     private JChannel createSharedChannel(String singleton_name) throws ChannelException {
477         ProtocolStackConfigurator config=ConfiguratorFactory.getStackConfigurator(channel_conf);
478         List<ProtocolConfiguration> protocols=config.getProtocolStack();
479         ProtocolConfiguration transport=protocols.get(0);
480         transport.getProperties().put(Global.SINGLETON_NAME, singleton_name);
481         return new JChannel(config);
482     }
483 
484 
makeUnique(Channel channel, int num)485     protected static void makeUnique(Channel channel, int num) throws Exception {
486         ProtocolStack stack=channel.getProtocolStack();
487         TP transport=stack.getTransport();
488         InetAddress bind_addr=transport.getBindAddressAsInetAddress();
489 
490         if(transport instanceof UDP) {
491             String mcast_addr=ResourceManager.getNextMulticastAddress();
492             short mcast_port=ResourceManager.getNextMulticastPort(bind_addr);
493             ((UDP)transport).setMulticastAddress(InetAddress.getByName(mcast_addr));
494             ((UDP)transport).setMulticastPort(mcast_port);
495         }
496         else if(transport instanceof BasicTCP) {
497             List<Short> ports=ResourceManager.getNextTcpPorts(bind_addr, num);
498             transport.setBindPort(ports.get(0));
499             transport.setPortRange(num);
500 
501             Protocol ping=stack.findProtocol(TCPPING.class);
502             if(ping == null)
503                 throw new IllegalStateException("TCP stack must consist of TCP:TCPPING - other config are not supported");
504 
505             List<String> initial_hosts=new LinkedList<String>();
506             for(short port: ports) {
507                 initial_hosts.add(bind_addr + "[" + port + "]");
508             }
509             String tmp=Util.printListWithDelimiter(initial_hosts, ",");
510             List<IpAddress> init_hosts = Util.parseCommaDelimitedHosts(tmp, 1) ;
511             ((TCPPING)ping).setInitialHosts(init_hosts) ;
512         }
513         else {
514             throw new IllegalStateException("Only UDP and TCP are supported as transport protocols");
515         }
516     }
517 
518 
519 
520     private static class MyReceiver extends ReceiverAdapter {
521         final List<Message> list=new LinkedList<Message>();
522         final String name;
523 
MyReceiver(String name)524         private MyReceiver(String name) {
525             this.name=name;
526         }
527 
getList()528         public List<Message> getList() {
529             return list;
530         }
531 
size()532         public int size() {
533             return list.size();
534         }
535 
clear()536         public void clear() {
537             list.clear();
538         }
539 
receive(Message msg)540         public void receive(Message msg) {
541             System.out.println("[" + name + "]: received message from " + msg.getSrc() + ": " + msg.getObject());
542             list.add(msg);
543         }
544 
viewAccepted(View new_view)545         public void viewAccepted(View new_view) {
546             StringBuilder sb=new StringBuilder();
547             sb.append("[" + name + "]: view = " + new_view);
548             System.out.println(sb);
549         }
550 
toString()551         public String toString() {
552             return super.toString() + " (size=" + list.size() + ")";
553         }
554     }
555 
556     private static class ConnectTask implements Runnable
557     {
558        private final Channel channel;
559        private final String clusterName;
560        private final CountDownLatch startLatch;
561        private final CountDownLatch finishLatch;
562        private Exception exception;
563 
ConnectTask(Channel channel, String clusterName, CountDownLatch startLatch, CountDownLatch finishLatch)564        ConnectTask(Channel channel, String clusterName, CountDownLatch startLatch, CountDownLatch finishLatch)
565        {
566           this.channel = channel;
567           this.clusterName = clusterName;
568           this.startLatch = startLatch;
569           this.finishLatch = finishLatch;
570        }
571 
run()572        public void run()
573        {
574           try
575           {
576              startLatch.await();
577              channel.connect(clusterName);
578           }
579           catch (Exception e)
580           {
581              e.printStackTrace(System.out);
582              this.exception = e;
583           }
584           finally
585           {
586              finishLatch.countDown();
587           }
588 
589        }
590 
591     }
592 
593 }
594