package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.protocols.FD;
import org.jgroups.protocols.FD_ALL;
import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.util.Util;
import org.testng.annotations.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Tests the FLUSH protocol. Adds a FLUSH layer on top of the stack unless already present. Should
 * work with any stack.
 *
 * @author Bela Ban
 */
@Test(groups = Global.FLUSH, sequential = false)
public class FlushTest extends ChannelTestBase {

    /**
     * Connects a single receiver and verifies the event sequence it recorded.
     *
     * @throws Exception if the channel cannot be created or connected
     */
    @Test
    public void testSingleChannel() throws Exception {
        // Take the only permit up front (same pattern as _testChannels) so that getting it
        // back below really means the receiver thread has finished with it.
        Semaphore s = new Semaphore(1);
        s.acquire(1);

        FlushTestReceiver[] receivers = { new FlushTestReceiver("c1", s, 0,
                FlushTestReceiver.CONNECT_ONLY) };
        try {
            receivers[0].start();
            s.release(1);

            // Make sure everyone is in sync
            Channel[] tmp = new Channel[receivers.length];
            for (int i = 0; i < receivers.length; i++)
                tmp[i] = receivers[i].getChannel();
            Util.blockUntilViewsReceived(60000, 1000, tmp);

            // Reacquire the semaphore tickets; when we have them all
            // we know the threads are done
            boolean acquired = s.tryAcquire(1, 60, TimeUnit.SECONDS);
            assert acquired : "receiver thread did not hand back its permit within 60 seconds";

            receivers[0].cleanup();
            Util.sleep(1000);

            checkEventStateTransferSequence(receivers[0]);
        } finally {
            // _testChannels also calls cleanup() twice, so a repeated call is expected to be harmless
            receivers[0].cleanup();
        }
    }

