1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.zookeeper.test;
20 
21 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22 import static org.junit.jupiter.api.Assertions.assertEquals;
23 import static org.junit.jupiter.api.Assertions.assertNotNull;
24 import static org.junit.jupiter.api.Assertions.assertNull;
25 import static org.junit.jupiter.api.Assertions.assertTrue;
26 import static org.junit.jupiter.api.Assertions.fail;
27 import java.io.IOException;
28 import java.util.Collection;
29 import java.util.Set;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.Semaphore;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicBoolean;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import org.apache.zookeeper.CreateMode;
37 import org.apache.zookeeper.KeeperException;
38 import org.apache.zookeeper.TestableZooKeeper;
39 import org.apache.zookeeper.WatchedEvent;
40 import org.apache.zookeeper.ZKTestCase;
41 import org.apache.zookeeper.ZooDefs;
42 import org.apache.zookeeper.ZooDefs.Ids;
43 import org.apache.zookeeper.ZooKeeper;
44 import org.apache.zookeeper.server.ZKDatabase;
45 import org.apache.zookeeper.server.quorum.Leader;
46 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
47 import org.junit.jupiter.api.AfterEach;
48 import org.junit.jupiter.api.BeforeEach;
49 import org.junit.jupiter.api.Test;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 
53 public class FollowerResyncConcurrencyTest extends ZKTestCase {
54 
55     private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
56     public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
57 
58     private AtomicInteger counter = new AtomicInteger(0);
59     private AtomicInteger errors = new AtomicInteger(0);
60     /**
61      * Keep track of pending async operations, we shouldn't start verifying
62      * the state until pending operation is 0
63      */
64     private AtomicInteger pending = new AtomicInteger(0);
65 
66     @BeforeEach
setUp()67     public void setUp() throws Exception {
68         pending.set(0);
69         errors.set(0);
70         counter.set(0);
71     }
72 
73     @AfterEach
tearDown()74     public void tearDown() throws Exception {
75         LOG.info("Error count {}", errors.get());
76     }
77 
78     /**
79      * See ZOOKEEPER-1319 - verify that a lagging follwer resyncs correctly
80      *
81      * 1) start with down quorum
82      * 2) start leader/follower1, add some data
83      * 3) restart leader/follower1
84      * 4) start follower2
85      * 5) verify data consistency across the ensemble
86      *
87      * @throws Exception
88      */
89     @Test
testLaggingFollowerResyncsUnderNewEpoch()90     public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
91         CountdownWatcher watcher1 = new CountdownWatcher();
92         CountdownWatcher watcher2 = new CountdownWatcher();
93         CountdownWatcher watcher3 = new CountdownWatcher();
94 
95         QuorumUtil qu = new QuorumUtil(1);
96         qu.shutdownAll();
97 
98         qu.start(1);
99         qu.start(2);
100         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
101                 "Waiting for server up");
102         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
103                 "Waiting for server up");
104 
105         ZooKeeper zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
106         LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
107 
108         final String resyncPath = "/resyncundernewepoch";
109         zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
110         zk1.close();
111 
112         qu.shutdown(1);
113         qu.shutdown(2);
114         assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
115                 "Waiting for server down");
116         assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
117                 "Waiting for server down");
118 
119         qu.start(1);
120         qu.start(2);
121         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT),
122                 "Waiting for server up");
123         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT),
124                 "Waiting for server up");
125 
126         qu.start(3);
127         assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT),
128                 "Waiting for server up");
129 
130         zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
131         LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
132 
133         assertNotNull(zk1.exists(resyncPath, false), "zk1 has data");
134 
135         final ZooKeeper zk2 = createClient(qu.getPeer(2).peer.getClientPort(), watcher2);
136         LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
137 
138         assertNotNull(zk2.exists(resyncPath, false), "zk2 has data");
139 
140         final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
141         LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
142 
143         assertNotNull(zk3.exists(resyncPath, false), "zk3 has data");
144 
145         zk1.close();
146         zk2.close();
147         zk3.close();
148 
149         qu.shutdownAll();
150     }
151 
    /**
     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
     * setting the ZXID of the SNAP packet.
     * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down.
     * The non-leader ZKs are writing to cluster.
     * Shut down F1 again.
     * Restart after sessions are expired, expect to get a snap file.
     * Shut down, run some transactions through.
     * Restart to a diff while transactions are running in leader.
     *
     * @throws Throwable if the scenario cannot be run or a consistency check fails
     */
    @Test
    public void testResyncBySnapThenDiffAfterFollowerCrashes() throws Throwable {
        // false: run the shared scenario with txnlog-based resync disabled.
        followerResyncCrashTest(false);
    }
