/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.test;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that a follower which is shut down and restarted while clients keep
 * writing ends up with the same sessions, ephemerals and last processed zxid
 * as the other two servers of a three-server ensemble.
 *
 * <p>Every client, the quorum and the background writer thread created by a
 * test are remembered in instance fields and released in {@link #tearDown()},
 * so that a failed assertion does not leave servers or threads behind.
 */
public class FollowerResyncConcurrencyTest extends ZKTestCase {

    private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;

    /** Number of async create callbacks that have been delivered. */
    private final AtomicInteger counter = new AtomicInteger(0);
    /** Number of async create callbacks delivered with a non-zero result code. */
    private final AtomicInteger errors = new AtomicInteger(0);
    /**
     * Keep track of pending async operations, we shouldn't start verifying
     * the state until pending operation is 0.
     *
     * <p>The counter is incremented after the async call has been issued, so
     * it can be transiently negative when a callback runs first.
     */
    private final AtomicInteger pending = new AtomicInteger(0);

    /** Clients opened through the create*Client helpers; closed in tearDown. */
    private final List<ZooKeeper> clients = new ArrayList<>();
    /** Quorum created through {@link #newQuorum()}; shut down in tearDown. */
    private QuorumUtil quorum;
    /** Writer thread created through {@link #newBackgroundWriter(Runnable)}. */
    private Thread backgroundWriter;

    @BeforeEach
    public void setUp() throws Exception {
        pending.set(0);
        errors.set(0);
        counter.set(0);
    }

    /**
     * Logs the error count and releases everything the test opened: the
     * background writer is interrupted, the clients are closed and the
     * quorum is shut down. This also runs when the test body failed.
     */
    @AfterEach
    public void tearDown() throws Exception {
        LOG.info("Error count {}", errors.get());
        try {
            if (backgroundWriter != null) {
                // Stops a writer that is still looping after a failed test.
                backgroundWriter.interrupt();
            }
            // NOTE(review): a test may already have closed a client itself;
            // this relies on ZooKeeper#close tolerating a second call.
            for (ZooKeeper zk : clients) {
                zk.close();
            }
        } finally {
            clients.clear();
            backgroundWriter = null;
            if (quorum != null) {
                quorum.shutdownAll();
                quorum = null;
            }
        }
    }

    /**
     * See ZOOKEEPER-1319 - verify that a lagging follower resyncs correctly
     *
     * 1) start with down quorum
     * 2) start leader/follower1, add some data
     * 3) restart leader/follower1
     * 4) start follower2
     * 5) verify data consistency across the ensemble
     *
     * @throws Exception
     */
    @Test
    public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
        CountdownWatcher watcher1 = new CountdownWatcher();
        CountdownWatcher watcher2 = new CountdownWatcher();
        CountdownWatcher watcher3 = new CountdownWatcher();

        QuorumUtil qu = newQuorum();
        qu.shutdownAll();

        qu.start(1);
        qu.start(2);
        assertServerUp(qu, 1);
        assertServerUp(qu, 2);

        ZooKeeper zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));

        final String resyncPath = "/resyncundernewepoch";
        zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk1.close();

        qu.shutdown(1);
        qu.shutdown(2);
        assertServerDown(qu, 1);
        assertServerDown(qu, 2);

        qu.start(1);
        qu.start(2);
        assertServerUp(qu, 1);
        assertServerUp(qu, 2);

        // Server 3 joins only now, after the other two have been restarted.
        qu.start(3);
        assertServerUp(qu, 3);

        zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));

        assertNotNull(zk1.exists(resyncPath, false), "zk1 has data");

        final ZooKeeper zk2 = createClient(qu.getPeer(2).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));

        assertNotNull(zk2.exists(resyncPath, false), "zk2 has data");

        final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));

        assertNotNull(zk3.exists(resyncPath, false), "zk3 has data");

        // The clients and the quorum are released in tearDown().
    }

    /**
     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
     * setting the ZXID of the SNAP packet
     * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down
     * The non-leader ZKs are writing to cluster
     * Shut down F1 again
     * Restart after sessions are expired, expect to get a snap file
     * Shut down, run some transactions through.
     * Restart to a diff while transactions are running in leader
     *
     * @throws Throwable
     */
    @Test
    public void testResyncBySnapThenDiffAfterFollowerCrashes() throws Throwable {
        followerResyncCrashTest(false);
    }

    /**
     * Same as testResyncBySnapThenDiffAfterFollowerCrashes() but we resync
     * follower using txnlog
     *
     * @throws Throwable
     */
    @Test
    public void testResyncByTxnlogThenDiffAfterFollowerCrashes() throws Throwable {
        followerResyncCrashTest(true);
    }

    /**
     * Shared body of the two "crash" tests: writes through two followers
     * while one of them is repeatedly shut down and restarted, then verifies
     * that all three servers agree.
     *
     * @param useTxnLogResync {@code true} sets the snapshot size factor of the
     *        leader's database to 1000, {@code false} sets it to -1
     * @throws Throwable
     */
    public void followerResyncCrashTest(boolean useTxnLogResync) throws Throwable {
        // The loops below issue 13000 "/mybar", 260 "/newbaz" and 3000
        // "/mytestfoo" async creates; the semaphore is released once this
        // many callbacks have been delivered.
        final int expectedCallbacks = 16200;
        final Semaphore sem = new Semaphore(0);

        QuorumUtil qu = newQuorum();
        qu.startAll();
        CountdownWatcher watcher1 = new CountdownWatcher();
        CountdownWatcher watcher2 = new CountdownWatcher();
        CountdownWatcher watcher3 = new CountdownWatcher();

        final int leaderIndex = findLeaderIndex(qu);
        Leader leader = qu.getPeer(leaderIndex).peer.leader;
        assertNotNull(leader);

        if (useTxnLogResync) {
            // Set the factor to high value so that this test case always
            // resync using txnlog
            qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(1000);
        } else {
            // Disable sending DIFF using txnlog, so that this test still
            // testing the ZOOKEEPER-962 bug
            qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().setSnapshotSizeFactor(-1);
        }

        /* Select the follower to connect to: server 2 when server 1 leads, else server 1 */
        final int index = (leaderIndex == 1) ? 2 : 1;
        LOG.info("Connecting to follower: {}", index);

        qu.shutdown(index);

        final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));

        zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        qu.restart(index);

        final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));

        final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));

        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Prepare a thread that will create znodes.
        Thread mytestfooThread = newBackgroundWriter(() -> {
            for (int i = 0; i < 3000; i++) {
                // Here we create 3000 znodes
                zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    (rc, path, ctx, name) -> {
                        if (recordCompletion(rc) == expectedCallbacks) {
                            sem.release();
                        }
                    }, null);
                pending.incrementAndGet();
                if (i % 10 == 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Only tearDown() interrupts this thread: stop writing.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });

        // Here we start populating the server and shutdown the follower after
        // initial data is written.
        for (int i = 0; i < 13000; i++) {
            // Here we create 13000 znodes
            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                (rc, path, ctx, name) -> {
                    if (recordCompletion(rc) == expectedCallbacks) {
                        sem.release();
                    }
                }, null);
            pending.incrementAndGet();

            if (i == 5000) {
                qu.shutdown(index);
                LOG.info("Shutting down s1");
            }
            if (i == 12000) {
                // Start the prepared thread so that it is writing znodes while
                // the follower is restarting. On the first restart, the follower
                // should use txnlog to catchup. For subsequent restart, the
                // follower should use a diff to catchup.
                mytestfooThread.start();
                LOG.info("Restarting follower: {}", index);
                qu.restart(index);
                Thread.sleep(300);
                LOG.info("Shutdown follower: {}", index);
                qu.shutdown(index);
                Thread.sleep(300);
                LOG.info("Restarting follower: {}", index);
                qu.restart(index);
                LOG.info("Setting up server: {}", index);
            }
            if ((i % 1000) == 0) {
                Thread.sleep(1000);
            }

            if (i % 50 == 0) {
                zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    (rc, path, ctx, name) -> {
                        if (recordCompletion(rc) == expectedCallbacks) {
                            sem.release();
                        }
                    }, null);
                pending.incrementAndGet();
            }
        }

        awaitWriters(sem, mytestfooThread);
        assertTrue(waitForPendingRequests(60));
        assertTrue(waitForSync(qu, index, 10));

        verifyState(qu, index, leader);

        // The clients and the quorum are released in tearDown().
    }

    /**
     * This test:
     * Starts up 3 ZKs. The non-leader ZKs are writing to cluster
     * Shut down one of the non-leader ZKs.
     * Restart after sessions have expired but less than 500 txns have taken place (get a diff)
     * Shut down immediately after restarting, start running separate thread with other transactions
     * Restart to a diff while transactions are running in leader
     *
     *
     * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
     * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
     * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
     * would be missed
     *
     * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
     * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
     * during the leader's diff forwarding.
     *
     * @throws IOException
     * @throws InterruptedException
     * @throws KeeperException
     * @throws Throwable
     */
    @Test
    public void testResyncByDiffAfterFollowerCrashes() throws IOException, InterruptedException, KeeperException, Throwable {
        // The loops below issue 5000 "/mybar", 2000 "/newbaz" and 400
        // "/mytestfoo" async creates; every callback past this count
        // releases the semaphore.
        final int callbackThreshold = 7300;
        final Semaphore sem = new Semaphore(0);

        QuorumUtil qu = newQuorum();
        qu.startAll();
        CountdownWatcher watcher1 = new CountdownWatcher();
        CountdownWatcher watcher2 = new CountdownWatcher();
        CountdownWatcher watcher3 = new CountdownWatcher();

        final int leaderIndex = findLeaderIndex(qu);
        Leader leader = qu.getPeer(leaderIndex).peer.leader;
        assertNotNull(leader);

        /* Select the follower to connect to: server 2 when server 1 leads, else server 1 */
        final int index = (leaderIndex == 1) ? 2 : 1;
        LOG.info("Connecting to follower: {}", index);

        final ZooKeeper zk1 = createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
        LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId()));

        final ZooKeeper zk2 = createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
        LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId()));

        final ZooKeeper zk3 = createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
        LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId()));

        zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        final AtomicBoolean runNow = new AtomicBoolean(false);
        Thread mytestfooThread = newBackgroundWriter(() -> {
            int inSyncCounter = 0;
            // The interrupt check lets tearDown() stop the thread when the
            // test fails before runNow is ever set.
            while (inSyncCounter < 400 && !Thread.currentThread().isInterrupted()) {
                if (!runNow.get()) {
                    Thread.yield();
                    continue;
                }
                zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    (rc, path, ctx, name) -> {
                        if (recordCompletion(rc) > callbackThreshold) {
                            sem.release();
                        }
                    }, null);
                pending.incrementAndGet();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                inSyncCounter++;
            }
        });

        mytestfooThread.start();
        for (int i = 0; i < 5000; i++) {
            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                (rc, path, ctx, name) -> {
                    if (recordCompletion(rc) > callbackThreshold) {
                        sem.release();
                    }
                }, null);
            pending.incrementAndGet();
            if (i == 1000) {
                qu.shutdown(index);
                Thread.sleep(1100);
                LOG.info("Shutting down s1");
            }
            if (i == 1100 || i == 1150 || i == 1200) {
                Thread.sleep(1000);
            }

            if (i == 1200) {
                qu.startThenShutdown(index);
                // From here on the background thread writes as well.
                runNow.set(true);
                qu.restart(index);
                LOG.info("Setting up server: {}", index);
            }

            if (i >= 1000 && i % 2 == 0) {
                zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    (rc, path, ctx, name) -> {
                        if (recordCompletion(rc) > callbackThreshold) {
                            sem.release();
                        }
                    }, null);
                pending.incrementAndGet();
            }
            if (i == 1050 || i == 1100 || i == 1150) {
                Thread.sleep(1000);
            }
        }

        awaitWriters(sem, mytestfooThread);

        assertTrue(waitForPendingRequests(60));
        assertTrue(waitForSync(qu, index, 10));
        // Verify that server is following and has the same epoch as the leader

        verifyState(qu, index, leader);

        // The clients and the quorum are released in tearDown().
    }

    /**
     * Creates the three-server quorum used by a test and remembers it so that
     * {@link #tearDown()} shuts it down even when the test fails.
     */
    private QuorumUtil newQuorum() {
        quorum = new QuorumUtil(1);
        return quorum;
    }

    /**
     * Creates (but does not start) the background writer thread of a test and
     * remembers it so that {@link #tearDown()} can interrupt it. The thread
     * is a daemon so a stuck writer cannot keep the JVM alive.
     */
    private Thread newBackgroundWriter(Runnable body) {
        Thread writer = new Thread(body);
        writer.setDaemon(true);
        backgroundWriter = writer;
        return writer;
    }

    /**
     * Bookkeeping shared by every async create callback: one fewer pending
     * request, one more completed callback, and one more error when
     * {@code rc} is non-zero.
     *
     * @param rc result code passed to the callback
     * @return the number of callbacks completed so far, including this one.
     *         Callers must compare this value rather than re-reading the
     *         counter, which another callback thread may have advanced.
     */
    private int recordCompletion(int rc) {
        pending.decrementAndGet();
        int completed = counter.incrementAndGet();
        if (rc != 0) {
            errors.incrementAndGet();
        }
        return completed;
    }

    /**
     * Waits, each for at most {@code ClientBase.CONNECTION_TIMEOUT} ms, for
     * the callback semaphore and for the background writer to finish. A
     * timeout is only logged; the caller's later assertions decide the test.
     */
    private static void awaitWriters(Semaphore sem, Thread writer) throws InterruptedException {
        // Wait until all updates return
        if (!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            LOG.warn("Did not aquire semaphore fast enough");
        }
        writer.join(ClientBase.CONNECTION_TIMEOUT);
        if (writer.isAlive()) {
            LOG.error("mytestfooThread is still alive");
        }
    }

    /** Returns the id of the first server, counting up from 1, whose peer has a leader. */
    private static int findLeaderIndex(QuorumUtil qu) {
        int index = 1;
        while (qu.getPeer(index).peer.leader == null) {
            index++;
        }
        return index;
    }

    /** Returns the id of the first server, counting up from 1, whose peer has a follower. */
    private static int findFollowerIndex(QuorumUtil qu) {
        int index = 1;
        while (qu.getPeer(index).peer.follower == null) {
            index++;
        }
        return index;
    }

    /** Asserts that server {@code id} answers on its client port within the connection timeout. */
    private static void assertServerUp(QuorumUtil qu, int id) {
        assertTrue(
            ClientBase.waitForServerUp("127.0.0.1:" + qu.getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT),
            "Waiting for server up");
    }

    /** Asserts that server {@code id} stops answering on its client port within the connection timeout. */
    private static void assertServerDown(QuorumUtil qu, int id) {
        assertTrue(
            ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT),
            "Waiting for server down");
    }

    /**
     * Opens a client to {@code 127.0.0.1:port} and waits until
     * {@code watcher} reports it connected. The client is registered for
     * tearDown before waiting, so it is closed even if the wait times out.
     */
    private DisconnectableZooKeeper createClient(int port, CountdownWatcher watcher) throws IOException, TimeoutException, InterruptedException {
        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
            "127.0.0.1:" + port,
            ClientBase.CONNECTION_TIMEOUT,
            watcher);
        clients.add(zk);

        watcher.waitForConnected(CONNECTION_TIMEOUT);
        return zk;
    }

    /**
     * Wait for all async operation to return. So we know that we can start
     * verifying the state
     *
     * @param timeout maximum number of one-second waits
     * @return {@code true} once the pending counter reads 0, {@code false} on timeout
     */
    private boolean waitForPendingRequests(int timeout) throws InterruptedException {
        LOG.info("Wait for pending requests: {}", pending.get());
        for (int i = 0; i < timeout; ++i) {
            Thread.sleep(1000);
            if (pending.get() == 0) {
                return true;
            }
        }
        LOG.info("Timeout waiting for pending requests: {}", pending.get());
        return false;
    }

    /**
     * Wait for all server to have the same lastProcessedZxid. Timeout in seconds
     *
     * @param index id of the restarted follower (1 or 2)
     * @return {@code true} once all three databases report the same zxid
     */
    private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException {
        LOG.info("Wait for server to sync");
        // The one of servers 1 and 2 that was not restarted. Together with
        // server 3 this covers all three servers whichever one leads.
        int leaderIndex = (index == 1) ? 2 : 1;
        ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
        ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
        ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
        long leadZxid = 0;
        long cleanZxid = 0;
        long restartedZxid = 0;
        for (int i = 0; i < timeout; ++i) {
            leadZxid = leadDb.getDataTreeLastProcessedZxid();
            cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
            restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
            if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
                return true;
            }
            Thread.sleep(1000);
        }
        LOG.info(
            "Timeout waiting for zxid to sync: leader 0x{} clean 0x{} restarted 0x{}",
            Long.toHexString(leadZxid),
            Long.toHexString(cleanZxid),
            Long.toHexString(restartedZxid));
        return false;
    }

    /** Opens a connected {@link TestableZooKeeper} to {@code hp} with a fresh watcher. */
    private TestableZooKeeper createTestableClient(String hp) throws IOException, TimeoutException, InterruptedException {
        CountdownWatcher watcher = new CountdownWatcher();
        return createTestableClient(watcher, hp);
    }

    /**
     * Opens a {@link TestableZooKeeper} to {@code hp} and waits until
     * {@code watcher} reports it connected. The client is registered for
     * tearDown before waiting, so it is closed even if the wait times out.
     */
    private TestableZooKeeper createTestableClient(
        CountdownWatcher watcher, String hp) throws IOException, TimeoutException, InterruptedException {
        TestableZooKeeper zk = new TestableZooKeeper(hp, ClientBase.CONNECTION_TIMEOUT, watcher);
        clients.add(zk);

        watcher.waitForConnected(CONNECTION_TIMEOUT);
        return zk;
    }

    /**
     * Asserts that the restarted server {@code index} is following, reports
     * the same epoch as {@code leader}, and holds the same sessions and the
     * same number of ephemerals per session as the other two servers.
     */
    private void verifyState(QuorumUtil qu, int index, Leader leader) {
        LOG.info("Verifying state");
        assertNotNull(qu.getPeer(index).peer.follower, "Not following");
        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
        // NOTE(review): getEpoch() is shifted here as if it were a zxid -
        // confirm against Leader#getEpoch.
        long epochL = (leader.getEpoch() >> 32L);
        assertEquals(epochL, epochF,
            "Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid()
                + "Current epoch: " + epochF);
        // The one of servers 1 and 2 that was not restarted (see waitForSync).
        int leaderIndex = (index == 1) ? 2 : 1;
        Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
        Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();

        for (Long l : sessionsRestarted) {
            assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
        }
        assertEquals(sessionsNotRestarted.size(), sessionsRestarted.size(), "Should have same number of sessions");
        ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
        ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
        ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
        for (Long l : sessionsRestarted) {
            LOG.info("Validating ephemeral for session id 0x{}", Long.toHexString(l));
            assertTrue(sessionsNotRestarted.contains(l), "Should have same set of sessions in both servers, did not expect: " + l);
            Set<String> ephemerals = restarted.getEphemerals(l);
            Set<String> cleanEphemerals = clean.getEphemerals(l);
            // The differences are only logged to help diagnosis; the size
            // assertions at the end of the loop decide the test.
            logEphemeralsMissingFrom(
                "Restarted follower doesn't contain ephemeral {} zxid 0x{}", clean, cleanEphemerals, ephemerals);
            logEphemeralsMissingFrom(
                "Restarted follower has extra ephemeral {} zxid 0x{}", restarted, ephemerals, cleanEphemerals);
            Set<String> leadEphemerals = lead.getEphemerals(l);
            logEphemeralsMissingFrom(
                "Follower doesn't contain ephemeral from leader {} zxid 0x{}", lead, leadEphemerals, cleanEphemerals);
            logEphemeralsMissingFrom(
                "Leader doesn't contain ephemeral from follower {} zxid 0x{}", clean, cleanEphemerals, leadEphemerals);
            assertEquals(ephemerals.size(), cleanEphemerals.size(), "Should have same number of ephemerals in both followers");
            assertEquals(lead.getEphemerals(l).size(), cleanEphemerals.size(), "Leader should equal follower");
        }
    }

    /**
     * Logs, using {@code message}, every path in {@code present} that is not
     * in {@code other}, together with the node's mzxid read from {@code owner}.
     */
    private static void logEphemeralsMissingFrom(
        String message, ZKDatabase owner, Set<String> present, Set<String> other) {
        for (String path : present) {
            if (!other.contains(path)) {
                LOG.info(message, path, Long.toHexString(owner.getDataTree().getNode(path).stat.getMzxid()));
            }
        }
    }

    /**
     * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
     */
    @Test
    public void testFollowerSendsLastZxid() throws Exception {
        QuorumUtil qu = newQuorum();
        qu.startAll();

        int index = findFollowerIndex(qu);
        LOG.info("Connecting to follower: {}", index);

        TestableZooKeeper zk = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());

        assertEquals(0L, zk.testableLastZxid());
        zk.exists("/", false);
        long lzxid = zk.testableLastZxid();
        assertTrue(lzxid > 0, "lzxid:" + lzxid + " > 0");

        // The client and the quorum are released in tearDown().
    }

    /**
     * Watcher that, besides the connection tracking of its superclass, queues
     * every event whose type is not {@code None} so a test can check which
     * watches fired.
     */
    private static class MyWatcher extends CountdownWatcher {

        final LinkedBlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<WatchedEvent>();

        @Override
        public void process(WatchedEvent event) {
            super.process(event);
            if (event.getType() != Event.EventType.None) {
                try {
                    events.put(event);
                } catch (InterruptedException e) {
                    LOG.warn("ignoring interrupt during event.put");
                    // The event is dropped, but the interrupt is kept for the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }

    }

    /**
     * Verify that the server is sending the proper zxid, and as a result
     * the watch doesn't fire. See ZOOKEEPER-1412.
     */
    @Test
    public void testFollowerWatcherResync() throws Exception {
        QuorumUtil qu = newQuorum();
        qu.startAll();

        int index = findFollowerIndex(qu);
        LOG.info("Connecting to follower: {}", index);

        TestableZooKeeper zk1 = createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
        zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        MyWatcher watcher = new MyWatcher();
        TestableZooKeeper zk2 = createTestableClient(watcher, "localhost:" + qu.getPeer(index).peer.getClientPort());

        zk2.exists("/foo", true);

        // Drop the connection and wait for the watcher to see it re-established.
        watcher.reset();
        zk2.testableConnloss();
        if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
            fail("Unable to connect to server");
        }
        assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));

        // Nothing changed under /foo, so the watch must not have fired.
        assertNull(watcher.events.poll(5, TimeUnit.SECONDS));

        // The clients and the quorum are released in tearDown().
    }

}