package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.conf.ConfiguratorFactory;
import org.jgroups.conf.ProtocolConfiguration;
import org.jgroups.conf.ProtocolStackConfigurator;
import org.jgroups.protocols.BasicTCP;
import org.jgroups.protocols.TCPPING;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.UDP;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.ResourceManager;
import org.jgroups.util.TimeScheduler;
import org.jgroups.util.Util;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.net.InetAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
 * Tests which test the shared transport, i.e. several channels configured with the same
 * singleton name in their transport.
 * @author Bela Ban
 */
@Test(groups=Global.STACK_DEPENDENT,sequential=true)
public class SharedTransportTest extends ChannelTestBase {
    private JChannel a, b, c;
    private MyReceiver r1, r2, r3;
    static final String SINGLETON_1="singleton-1", SINGLETON_2="singleton-2";


    /** Closes all channels (in reverse creation order) and drops the receivers after every test. */
    @AfterMethod
    protected void tearDown() throws Exception {
        Util.close(c,b,a);
        r1=r2=r3=null;
    }


    /** A channel created via {@code createChannel(true)} (no singleton name set here) forms a view of 1. */
    public void testCreationNonSharedTransport() throws Exception {
        a=createChannel(true);
        a.connect("SharedTransportTest.testCreationNonSharedTransport");
        View view=a.getView();
        System.out.println("view = " + view);
        assert view.size() == 1;
    }