169 
    /**
     * Same as testResyncBySnapThenDiffAfterFollowerCrashes() but we resync
     * follower using txnlog
     *
     * @throws Throwable if the scenario cannot be run or a consistency check fails
     */
    @Test
    public void testResyncByTxnlogThenDiffAfterFollowerCrashes() throws Throwable {
        // true: run the shared scenario with txnlog-based resync enabled.
        followerResyncCrashTest(true);
    }
182 
followerResyncCrashTest(boolean useTxnLogResync)183     public void followerResyncCrashTest(boolean useTxnLogResync) throws Throwable {
184         final Semaphore sem = new Semaphore(0);
185 
186         QuorumUtil qu = new QuorumUtil(1);
187         qu.startAll();
188         CountdownWatcher watcher1 = new CountdownWatcher();
189         CountdownWatcher watcher2 = new CountdownWatcher();
190         CountdownWatcher watcher3 = new CountdownWatcher();
191 
192         int index = 1;
193         while (qu.getPeer(index).peer.leader == null) {
194             index++;
195         }
196 
197         Leader leader = qu.getPeer(index).peer.leader;
198         assertNotNull(leader);
199 
200         if (useTxnLogResync) {
201             // Set the factor to high value so that this test case always
202             // resync using txnlog
203             qu.getPeer(index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(1000);
204         } else {
205             // Disable sending DIFF using txnlog, so that this test still
206             // testing the ZOOKEEPER-962 bug
207             qu.getPeer(index).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(-1);
208         }
209 
210         /* Reusing the index variable to select a follower to connect to */
211         index = (index == 1) ? 2 : 1;
212         LOG.info("Connecting to follower: {}", index);
213 
214         qu.shutdown(index);
215 
216         final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
217         LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
218 
219         zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
220 
221         qu.restart(index);
222 
223         final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
224         LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
225 
226         final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
227         LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
228 
229         zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
230 
231         // Prepare a thread that will create znodes.
232         Thread mytestfooThread = new Thread(new Runnable() {
233             @Override
234             public void run() {
235                 for (int i = 0; i < 3000; i++) {
236                     // Here we create 3000 znodes
237                     zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
238                         pending.decrementAndGet();
239                         counter.incrementAndGet();
240                         if (rc != 0) {
241                             errors.incrementAndGet();
242                         }
243                         if (counter.get() == 16200) {
244                             sem.release();
245                         }
246                     }, null);
247                     pending.incrementAndGet();
248                     if (i % 10 == 0) {
249                         try {
250                             Thread.sleep(100);
251                         } catch (Exception e) {
252 
253                         }
254                     }
255                 }
256 
257             }
258         });
259 
260         // Here we start populating the server and shutdown the follower after
261         // initial data is written.
262         for (int i = 0; i < 13000; i++) {
263             // Here we create 13000 znodes
264             zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
265                 pending.decrementAndGet();
266                 counter.incrementAndGet();
267                 if (rc != 0) {
268                     errors.incrementAndGet();
269                 }
270                 if (counter.get() == 16200) {
271                     sem.release();
272                 }
273             }, null);
274             pending.incrementAndGet();
275 
276             if (i == 5000) {
277                 qu.shutdown(index);
278                 LOG.info("Shutting down s1");
279             }
280             if (i == 12000) {
281                 // Start the prepared thread so that it is writing znodes while
282                 // the follower is restarting. On the first restart, the follow
283                 // should use txnlog to catchup. For subsequent restart, the
284                 // follower should use a diff to catchup.
285                 mytestfooThread.start();
286                 LOG.info("Restarting follower: {}", index);
287                 qu.restart(index);
288                 Thread.sleep(300);
289                 LOG.info("Shutdown follower: {}", index);
290                 qu.shutdown(index);
291                 Thread.sleep(300);
292                 LOG.info("Restarting follower: {}", index);
293                 qu.restart(index);
294                 LOG.info("Setting up server: {}", index);
295             }
296             if ((i % 1000) == 0) {
297                 Thread.sleep(1000);
298             }
299 
300             if (i % 50 == 0) {
301                 zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
302                     pending.decrementAndGet();
303                     counter.incrementAndGet();
304                     if (rc != 0) {
305                         errors.incrementAndGet();
306                     }
307                     if (counter.get() == 16200) {
308                         sem.release();
309                     }
310                 }, null);
311                 pending.incrementAndGet();
312             }
313         }
314 
315         // Wait until all updates return
316         if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
317             LOG.warn("Did not aquire semaphore fast enough");
318         }
319         mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
320         if (mytestfooThread.isAlive()) {
321             LOG.error("mytestfooThread is still alive");
322         }
323         assertTrue(waitForPendingRequests(60));
324         assertTrue(waitForSync(qu, index, 10));
325 
326         verifyState(qu, index, leader);
327 
328         zk1.close();
329         zk2.close();
330         zk3.close();
331 
332         qu.shutdownAll();
333     }
334 
335     /**
336      * This test:
337      * Starts up 3 ZKs. The non-leader ZKs are writing to cluster
338      * Shut down one of the non-leader ZKs.
339      * Restart after sessions have expired but less than 500 txns have taken place (get a diff)
340      * Shut down immediately after restarting, start running separate thread with other transactions
341      * Restart to a diff while transactions are running in leader
342      *
343      *
344      * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
345      * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
346      * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
347      * would be missed
348      *
349      * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
350      * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
351      * during the leader's diff forwarding.
352      *
353      * @throws IOException
354      * @throws InterruptedException
355      * @throws KeeperException
356      * @throws Throwable
357      */
358 
359     @Test
testResyncByDiffAfterFollowerCrashes()360     public void testResyncByDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
361         final Semaphore sem = new Semaphore(0);
362 
363         QuorumUtil qu = new QuorumUtil(1);
364         qu.startAll();
365         CountdownWatcher watcher1 = new CountdownWatcher();
366         CountdownWatcher watcher2 = new CountdownWatcher();
367         CountdownWatcher watcher3 = new CountdownWatcher();
368 
369         int index = 1;
370         while (qu.getPeer(index).peer.leader == null) {
371             index++;
372         }
373 
374         Leader leader = qu.getPeer(index).peer.leader;
375         assertNotNull(leader);
376 
377         /* Reusing the index variable to select a follower to connect to */
378         index = (index == 1) ? 2 : 1;
379         LOG.info("Connecting to follower: {}", index);
380 
381         final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
382         LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));
383 
384         final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
385         LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));
386 
387         final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
388         LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));
389 
390         zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
391         zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
392 
393         final AtomicBoolean runNow = new AtomicBoolean(false);
394         Thread mytestfooThread = new Thread(new Runnable() {
395 
396             @Override
397             public void run() {
398                 int inSyncCounter = 0;
399                 while (inSyncCounter < 400) {
400                     if (runNow.get()) {
401                         zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
402                             pending.decrementAndGet();
403                             counter.incrementAndGet();
404                             if (rc != 0) {
405                                 errors.incrementAndGet();
406                             }
407                             if (counter.get() > 7300) {
408                                 sem.release();
409                             }
410                         }, null);
411                         pending.incrementAndGet();
412                         try {
413                             Thread.sleep(10);
414                         } catch (Exception e) {
415                         }
416                         inSyncCounter++;
417                     } else {
418                         Thread.yield();
419                     }
420                 }
421 
422             }
423         });
424 
425         mytestfooThread.start();
426         for (int i = 0; i < 5000; i++) {
427             zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
428                 pending.decrementAndGet();
429                 counter.incrementAndGet();
430                 if (rc != 0) {
431                     errors.incrementAndGet();
432                 }
433                 if (counter.get() > 7300) {
434                     sem.release();
435                 }
436             }, null);
437             pending.incrementAndGet();
438             if (i == 1000) {
439                 qu.shutdown(index);
440                 Thread.sleep(1100);
441                 LOG.info("Shutting down s1");
442             }
443             if (i == 1100 || i == 1150 || i == 1200) {
444                 Thread.sleep(1000);
445             }
446 
447             if (i == 1200) {
448                 qu.startThenShutdown(index);
449                 runNow.set(true);
450                 qu.restart(index);
451                 LOG.info("Setting up server: {}", index);
452             }
453 
454             if (i >= 1000 && i % 2 == 0) {
455                 zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, (rc, path, ctx, name) -> {
456                     pending.decrementAndGet();
457                     counter.incrementAndGet();
458                     if (rc != 0) {
459                         errors.incrementAndGet();
460                     }
461                     if (counter.get() > 7300) {
462                         sem.release();
463                     }
464                 }, null);
465                 pending.incrementAndGet();
466             }
467             if (i == 1050 || i == 1100 || i == 1150) {
468                 Thread.sleep(1000);
469             }
470         }
471 
472         // Wait until all updates return
473         if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
474             LOG.warn("Did not aquire semaphore fast enough");
475         }
476         mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
477         if (mytestfooThread.isAlive()) {
478             LOG.error("mytestfooThread is still alive");
479         }
480 
481         assertTrue(waitForPendingRequests(60));
482         assertTrue(waitForSync(qu, index, 10));
483         // Verify that server is following and has the same epoch as the leader
484 
485         verifyState(qu, index, leader);
486 
487         zk1.close();
488         zk2.close();
489         zk3.close();
490 
491         qu.shutdownAll();
492     }
493 
createClient(int port, CountdownWatcher watcher)494     private static DisconnectableZooKeeper createClient(int port, CountdownWatcher watcher) throws IOException, TimeoutException, InterruptedException {
495         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
496                 "127.0.0.1:" + port,
497                 ClientBase.CONNECTION_TIMEOUT,
498                 watcher);
499 
500         watcher.waitForConnected(CONNECTION_TIMEOUT);
501         return zk;
502     }
503 
504     /**
505      * Wait for all async operation to return. So we know that we can start
506      * verifying the state
507      */
waitForPendingRequests(int timeout)508     private boolean waitForPendingRequests(int timeout) throws InterruptedException {
509         LOG.info("Wait for pending requests: {}", pending.get());
510         for (int i = 0; i < timeout; ++i) {
511             Thread.sleep(1000);
512             if (pending.get() == 0) {
513                 return true;
514             }
515         }
516         LOG.info("Timeout waiting for pending requests: {}", pending.get());
517         return false;
518     }
519 
520     /**
521      * Wait for all server to have the same lastProccessedZxid. Timeout in seconds
522      */
waitForSync(QuorumUtil qu, int index, int timeout)523     private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException {
524         LOG.info("Wait for server to sync");
525         int leaderIndex = (index == 1) ? 2 : 1;
526         ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
527         ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
528         ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
529         long leadZxid = 0;
530         long cleanZxid = 0;
531         long restartedZxid = 0;
532         for (int i = 0; i < timeout; ++i) {
533             leadZxid = leadDb.getDataTreeLastProcessedZxid();
534             cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
535             restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
536             if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
537                 return true;
538             }
539             Thread.sleep(1000);
540         }
541         LOG.info(
542             "Timeout waiting for zxid to sync: leader 0x{} clean 0x{} restarted 0x{}",
543             Long.toHexString(leadZxid),
544             Long.toHexString(cleanZxid),
545             Long.toHexString(restartedZxid));
546         return false;
547     }
548 
createTestableClient(String hp)549     private static TestableZooKeeper createTestableClient(String hp) throws IOException, TimeoutException, InterruptedException {
550         CountdownWatcher watcher = new CountdownWatcher();
551         return createTestableClient(watcher, hp);
552     }
553 
createTestableClient( CountdownWatcher watcher, String hp)554     private static TestableZooKeeper createTestableClient(
555             CountdownWatcher watcher, String hp) throws IOException, TimeoutException, InterruptedException {
556         TestableZooKeeper zk = new TestableZooKeeper(hp, ClientBase.CONNECTION_TIMEOUT, watcher);
557 
558         watcher.waitForConnected(CONNECTION_TIMEOUT);
559         return zk;
560     }
561 
verifyState(QuorumUtil qu, int index, Leader leader)562     private void verifyState(QuorumUtil qu, int index, Leader leader) {
563         LOG.info("Verifying state");
564         assertTrue(qu.getPeer(index).peer.follower != null, "Not following");
565         long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
566         long epochL = (leader.getEpoch() >> 32L);
567         assertTrue(epochF == epochL,
568                 "Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid()
569                 + "Current epoch: " + epochF);
570         int leaderIndex = (index == 1) ? 2 : 1;
571         Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
572         Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
573 
574         for (Long l : sessionsRestarted) {
575             assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
576         }
577         assertEquals(sessionsNotRestarted.size(), sessionsRestarted.size(), "Should have same number of sessions");
578         ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
579         ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
580         ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
581         for (Long l : sessionsRestarted) {
582             LOG.info("Validating ephemeral for session id 0x{}", Long.toHexString(l));
583             assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
584             Set<String> ephemerals = restarted.getEphemerals(l);
585             Set<String> cleanEphemerals = clean.getEphemerals(l);
586             for (String o : cleanEphemerals) {
587                 if (!ephemerals.contains(o)) {
588                     LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}", o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
589                 }
590             }
591             for (String o : ephemerals) {
592                 if (!cleanEphemerals.contains(o)) {
593                     LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}", o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid()));
594                 }
595             }
596             Set<String> leadEphemerals = lead.getEphemerals(l);
597             for (String o : leadEphemerals) {
598                 if (!cleanEphemerals.contains(o)) {
599                     LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}", o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid()));
600                 }
601             }
602             for (String o : cleanEphemerals) {
603                 if (!leadEphemerals.contains(o)) {
604                     LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}", o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
605                 }
606             }
607             assertEquals(ephemerals.size(), cleanEphemerals.size(), "Should have same number of ephemerals in both followers");
608             assertEquals(lead.getEphemerals(l).size(), cleanEphemerals.size(), "Leader should equal follower");
609         }
610     }
611 
612     /**
613      * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
614      */
615     @Test
testFollowerSendsLastZxid()616     public void testFollowerSendsLastZxid() throws Exception {
617         QuorumUtil qu = new QuorumUtil(1);
618         qu.startAll();
619 
620         int index = 1;
621         while (qu.getPeer(index).peer.follower == null) {
622             index++;
623         }
624         LOG.info("Connecting to follower: {}", index);
625 
626         TestableZooKeeper zk = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
627 
628         assertEquals(0L, zk.testableLastZxid());
629         zk.exists("/", false);
630         long lzxid = zk.testableLastZxid();
631         assertTrue(lzxid > 0, "lzxid:" + lzxid + " > 0");
632         zk.close();
633         qu.shutdownAll();
634     }
635 
636     private class MyWatcher extends CountdownWatcher {
637 
638         LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<WatchedEvent>();
639 
process(WatchedEvent event)640         public void process(WatchedEvent event) {
641             super.process(event);
642             if (event.getType() != Event.EventType.None) {
643                 try {
644                     events.put(event);
645                 } catch (InterruptedException e) {
646                     LOG.warn("ignoring interrupt during event.put");
647                 }
648             }
649         }
650 
651     }
652 
653     /**
654      * Verify that the server is sending the proper zxid, and as a result
655      * the watch doesn't fire. See ZOOKEEPER-1412.
656      */
657     @Test
testFollowerWatcherResync()658     public void testFollowerWatcherResync() throws Exception {
659         QuorumUtil qu = new QuorumUtil(1);
660         qu.startAll();
661 
662         int index = 1;
663         while (qu.getPeer(index).peer.follower == null) {
664             index++;
665         }
666         LOG.info("Connecting to follower: {}", index);
667 
668         TestableZooKeeper zk1 = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
669         zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
670 
671         MyWatcher watcher = new MyWatcher();
672         TestableZooKeeper zk2 = createTestableClient(watcher, "localhost:" + qu.getPeer(index).peer.getClientPort());
673 
674         zk2.exists("/foo", true);
675 
676         watcher.reset();
677         zk2.testableConnloss();
678         if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
679             fail("Unable to connect to server");
680         }
681         assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
682 
683         assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
684 
685         zk1.close();
686         zk2.close();
687         qu.shutdownAll();
688     }
689 
690 }
691