    /**
     * Tests issue #1 in http://jira.jboss.com/jira/browse/JGRP-335
     *
     * @throws Exception
     */
    @Test
    public void testJoinFollowedByUnicast() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        try {
            c1 = createChannel(true, 2);
            c1.setReceiver(new SimpleReplier(c1, true));
            c1.connect("testJoinFollowedByUnicast");

            Address target = c1.getAddress();
            Message unicast_msg = new Message(target);

            c2 = createChannel(c1);
            c2.setReceiver(new SimpleReplier(c2, false));
            c2.connect("testJoinFollowedByUnicast");

            // now send unicast, this might block as described in the case
            c2.send(unicast_msg);
            // if we don't get here this means we'd time out
        } finally {
            Util.close(c2, c1);
        }
    }

    /**
     * Tests issue #2 in http://jira.jboss.com/jira/browse/JGRP-335
     *
     * @throws Exception
     */
    @Test
    public void testStateTransferFollowedByUnicast() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        try {
            c1 = createChannel(true, 2);
            c1.setReceiver(new SimpleReplier(c1, true));
            c1.connect("testStateTransferFollowedByUnicast");

            Address target = c1.getAddress();
            Message unicast_msg = new Message(target);

            c2 = createChannel(c1);
            c2.setReceiver(new SimpleReplier(c2, false));
            c2.connect("testStateTransferFollowedByUnicast");

            log.info("\n** Getting the state **");
            c2.getState(null, 10000);
            // now send unicast, this might block as described in the case
            c2.send(unicast_msg);
        } finally {
            Util.close(c2, c1);
        }
    }

    /**
     * Runs 100 start-flush/stop-flush rounds from the first of three connected channels and
     * requires every start-flush to report success.
     *
     * @throws Exception if a channel cannot be created or connected
     */
    @Test
    public void testSequentialFlushInvocation() throws Exception {
        Channel channel = null, channel2 = null, channel3 = null;
        try {
            channel = createChannel(true, 3);
            channel.setName("A");

            channel2 = createChannel((JChannel) channel);
            channel2.setName("B");

            channel3 = createChannel((JChannel) channel);
            channel3.setName("C");

            channel.connect("x");
            channel2.connect("x");
            channel3.connect("x");

            //we need to sleep here since coordinator (channel)
            //might be unblocked before channel3.connect() returns
            Util.sleep(500);

            for (int i = 0; i < 100; i++) {
                System.out.print("flush #" + i + ": ");
                long start = System.currentTimeMillis();
                boolean status = channel.startFlush(false);
                channel.stopFlush();
                long diff = System.currentTimeMillis() - start;
                System.out.println(status ? " OK (in " + diff + " ms)" : " FAIL");
                assert status;
            }
        } finally {
            Util.close(channel, channel2, channel3);
        }
    }

    /**
     * Shuts down the member that started a flush (C2) and checks that the two remaining
     * members end up with a view of size 2.
     *
     * @throws Exception if a channel cannot be created or connected
     */
    @Test
    public void testFlushWithCrashedFlushCoordinator() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;

        try {
            c1 = createChannel(true, 3, "C1"); changeProps(c1);
            c1.connect("testFlushWithCrashedFlushCoordinator");

            c2 = createChannel(c1, "C2"); changeProps(c2);
            c2.connect("testFlushWithCrashedFlushCoordinator");

            c3 = createChannel(c1, "C3"); changeProps(c3);
            c3.connect("testFlushWithCrashedFlushCoordinator");

            System.out.println("shutting down flush coordinator C2");
            // send out START_FLUSH and then return
            c2.down(new Event(Event.SUSPEND_BUT_FAIL));

            // now shut down C2. This means, after failure detection kicks in and the new coordinator takes over
            // (either C1 or C3), that the current flush started by C2 will be cancelled and a new flush (by C1 or C3)
            // will be started
            Util.shutdown(c2);

            c1.getProtocolStack().findProtocol(FLUSH.class).setLevel("trace");
            c3.getProtocolStack().findProtocol(FLUSH.class).setLevel("trace");

            Util.blockUntilViewsReceived(10000, 500, c1, c3);

            // cluster should not hang and two remaining members should have a correct view
            assertTrue("correct view size", c1.getView().size() == 2);
            assertTrue("correct view size", c3.getView().size() == 2);

            c1.getProtocolStack().findProtocol(FLUSH.class).setLevel("warn");
            c3.getProtocolStack().findProtocol(FLUSH.class).setLevel("warn");

        } finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Shuts down one flush participant (C3), then flushes from C2 and checks that C1 and C2
     * end up with a view of size 2.
     *
     * @throws Exception if a channel cannot be created or connected
     */
    @Test
    public void testFlushWithCrashedParticipant() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;

        try {
            c1 = createChannel(true, 3, "C1"); changeProps(c1);
            c1.connect("testFlushWithCrashedParticipant");

            c2 = createChannel(c1, "C2"); changeProps(c2);
            c2.connect("testFlushWithCrashedParticipant");

            c3 = createChannel(c1, "C3"); changeProps(c3);
            c3.connect("testFlushWithCrashedParticipant");

            System.out.println("shutting down C3");
            Util.shutdown(c3); // kill a flush participant

            System.out.println("C2: starting flush");
            boolean rc=Util.startFlush(c2);
            System.out.println("flush " + (rc? " was successful" : "failed"));
            assert rc;

            System.out.println("stopping flush");
            c2.stopFlush();

            System.out.println("waiting for view to contain C1 and C2");
            Util.blockUntilViewsReceived(10000, 500, c1, c2);

            // cluster should not hang and two remaining members should have a correct view
            System.out.println("C1: view=" + c1.getView() + "\nC2: view=" + c2.getView());
            assertTrue("correct view size", c1.getView().size() == 2);
            assertTrue("correct view size", c2.getView().size() == 2);
        } finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Shuts down both members other than the flush initiator (C2), then flushes from C2 and
     * checks that C2 ends up alone in its view.
     *
     * @throws Exception if a channel cannot be created or connected
     */
    @Test
    public void testFlushWithCrashedParticipants() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        JChannel c3 = null;

        try {
            // Use this test's own cluster name; it previously reused the name of
            // testFlushWithCrashedFlushCoordinator, and the tests in this class are not sequential.
            c1 = createChannel(true, 3, "C1"); changeProps(c1);
            c1.connect("testFlushWithCrashedParticipants");

            c2 = createChannel(c1, "C2"); changeProps(c2);
            c2.connect("testFlushWithCrashedParticipants");

            c3 = createChannel(c1, "C3"); changeProps(c3);
            c3.connect("testFlushWithCrashedParticipants");

            // and then kill members other than flush coordinator
            Util.shutdown(c3);
            Util.shutdown(c1);

            // start flush
            Util.startFlush(c2);

            c2.stopFlush();
            Util.blockUntilViewsReceived(10000, 500, c2);

            // cluster should not hang and one remaining member should have a correct view
            assertTrue("correct view size", c2.getView().size() == 1);
        } finally {
            Util.close(c3, c2, c1);
        }
    }

    /**
     * Tests http://jira.jboss.com/jira/browse/JGRP-661
     *
     * @throws Exception
     */
    @Test
    public void testPartialFlush() throws Exception {
        JChannel c1 = null;
        JChannel c2 = null;
        try {
            c1 = createChannel(true, 2);
            c1.setReceiver(new SimpleReplier(c1, true));
            c1.connect("testPartialFlush");

            c2 = createChannel(c1);
            c2.setReceiver(new SimpleReplier(c2, false));
            c2.connect("testPartialFlush");

            // flush only c2 itself; getAddress() is what the rest of this class uses
            List<Address> members = new ArrayList<Address>();
            members.add(c2.getAddress());
            boolean flushedOk = Util.startFlush(c2, members);

            assertTrue("Partial flush worked", flushedOk);

            c2.stopFlush(members);
        } finally {
            Util.close(c2, c1);
        }
    }

    /** Tests the emission of block/unblock/get|set state events */
    @Test
    public void testBlockingNoStateTransfer() throws Exception {
        String[] names = { "A", "B", "C", "D" };
        _testChannels(names, FlushTestReceiver.CONNECT_ONLY);
    }

    /** Tests the emission of block/unblock/get|set state events */
    @Test
    public void testBlockingWithStateTransfer() throws Exception {
        String[] names = { "A", "B", "C", "D" };
        _testChannels(names, FlushTestReceiver.CONNECT_AND_SEPARATE_GET_STATE);
    }

    /** Tests the emission of block/unblock/get|set state events */
    @Test
    public void testBlockingWithConnectAndStateTransfer() throws Exception {
        String[] names = { "A", "B", "C", "D" };
        _testChannels(names, FlushTestReceiver.CONNECT_AND_GET_STATE);
    }

    /**
     * Starts one receiver per name, waits for all of them to finish, and verifies the event
     * sequence each one recorded.
     *
     * @param names       names of the receivers to create; the first one creates the channel
     *                    the others are derived from
     * @param connectType one of the {@code FlushTestReceiver.CONNECT_*} constants
     * @throws Exception if a receiver cannot be created or the wait is interrupted
     */
    private void _testChannels(String[] names, int connectType) throws Exception {
        int count = names.length;

        List<FlushTestReceiver> channels = new ArrayList<FlushTestReceiver>(count);
        try {
            // Create a semaphore and take all its permits
            Semaphore semaphore = new Semaphore(count);
            semaphore.acquire(count);

            // Create channels and their threads that will block on the
            // semaphore
            boolean first = true;
            for (String channelName : names) {
                FlushTestReceiver channel = null;
                if (first)
                    channel = new FlushTestReceiver(channelName, semaphore, 0, connectType);
                else {
                    channel = new FlushTestReceiver((JChannel) channels.get(0).getChannel(),
                            channelName, semaphore, 0, connectType);
                }
                channels.add(channel);

                // Release one ticket at a time to allow the thread to start working
                channel.start();
                semaphore.release(1);
                if (first)
                    Util.sleep(3000); // minimize chances of a merge happening
                first = false;
            }

            Channel[] tmp = new Channel[channels.size()];
            int cnt = 0;
            for (FlushTestReceiver receiver : channels)
                tmp[cnt++] = receiver.getChannel();
            Util.blockUntilViewsReceived(30000, 1000, tmp);

            // Reacquire the semaphore tickets; when we have them all
            // we know the threads are done
            boolean acquired = semaphore.tryAcquire(count, 40, TimeUnit.SECONDS);
            assert acquired : "not all " + count + " receiver threads finished within 40 seconds";

            Util.sleep(1000); //let all events propagate...
            for (FlushTestReceiver app : channels)
                app.getChannel().setReceiver(null);
            for (FlushTestReceiver app : channels)
                app.cleanup();

            // verify block/unblock/view/get|set state sequences for all members
            for (FlushTestReceiver receiver : channels) {
                checkEventStateTransferSequence(receiver);
                System.out.println("event sequence is OK");
            }
        }
        finally {
            for (FlushTestReceiver app : channels)
                app.cleanup();
        }
    }

    /**
     * Lowers the failure detection settings of the given channels: FD gets a timeout of 1000
     * and 2 tries, FD_ALL a timeout of 2000 and an interval of 800. Protocols that are not in
     * a channel's stack are skipped.
     *
     * @param channels the channels whose protocol stacks are adjusted
     */
    private static void changeProps(JChannel ... channels) {
        for(JChannel ch: channels) {
            FD fd=(FD)ch.getProtocolStack().findProtocol(FD.class);
            if(fd != null) {
                fd.setTimeout(1000);
                fd.setMaxTries(2);
            }
            FD_ALL fd_all=(FD_ALL)ch.getProtocolStack().findProtocol(FD_ALL.class);
            if(fd_all != null) {
                fd_all.setTimeout(2000);
                fd_all.setInterval(800);
            }
        }
    }

    /**
     * Receiver that connects its channel to the "FlushTestReceiver" cluster on construction,
     * using the connect method it was given.
     */
    private class FlushTestReceiver extends PushChannelApplicationWithSemaphore {
        public static final int CONNECT_ONLY = 1;

        public static final int CONNECT_AND_SEPARATE_GET_STATE = 2;

        public static final int CONNECT_AND_GET_STATE = 3;

        /** One of the CONNECT_* constants above. */
        private final int connectMethod;

        /** Number of messages sent by {@link #useChannel()}. */
        private final int msgCount;

        /**
         * Creates a receiver and connects it.
         *
         * @param name          passed to the superclass
         * @param semaphore     passed to the superclass
         * @param msgCount      number of messages to send from {@link #useChannel()}
         * @param connectMethod one of the CONNECT_* constants
         * @throws Exception if the superclass constructor or the connect fails
         */
        protected FlushTestReceiver(String name, Semaphore semaphore, int msgCount,
                int connectMethod) throws Exception {
            super(name, semaphore);
            this.connectMethod = connectMethod;
            this.msgCount = msgCount;
            connectFlushTestChannel();
        }

        /**
         * Creates a receiver from an existing channel and connects it.
         *
         * @param ch            passed to the superclass
         * @param name          passed to the superclass
         * @param semaphore     passed to the superclass
         * @param msgCount      number of messages to send from {@link #useChannel()}
         * @param connectMethod one of the CONNECT_* constants
         * @throws Exception if the superclass constructor or the connect fails
         */
        protected FlushTestReceiver(JChannel ch, String name, Semaphore semaphore, int msgCount,
                int connectMethod) throws Exception {
            super(ch, name, semaphore);
            this.connectMethod = connectMethod;
            this.msgCount = msgCount;
            connectFlushTestChannel();
        }

        /**
         * Shared constructor tail: installs a synchronized event list and connects the channel
         * according to {@link #connectMethod}. An unknown connect method leaves the channel
         * unconnected.
         */
        private void connectFlushTestChannel() throws Exception {
            events = Collections.synchronizedList(new LinkedList<Object>());
            if (connectMethod == CONNECT_ONLY || connectMethod == CONNECT_AND_SEPARATE_GET_STATE)
                channel.connect("FlushTestReceiver");
            else if (connectMethod == CONNECT_AND_GET_STATE)
                channel.connect("FlushTestReceiver", null, null, 25000);
        }

        /** Returns a copy of the events recorded so far. */
        public List<Object> getEvents() {
            return new LinkedList<Object>(events);
        }

        /** Records a GetStateEvent and returns a fixed 4-byte state. */
        public byte[] getState() {
            events.add(new GetStateEvent(null, null));
            return new byte[] { 'b', 'e', 'l', 'a' };
        }

        /** Delegates to the superclass, then writes a fixed 4-byte state and closes the stream. */
        public void getState(OutputStream ostream) {
            super.getState(ostream);
            byte[] payload = new byte[] { 'b', 'e', 'l', 'a' };
            try {
                ostream.write(payload);
            } catch (IOException e) {
                log.error("failed writing state", e);
            } finally {
                Util.close(ostream);
            }
        }

        /** Delegates to the superclass, then reads up to 4 bytes and closes the stream. */
        public void setState(InputStream istream) {
            super.setState(istream);
            byte[] payload = new byte[4];
            try {
                // the content is not inspected; the read only consumes what the provider wrote
                istream.read(payload);
            } catch (IOException e) {
                log.error("failed reading state", e);
            } finally {
                Util.close(istream);
            }
        }

        /**
         * Fetches the state separately when asked to, then sends {@link #msgCount} empty
         * messages, 100 ms apart.
         */
        protected void useChannel() throws Exception {
            if (connectMethod == CONNECT_AND_SEPARATE_GET_STATE) {
                channel.getState(null, 25000);
            }
            for (int i = 0; i < msgCount; i++) {
                channel.send(new Message());
                Util.sleep(100);
            }
        }
    }

    /**
     * Receiver that logs every callback and, when enabled, answers each received message with
     * an empty message to its sender.
     */
    private class SimpleReplier extends ExtendedReceiverAdapter {
        private final Channel channel;

        private final boolean handle_requests;

        /**
         * @param channel         channel used for logging the local address and sending replies
         * @param handle_requests whether received messages are answered
         */
        public SimpleReplier(Channel channel, boolean handle_requests) {
            this.channel = channel;
            this.handle_requests = handle_requests;
        }

        public void receive(Message msg) {
            try {
                log.info("-- MySimpleReplier[" + channel.getAddress() + "]: received message from "
                        + msg.getSrc());
                if (handle_requests) {
                    log.info(", sending reply");
                    // only build the reply when it is actually sent
                    channel.send(new Message(msg.getSrc()));
                } else
                    System.out.println("\n");
            } catch (Exception e) {
                log.error("failed handling message from " + msg.getSrc(), e);
            }
        }

        public void viewAccepted(View new_view) {
            log.info("-- MySimpleReplier[" + channel.getAddress() + "]: viewAccepted(" + new_view
                    + ")");
        }

        public void block() {
            log.info("-- MySimpleReplier[" + channel.getAddress() + "]: block()");
        }

        public void unblock() {
            log.info("-- MySimpleReplier[" + channel.getAddress() + "]: unblock()");
        }
    }

}