    /** Two channels with the same singleton name must not be able to join the same cluster. */
    public void testCreationOfDuplicateCluster() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        // makeUnique(a, 2);
        b=createSharedChannel(SINGLETON_1);
        a.connect("x");
        try {
            b.connect("x");
            // AssertionError is not an Exception, so it is not swallowed by the catch clause below
            assert false : "b should not be able to join cluster 'x' as a has already joined it";
        }
        catch(Exception ex) {
            System.out.println("b was not able to join the same cluster (\"x\") as expected");
        }
    }


    /** Two channels with different singleton names joining the same cluster must both see 2 members. */
    public void testView() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createSharedChannel(SINGLETON_2);
        a.setReceiver(new MyReceiver(SINGLETON_1));
        b.setReceiver(new MyReceiver(SINGLETON_2));

        a.connect("x");
        b.connect("x");

        View view=a.getView();
        assert view.size() == 2;
        view=b.getView();
        assert view.size() == 2;
    }


    /** Two channels with the same singleton name joining different clusters must each see 1 member. */
    public void testView2() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createSharedChannel(SINGLETON_1);
        a.setReceiver(new MyReceiver("first-channel"));
        b.setReceiver(new MyReceiver("second-channel"));

        a.connect("x");
        b.connect("y");

        View view=a.getView();
        assert view.size() == 1;
        view=b.getView();
        assert view.size() == 1;
    }


    /**
     * Tests http://jira.jboss.com/jira/browse/JGRP-689: with TCP or UDP.ip_mcast=false, the transport iterates
     * through the 'members' instance variable to send a group message. However, the 'members' var is the value
     * of the last view change received. If we receive multiple view changes, this leads to incorrect membership.
     * @throws Exception
     */
    public void testView3() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createSharedChannel(SINGLETON_1);
        c=createSharedChannel(SINGLETON_2);
        r1=new MyReceiver("A::" + SINGLETON_1);
        r2=new MyReceiver("B::" + SINGLETON_1);
        r3=new MyReceiver("C::" + SINGLETON_2);
        a.setReceiver(r1);
        b.setReceiver(r2);
        c.setReceiver(r3);

        a.connect("cluster-1");
        c.connect("cluster-1");

        View view=a.getView();
        assert view.size() == 2;
        view=c.getView();
        assert view.size() == 2;

        a.send(new Message(null, null, "msg-1"));
        c.send(new Message(null, null, "msg-2"));

        Util.sleep(1000); // async sending - wait a little
        List<Message> list=r1.getList();
        assert list.size() == 2;
        list=r3.getList();
        assert list.size() == 2;

        r1.clear();
        r2.clear();
        r3.clear();
        // b shares a's transport but joins a different cluster, triggering a second view on that transport
        b.connect("cluster-2");

        a.send(new Message(null, null, "msg-3"));
        b.send(new Message(null, null, "msg-4"));
        c.send(new Message(null, null, "msg-5"));
        Util.sleep(1000); // async sending - wait a little

        // printLists(r1, r2, r3);
        list=r1.getList();
        assert list.size() == 2; // msg-3 and msg-5 (cluster-1)
        list=r2.getList();
        assert list.size() == 1; // msg-4 only (cluster-2)
        list=r3.getList();
        assert list.size() == 2; // msg-3 and msg-5 (cluster-1)
    }


    /** A single shared channel must receive every message it sends to its own cluster. */
    public void testView4() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        r1=new MyReceiver("A::" + SINGLETON_1);
        a.setReceiver(r1);

        a.connect("cluster-X");
        a.send(new Message(null, null, "msg-1"));

        Util.sleep(1000); // async sending - wait a little
        List<Message> list=r1.getList();
        assert list.size() == 1;

        a.send(new Message(null, null, "msg-2"));
        a.send(new Message(null, null, "msg-3"));
        a.send(new Message(null, null, "msg-4"));
        Util.sleep(1000); // async sending - wait a little

        list=r1.getList();
        assert list.size() == 4;
    }


    /** A channel with a singleton name and one created via {@code createChannel()} must form a view of 2. */
    public void testSharedTransportAndNonsharedTransport() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createChannel();
        a.setReceiver(new MyReceiver("first-channel"));
        b.setReceiver(new MyReceiver("second-channel"));

        a.connect("x");
        b.connect("x");

        View view=a.getView();
        assert view.size() == 2;
        view=b.getView();
        assert view.size() == 2;
    }


    /** Channels with different singleton names may join the same cluster; b must see 2 members. */
    public void testCreationOfDifferentCluster() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createSharedChannel(SINGLETON_2);
        a.connect("x");
        b.connect("x");
        View view=b.getView();
        System.out.println("b's view is " + view);
        assert view.size() == 2;
    }


    /**
     * Connects three channels with the same singleton name to three different clusters, then disconnects
     * them one by one, verifying after each step that the remaining channels still receive their own messages.
     */
    public void testReferenceCounting() throws ChannelException {
        a=createSharedChannel(SINGLETON_1);
        r1=new MyReceiver("a");
        a.setReceiver(r1);

        b=createSharedChannel(SINGLETON_1);
        r2=new MyReceiver("b");
        b.setReceiver(r2);

        c=createSharedChannel(SINGLETON_1);
        r3=new MyReceiver("c");
        c.setReceiver(r3);

        a.connect("A");
        b.connect("B");
        c.connect("C");

        a.send(null, null, "message from a");
        b.send(null, null, "message from b");
        c.send(null, null, "message from c");
        Util.sleep(500);
        assert r1.size() == 1;
        assert r2.size() == 1;
        assert r3.size() == 1;
        r1.clear();
        r2.clear();
        r3.clear();

        b.disconnect();
        System.out.println("\n");
        a.send(null, null, "message from a");
        c.send(null, null, "message from c");
        Util.sleep(500);
        assert r1.size() == 1 : "size should be 1 but is " + r1.size();
        assert r3.size() == 1 : "size should be 1 but is " + r3.size();
        r1.clear();
        r3.clear();

        c.disconnect();
        System.out.println("\n");
        a.send(null, null, "message from a");
        Util.sleep(500);
        assert r1.size() == 1;
    }


    /**
     * Tests that a second channel with the same group name can be
     * created and connected once the first channel is disconnected.
     * @throws Exception
     */
    public void testSimpleReCreation() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        a.setReceiver(new MyReceiver("A"));
        a.connect("A");
        a.disconnect();
        b=createSharedChannel(SINGLETON_1);
        b.setReceiver(new MyReceiver("A'"));
        b.connect("A");
    }


    /**
     * Connects two channels with the same singleton name to different clusters and then closes
     * both (second one first). The test passes if no exception is thrown.
     * @throws Exception
     */
    public void testCreationFollowedByDeletion() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        a.setReceiver(new MyReceiver("A"));
        a.connect("A");

        b=createSharedChannel(SINGLETON_1);
        b.setReceiver(new MyReceiver("B"));
        b.connect("B");

        b.close();
        a.close();
    }


    /**
     * Creates one channel on the first singleton and two on the second, connects them to clusters
     * "A", "A" and "B" and sends a message from the last one. Channels are closed in {@link #tearDown()}.
     */
    public void test2ChannelsCreationFollowedByDeletion() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        a.setReceiver(new MyReceiver("A"));
        a.connect("A");

        b=createSharedChannel(SINGLETON_2);
        b.setReceiver(new MyReceiver("B"));
        b.connect("A");

        c=createSharedChannel(SINGLETON_2);
        c.setReceiver(new MyReceiver("C"));
        c.connect("B");

        c.send(null, null, "hello world from C");
    }


    /**
     * Tests that a channel with the same group name can be created and connected once the first
     * channel is disconnected, even if another channel with a different group name is still using
     * the shared transport.
     */
    public void testReCreationWithSurvivingChannel() throws Exception {

        // Create 2 channels sharing a transport
        System.out.println("-- creating A");
        a=createSharedChannel(SINGLETON_1);
        a.setReceiver(new MyReceiver("A"));
        a.connect("A");

        System.out.println("-- creating B");
        b=createSharedChannel(SINGLETON_1);
        b.setReceiver(new MyReceiver("B"));
        b.connect("B");

        System.out.println("-- disconnecting A");
        a.disconnect();

        // a is disconnected so we should be able to create a new channel with group "A"
        System.out.println("-- creating A'");
        c=createSharedChannel(SINGLETON_1);
        c.setReceiver(new MyReceiver("A'"));
        c.connect("A");
    }


    /**
     * Tests http://jira.jboss.com/jira/browse/JGRP-737: two channels with the same singleton name share
     * one timer, which must stay alive while at least one of the channels is open.
     * @throws Exception
     */
    public void testShutdownOfTimer() throws Exception {
        a=createSharedChannel(SINGLETON_1);
        b=createSharedChannel(SINGLETON_1);
        a.connect("x");
        b.connect("y");
        TimeScheduler timer1=a.getProtocolStack().getTransport().getTimer();
        TimeScheduler timer2=b.getProtocolStack().getTransport().getTimer();

        assert timer1 == timer2;

        assert !timer1.isShutdown();
        assert !timer2.isShutdown();

        Util.sleep(500);
        b.close();

        assert !timer2.isShutdown();
        assert !timer1.isShutdown();

        a.close(); // now, reference counting reaches 0, so the timer thread pool is stopped
        assert timer2.isShutdown();
        assert timer1.isShutdown();
    }


    /**
     * Create channels A, B and C. Close A. Regression test for
     * http://jira.jboss.com/jira/browse/JGRP-737, where closing A would close the timer and transport
     * threads (!), so B would not be able to send messages anymore and C would not receive any messages.
     * The test asserts that B and C still receive B's message sent after A was closed.
     */
    public void testSendingOfMessagesAfterChannelClose() throws ChannelException {
        MyReceiver rec_a=new MyReceiver("A"), rec_b=new MyReceiver("B"), rec_c=new MyReceiver("C");
        System.out.println("-- creating A");
        a=createSharedChannel(SINGLETON_1);
        a.setReceiver(rec_a);
        a.connect("A");

        System.out.println("-- creating B");
        b=createSharedChannel(SINGLETON_1);
        b.setReceiver(rec_b);
        b.connect("B");

        System.out.println("-- creating C");
        c=createSharedChannel(SINGLETON_2);
        c.setReceiver(rec_c);
        c.connect("B");

        b.send(null, null, "first");
        Util.sleep(500); // msg delivery is asynchronous, so give members some time to receive the msg (incl retransmission)
        assertSize(1, rec_b, rec_c);
        assertSize(0, rec_a);
        a.close();

        b.send(null, null, "second");
        Util.sleep(500);
        assertSize(0, rec_a);
        assertSize(2, rec_b, rec_c);
    }


    /**
     * Use a CountDownLatch to concurrently connect 3 channels; confirms
     * the channels connect
     *
     * @throws ChannelException
     * @throws InterruptedException
     */
    public void testConcurrentCreation() throws ChannelException, InterruptedException {
        a=createSharedChannel(SINGLETON_1);
        r1=new MyReceiver("a");
        a.setReceiver(r1);

        b=createSharedChannel(SINGLETON_1);
        r2=new MyReceiver("b");
        b.setReceiver(r2);

        c=createSharedChannel(SINGLETON_1);
        r3=new MyReceiver("c");
        c.setReceiver(r3);

        // startLatch releases all connectors at once; finishLatch is counted down by each connector when done
        CountDownLatch startLatch=new CountDownLatch(1);
        CountDownLatch finishLatch=new CountDownLatch(3);

        ConnectTask connectA=new ConnectTask(a, "a", startLatch, finishLatch);
        Thread threadA=new Thread(connectA);
        threadA.setDaemon(true);
        threadA.start();

        ConnectTask connectB=new ConnectTask(b, "b", startLatch, finishLatch);
        Thread threadB=new Thread(connectB);
        threadB.setDaemon(true);
        threadB.start();

        ConnectTask connectC=new ConnectTask(c, "c", startLatch, finishLatch);
        Thread threadC=new Thread(connectC);
        threadC.setDaemon(true);
        threadC.start();

        startLatch.countDown();

        try {
            boolean finished=finishLatch.await(20, TimeUnit.SECONDS);

            if(connectA.exception != null) {
                AssertJUnit.fail("connectA threw exception " + connectA.exception);
            }
            if(connectB.exception != null) {
                AssertJUnit.fail("connectB threw exception " + connectB.exception);
            }
            if(connectC.exception != null) {
                AssertJUnit.fail("connectC threw exception " + connectC.exception);
            }

            if(!finished) {
                if(threadA.isAlive()) {
                    AssertJUnit.fail("threadA did not finish");
                }
                if(threadB.isAlive()) {
                    AssertJUnit.fail("threadB did not finish");
                }
                if(threadC.isAlive()) {
                    AssertJUnit.fail("threadC did not finish");
                }
            }
        }
        finally {
            // don't leave connectors hanging around after the test, whatever its outcome
            if(threadA.isAlive()) {
                threadA.interrupt();
            }
            if(threadB.isAlive()) {
                threadB.interrupt();
            }
            if(threadC.isAlive()) {
                threadC.interrupt();
            }
        }
    }


    /** Asserts that every given receiver has received exactly {@code expected} messages. */
    private static void assertSize(int expected, MyReceiver... receivers) {
        for(MyReceiver recv: receivers) {
            assertEquals(expected, recv.size());
        }
    }


    /**
     * Creates (but does not connect) a channel from {@code channel_conf} whose first protocol
     * configuration (the transport) has its singleton name property set to {@code singleton_name}.
     * @param singleton_name the value stored under {@link Global#SINGLETON_NAME}
     * @return the new, unconnected channel
     * @throws ChannelException if the configuration cannot be read or the channel cannot be created
     */
    private JChannel createSharedChannel(String singleton_name) throws ChannelException {
        ProtocolStackConfigurator config=ConfiguratorFactory.getStackConfigurator(channel_conf);
        List<ProtocolConfiguration> protocols=config.getProtocolStack();
        ProtocolConfiguration transport=protocols.get(0);
        transport.getProperties().put(Global.SINGLETON_NAME, singleton_name);
        return new JChannel(config);
    }


    /**
     * Reconfigures the transport of {@code channel} with addresses/ports obtained from
     * {@link ResourceManager}. For UDP, a new multicast address and port are set. For TCP,
     * {@code num} ports are fetched, the first one becomes the bind port, {@code num} the port
     * range, and TCPPING's initial hosts are set to the bind address combined with each port.
     * @param channel the channel whose transport is modified
     * @param num the number of TCP ports to reserve (unused for UDP)
     * @throws IllegalStateException if the transport is neither UDP nor TCP based, or if a TCP
     *                               stack has no TCPPING protocol
     */
    protected static void makeUnique(Channel channel, int num) throws Exception {
        ProtocolStack stack=channel.getProtocolStack();
        TP transport=stack.getTransport();
        InetAddress bind_addr=transport.getBindAddressAsInetAddress();

        if(transport instanceof UDP) {
            String mcast_addr=ResourceManager.getNextMulticastAddress();
            short mcast_port=ResourceManager.getNextMulticastPort(bind_addr);
            ((UDP)transport).setMulticastAddress(InetAddress.getByName(mcast_addr));
            ((UDP)transport).setMulticastPort(mcast_port);
        }
        else if(transport instanceof BasicTCP) {
            List<Short> ports=ResourceManager.getNextTcpPorts(bind_addr, num);
            transport.setBindPort(ports.get(0));
            transport.setPortRange(num);

            Protocol ping=stack.findProtocol(TCPPING.class);
            if(ping == null)
                throw new IllegalStateException("TCP stack must consist of TCP:TCPPING - other config are not supported");

            // InetAddress.toString() yields "hostname/literal-ip", which is not a valid host for the
            // "host[port]" syntax parsed below, so use the plain textual IP address instead
            String host=bind_addr.getHostAddress();
            List<String> initial_hosts=new LinkedList<String>();
            for(short port: ports) {
                initial_hosts.add(host + "[" + port + "]");
            }
            String tmp=Util.printListWithDelimiter(initial_hosts, ",");
            List<IpAddress> init_hosts=Util.parseCommaDelimitedHosts(tmp, 1);
            ((TCPPING)ping).setInitialHosts(init_hosts);
        }
        else {
            throw new IllegalStateException("Only UDP and TCP are supported as transport protocols");
        }
    }


    /**
     * Receiver which records every received message and prints messages and views to stdout.
     * Messages are delivered asynchronously while the test thread inspects the list, so the list
     * is a synchronized one.
     */
    private static class MyReceiver extends ReceiverAdapter {
        final List<Message> list=Collections.synchronizedList(new LinkedList<Message>());
        final String name;

        private MyReceiver(String name) {
            this.name=name;
        }

        /** Returns the live (synchronized) list of received messages; callers iterating it must lock it. */
        public List<Message> getList() {
            return list;
        }

        /** Returns the number of messages received since creation or the last {@link #clear()}. */
        public int size() {
            return list.size();
        }

        /** Forgets all messages received so far. */
        public void clear() {
            list.clear();
        }

        @Override
        public void receive(Message msg) {
            System.out.println("[" + name + "]: received message from " + msg.getSrc() + ": " + msg.getObject());
            list.add(msg);
        }

        @Override
        public void viewAccepted(View new_view) {
            System.out.println("[" + name + "]: view = " + new_view);
        }

        @Override
        public String toString() {
            return super.toString() + " (size=" + list.size() + ")";
        }
    }


    /**
     * Waits on the start latch, connects the channel to the given cluster and counts down the finish
     * latch. Any exception thrown on the way is printed and kept in {@link #exception}.
     */
    private static class ConnectTask implements Runnable {
        private final Channel channel;
        private final String clusterName;
        private final CountDownLatch startLatch;
        private final CountDownLatch finishLatch;
        // written by the connector thread and read by the test thread, possibly without a
        // happens-before edge (when the wait on the finish latch times out), hence volatile
        private volatile Exception exception;

        ConnectTask(Channel channel, String clusterName, CountDownLatch startLatch, CountDownLatch finishLatch) {
            this.channel=channel;
            this.clusterName=clusterName;
            this.startLatch=startLatch;
            this.finishLatch=finishLatch;
        }

        public void run() {
            try {
                startLatch.await();
                channel.connect(clusterName);
            }
            catch(Exception e) {
                e.printStackTrace(System.out);
                this.exception=e;
            }
            finally {
                finishLatch.countDown();
            }
        }
    }

}