1 package org.jgroups.tests;
2 
3 import org.jgroups.*;
4 import org.jgroups.protocols.FD;
5 import org.jgroups.protocols.FD_ALL;
6 import org.jgroups.protocols.pbcast.FLUSH;
7 import org.jgroups.util.Util;
8 import org.testng.annotations.Test;
9 
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.OutputStream;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.LinkedList;
16 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19 
20 /**
21  * Tests the FLUSH protocol. Adds a FLUSH layer on top of the stack unless already present. Should
22  * work with any stack.
23  *
24  * @author Bela Ban
25  */
26 @Test(groups = Global.FLUSH, sequential = false)
27 public class FlushTest extends ChannelTestBase {
28 
29     @Test
testSingleChannel()30     public void testSingleChannel() throws Exception {
31         Semaphore s = new Semaphore(1);
32         FlushTestReceiver receivers[] = new FlushTestReceiver[] { new FlushTestReceiver("c1", s, 0,
33                         FlushTestReceiver.CONNECT_ONLY) };
34         receivers[0].start();
35         s.release(1);
36 
37         // Make sure everyone is in sync
38         Channel[] tmp = new Channel[receivers.length];
39         for (int i = 0; i < receivers.length; i++)
40             tmp[i] = receivers[i].getChannel();
41         Util.blockUntilViewsReceived(60000, 1000, tmp);
42 
43         // Reacquire the semaphore tickets; when we have them all
44         // we know the threads are done
45         s.tryAcquire(1, 60, TimeUnit.SECONDS);
46         receivers[0].cleanup();
47         Util.sleep(1000);
48 
49         checkEventStateTransferSequence(receivers[0]);
50     }
51 
52     /**
53      * Tests issue #1 in http://jira.jboss.com/jira/browse/JGRP-335
54      *
55      * @throws Exception
56      */
57     @Test
testJoinFollowedByUnicast()58     public void testJoinFollowedByUnicast() throws Exception {
59         JChannel c1 = null;
60         JChannel c2 = null;
61         try {
62             c1 = createChannel(true, 2);
63             c1.setReceiver(new SimpleReplier(c1, true));
64             c1.connect("testJoinFollowedByUnicast");
65 
66             Address target = c1.getAddress();
67             Message unicast_msg = new Message(target);
68 
69             c2 = createChannel(c1);
70             c2.setReceiver(new SimpleReplier(c2, false));
71             c2.connect("testJoinFollowedByUnicast");
72 
73             // now send unicast, this might block as described in the case
74             c2.send(unicast_msg);
75             // if we don't get here this means we'd time out
76         } finally {
77             Util.close(c2, c1);
78         }
79     }
80 
81     /**
82      * Tests issue #2 in http://jira.jboss.com/jira/browse/JGRP-335
83      *
84      * @throws Exception
85      */
86     @Test
testStateTransferFollowedByUnicast()87     public void testStateTransferFollowedByUnicast() throws Exception {
88         JChannel c1 = null;
89         JChannel c2 = null;
90         try {
91             c1 = createChannel(true, 2);
92             c1.setReceiver(new SimpleReplier(c1, true));
93             c1.connect("testStateTransferFollowedByUnicast");
94 
95             Address target = c1.getAddress();
96             Message unicast_msg = new Message(target);
97 
98             c2 = createChannel(c1);
99             c2.setReceiver(new SimpleReplier(c2, false));
100             c2.connect("testStateTransferFollowedByUnicast");
101 
102             log.info("\n** Getting the state **");
103             c2.getState(null, 10000);
104             // now send unicast, this might block as described in the case
105             c2.send(unicast_msg);
106         } finally {
107             Util.close(c2, c1);
108         }
109     }
110 
111     @Test
testSequentialFlushInvocation()112     public void testSequentialFlushInvocation() throws Exception {
113         Channel channel = null, channel2 = null, channel3 = null;
114         try {
115             channel = createChannel(true, 3);
116             channel.setName("A");
117 
118             channel2 = createChannel((JChannel) channel);
119             channel2.setName("B");
120 
121             channel3 = createChannel((JChannel) channel);
122             channel3.setName("C");
123 
124             channel.connect("x");
125             channel2.connect("x");
126             channel3.connect("x");
127 
128             //we need to sleep here since coordinator (channel)
129             //might be unblocked before channel3.connect() returns
130             Util.sleep(500);
131 
132             for (int i = 0; i < 100; i++) {
133                 System.out.print("flush #" + i + ": ");
134                 long start = System.currentTimeMillis();
135                 boolean status = channel.startFlush(false);
136                 channel.stopFlush();
137                 long diff = System.currentTimeMillis() - start;
138                 System.out.println(status ? " OK (in " + diff + " ms)" : " FAIL");
139                 assert status;
140             }
141         } finally {
142             Util.close(channel, channel2, channel3);
143         }
144     }
145 
146     @Test
testFlushWithCrashedFlushCoordinator()147     public void testFlushWithCrashedFlushCoordinator() throws Exception {
148         JChannel c1 = null;
149         JChannel c2 = null;
150         JChannel c3 = null;
151 
152         try {
153             c1 = createChannel(true, 3, "C1"); changeProps(c1);
154             c1.connect("testFlushWithCrashedFlushCoordinator");
155 
156             c2 = createChannel(c1, "C2"); changeProps(c2);
157             c2.connect("testFlushWithCrashedFlushCoordinator");
158 
159             c3 = createChannel(c1, "C3"); changeProps(c3);
160             c3.connect("testFlushWithCrashedFlushCoordinator");
161 
162 
163 
164 
165             System.out.println("shutting down flush coordinator C2");
166             // send out START_FLUSH and then return
167             c2.down(new Event(Event.SUSPEND_BUT_FAIL));
168 
169             // now shut down C2. This means, after failure detection kicks in and the new coordinator takes over
170             // (either C1 or C3), that the current flush started by C2 will be cancelled and a new flush (by C1 or C3)
171             // will be started
172             Util.shutdown(c2);
173 
174             c1.getProtocolStack().findProtocol(FLUSH.class).setLevel("trace");
175             c3.getProtocolStack().findProtocol(FLUSH.class).setLevel("trace");
176 
177             Util.blockUntilViewsReceived(10000, 500, c1, c3);
178 
179             // cluster should not hang and two remaining members should have a correct view
180             assertTrue("correct view size", c1.getView().size() == 2);
181             assertTrue("correct view size", c3.getView().size() == 2);
182 
183             c1.getProtocolStack().findProtocol(FLUSH.class).setLevel("warn");
184             c3.getProtocolStack().findProtocol(FLUSH.class).setLevel("warn");
185 
186         } finally {
187             Util.close(c3, c2, c1);
188         }
189     }
190 
191     @Test
testFlushWithCrashedParticipant()192     public void testFlushWithCrashedParticipant() throws Exception {
193         JChannel c1 = null;
194         JChannel c2 = null;
195         JChannel c3 = null;
196 
197         try {
198             c1 = createChannel(true, 3, "C1"); changeProps(c1);
199             c1.connect("testFlushWithCrashedParticipant");
200 
201             c2 = createChannel(c1, "C2"); changeProps(c2);
202             c2.connect("testFlushWithCrashedParticipant");
203 
204             c3 = createChannel(c1, "C3"); changeProps(c3);
205             c3.connect("testFlushWithCrashedParticipant");
206 
207             System.out.println("shutting down C3");
208             Util.shutdown(c3); // kill a flush participant
209 
210             System.out.println("C2: starting flush");
211             boolean rc=Util.startFlush(c2);
212             System.out.println("flush " + (rc? " was successful" : "failed"));
213             assert rc;
214 
215             System.out.println("stopping flush");
216             c2.stopFlush();
217 
218             System.out.println("waiting for view to contain C1 and C2");
219             Util.blockUntilViewsReceived(10000, 500, c1, c2);
220 
221             // cluster should not hang and two remaining members should have a correct view
222             System.out.println("C1: view=" + c1.getView() + "\nC2: view=" + c2.getView());
223             assertTrue("correct view size", c1.getView().size() == 2);
224             assertTrue("correct view size", c2.getView().size() == 2);
225         } finally {
226             Util.close(c3, c2, c1);
227         }
228     }
229 
230     @Test
testFlushWithCrashedParticipants()231     public void testFlushWithCrashedParticipants() throws Exception {
232         JChannel c1 = null;
233         JChannel c2 = null;
234         JChannel c3 = null;
235 
236         try {
237             c1 = createChannel(true, 3, "C1"); changeProps(c1);
238             c1.connect("testFlushWithCrashedFlushCoordinator");
239 
240             c2 = createChannel(c1, "C2"); changeProps(c2);
241             c2.connect("testFlushWithCrashedFlushCoordinator");
242 
243             c3 = createChannel(c1, "C3"); changeProps(c3);
244             c3.connect("testFlushWithCrashedFlushCoordinator");
245 
246             // and then kill members other than flush coordinator
247             Util.shutdown(c3);
248             Util.shutdown(c1);
249 
250             // start flush
251             Util.startFlush(c2);
252 
253             c2.stopFlush();
254             Util.blockUntilViewsReceived(10000, 500, c2);
255 
256             // cluster should not hang and one remaining member should have a correct view
257             assertTrue("correct view size", c2.getView().size() == 1);
258         } finally {
259             Util.close(c3, c2, c1);
260         }
261     }
262 
263     /**
264      * Tests http://jira.jboss.com/jira/browse/JGRP-661
265      *
266      * @throws Exception
267      */
268     @Test
testPartialFlush()269     public void testPartialFlush() throws Exception {
270         JChannel c1 = null;
271         JChannel c2 = null;
272         try {
273             c1 = createChannel(true, 2);
274             c1.setReceiver(new SimpleReplier(c1, true));
275             c1.connect("testPartialFlush");
276 
277             c2 = createChannel(c1);
278             c2.setReceiver(new SimpleReplier(c2, false));
279             c2.connect("testPartialFlush");
280 
281             List<Address> members = new ArrayList<Address>();
282             members.add(c2.getLocalAddress());
283             boolean flushedOk = Util.startFlush(c2, members);
284 
285             assertTrue("Partial flush worked", flushedOk);
286 
287             c2.stopFlush(members);
288         } finally {
289             Util.close(c2, c1);
290         }
291     }
292 
293     /** Tests the emition of block/unblock/get|set state events */
294     @Test
testBlockingNoStateTransfer()295     public void testBlockingNoStateTransfer() throws Exception {
296         String[] names = { "A", "B", "C", "D" };
297         _testChannels(names, FlushTestReceiver.CONNECT_ONLY);
298     }
299 
300     /** Tests the emition of block/unblock/get|set state events */
301     @Test
testBlockingWithStateTransfer()302     public void testBlockingWithStateTransfer() throws Exception {
303         String[] names = { "A", "B", "C", "D" };
304         _testChannels(names, FlushTestReceiver.CONNECT_AND_SEPARATE_GET_STATE);
305     }
306 
307     /** Tests the emition of block/unblock/get|set state events */
308     @Test
testBlockingWithConnectAndStateTransfer()309     public void testBlockingWithConnectAndStateTransfer() throws Exception {
310         String[] names = { "A", "B", "C", "D" };
311         _testChannels(names, FlushTestReceiver.CONNECT_AND_GET_STATE);
312     }
313 
_testChannels(String names[], int connectType)314     private void _testChannels(String names[], int connectType) throws Exception {
315         int count = names.length;
316 
317         List<FlushTestReceiver> channels = new ArrayList<FlushTestReceiver>(count);
318         try {
319             // Create a semaphore and take all its permits
320             Semaphore semaphore = new Semaphore(count);
321             semaphore.acquire(count);
322 
323             // Create channels and their threads that will block on the
324             // semaphore
325             boolean first = true;
326             for (String channelName : names) {
327                 FlushTestReceiver channel = null;
328                 if (first)
329                     channel = new FlushTestReceiver(channelName, semaphore, 0, connectType);
330                 else {
331                     channel = new FlushTestReceiver((JChannel) channels.get(0).getChannel(),
332                                     channelName, semaphore, 0, connectType);
333                 }
334                 channels.add(channel);
335 
336                 // Release one ticket at a time to allow the thread to start working
337                 channel.start();
338                 semaphore.release(1);
339                 if (first)
340                     Util.sleep(3000); // minimize changes of a merge happening
341                 first = false;
342             }
343 
344             Channel[] tmp = new Channel[channels.size()];
345             int cnt = 0;
346             for (FlushTestReceiver receiver : channels)
347                 tmp[cnt++] = receiver.getChannel();
348             Util.blockUntilViewsReceived(30000, 1000, tmp);
349 
350             // Reacquire the semaphore tickets; when we have them all
351             // we know the threads are done
352             semaphore.tryAcquire(count, 40, TimeUnit.SECONDS);
353 
354             Util.sleep(1000); //let all events propagate...
355             for (FlushTestReceiver app : channels)
356                 app.getChannel().setReceiver(null);
357             for (FlushTestReceiver app : channels)
358                 app.cleanup();
359 
360             // verify block/unblock/view/get|set state sequences for all members
361             for (FlushTestReceiver receiver : channels) {
362                 checkEventStateTransferSequence(receiver);
363                 System.out.println("event sequence is OK");
364             }
365         }
366         finally {
367             for (FlushTestReceiver app : channels)
368                 app.cleanup();
369         }
370     }
371 
changeProps(JChannel .... channels)372     private static void changeProps(JChannel ... channels) {
373         for(JChannel ch: channels) {
374             FD fd=(FD)ch.getProtocolStack().findProtocol(FD.class);
375             if(fd != null) {
376                 fd.setTimeout(1000);
377                 fd.setMaxTries(2);
378             }
379             FD_ALL fd_all=(FD_ALL)ch.getProtocolStack().findProtocol(FD_ALL.class);
380             if(fd_all != null) {
381                 fd_all.setTimeout(2000);
382                 fd_all.setInterval(800);
383             }
384         }
385     }
386 
387     private class FlushTestReceiver extends PushChannelApplicationWithSemaphore {
388         private int connectMethod;
389 
390         public static final int CONNECT_ONLY = 1;
391 
392         public static final int CONNECT_AND_SEPARATE_GET_STATE = 2;
393 
394         public static final int CONNECT_AND_GET_STATE = 3;
395 
396         int msgCount = 0;
397 
FlushTestReceiver(String name, Semaphore semaphore, int msgCount, int connectMethod)398         protected FlushTestReceiver(String name, Semaphore semaphore, int msgCount,
399                         int connectMethod) throws Exception {
400             super(name, semaphore);
401             this.connectMethod = connectMethod;
402             this.msgCount = msgCount;
403             events = Collections.synchronizedList(new LinkedList<Object>());
404             if (connectMethod == CONNECT_ONLY || connectMethod == CONNECT_AND_SEPARATE_GET_STATE)
405                 channel.connect("FlushTestReceiver");
406 
407             if (connectMethod == CONNECT_AND_GET_STATE) {
408                 channel.connect("FlushTestReceiver", null, null, 25000);
409             }
410         }
411 
FlushTestReceiver(JChannel ch, String name, Semaphore semaphore, int msgCount, int connectMethod)412         protected FlushTestReceiver(JChannel ch, String name, Semaphore semaphore, int msgCount,
413                         int connectMethod) throws Exception {
414             super(ch, name, semaphore);
415             this.connectMethod = connectMethod;
416             this.msgCount = msgCount;
417             events = Collections.synchronizedList(new LinkedList<Object>());
418             if (connectMethod == CONNECT_ONLY || connectMethod == CONNECT_AND_SEPARATE_GET_STATE)
419                 channel.connect("FlushTestReceiver");
420 
421             if (connectMethod == CONNECT_AND_GET_STATE) {
422                 channel.connect("FlushTestReceiver", null, null, 25000);
423             }
424         }
425 
getEvents()426         public List<Object> getEvents() {
427             return new LinkedList<Object>(events);
428         }
429 
getState()430         public byte[] getState() {
431             events.add(new GetStateEvent(null, null));
432             return new byte[] { 'b', 'e', 'l', 'a' };
433         }
434 
getState(OutputStream ostream)435         public void getState(OutputStream ostream) {
436             super.getState(ostream);
437             byte[] payload = new byte[] { 'b', 'e', 'l', 'a' };
438             try {
439                 ostream.write(payload);
440             } catch (IOException e) {
441                 e.printStackTrace();
442             } finally {
443                 Util.close(ostream);
444             }
445         }
446 
setState(InputStream istream)447         public void setState(InputStream istream) {
448             super.setState(istream);
449             byte[] payload = new byte[4];
450             try {
451                 istream.read(payload);
452             } catch (IOException e) {
453                 e.printStackTrace();
454             } finally {
455                 Util.close(istream);
456             }
457         }
458 
useChannel()459         protected void useChannel() throws Exception {
460             if (connectMethod == CONNECT_AND_SEPARATE_GET_STATE) {
461                 channel.getState(null, 25000);
462             }
463             if (msgCount > 0) {
464                 for (int i = 0; i < msgCount; i++) {
465                     channel.send(new Message());
466                     Util.sleep(100);
467                 }
468             }
469         }
470     }
471 
472     private class SimpleReplier extends ExtendedReceiverAdapter {
473         Channel channel;
474 
475         boolean handle_requests = false;
476 
SimpleReplier(Channel channel, boolean handle_requests)477         public SimpleReplier(Channel channel, boolean handle_requests) {
478             this.channel = channel;
479             this.handle_requests = handle_requests;
480         }
481 
receive(Message msg)482         public void receive(Message msg) {
483             Message reply = new Message(msg.getSrc());
484             try {
485                 log.info("-- MySimpleReplier[" + channel.getAddress() + "]: received message from "
486                                 + msg.getSrc());
487                 if (handle_requests) {
488                     log.info(", sending reply");
489                     channel.send(reply);
490                 } else
491                     System.out.println("\n");
492             } catch (Exception e) {
493                 e.printStackTrace();
494             }
495         }
496 
viewAccepted(View new_view)497         public void viewAccepted(View new_view) {
498             log.info("-- MySimpleReplier[" + channel.getAddress() + "]: viewAccepted(" + new_view
499                             + ")");
500         }
501 
block()502         public void block() {
503             log.info("-- MySimpleReplier[" + channel.getAddress() + "]: block()");
504         }
505 
unblock()506         public void unblock() {
507             log.info("-- MySimpleReplier[" + channel.getAddress() + "]: unblock()");
508         }
509     }
510 
511 }