/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved.
 *
 */

package com.sleepycat.je.rep.impl.node;

import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAITS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAIT_MS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAITS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAIT_MS;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.ReplicaConsistencyPolicy;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.CommitPointConsistencyPolicy;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.RestartRequiredException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.net.DataChannelFactory.ConnectOptions;
import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.Protocol.Heartbeat;
import com.sleepycat.je.rep.stream.Protocol.ShutdownRequest;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederSyncup;
import com.sleepycat.je.rep.stream.ReplicaFeederSyncup.TestHook;
import com.sleepycat.je.rep.txn.MasterTxn;
import com.sleepycat.je.rep.txn.ReplayTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.BinaryProtocol.MessageOp;
import com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException;
import com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.RepUtils.ExceptionAwareCountDownLatch;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.Response;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongStat;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;

/**
 * The Replica class is the locus of the replay operations and replica
 * transaction consistency tracking and management operations at a replica
 * node.
 *
 * A single instance of this class is created when the replication node is
 * created and exists for the lifetime of the replication node, although it is
 * only really used when the node is operating as a Replica.
 *
 * Note that the Replica (like the FeederManager) does not have its own
 * independent thread of control; it runs in the RepNode's thread. To make the
 * network I/O as asynchronous as possible, and to avoid stalls during network
 * I/O, the input and output are done in separate threads. The overall thread
 * and queue organization is as sketched below:
 *
 *  read from network -> RepNodeThread (does read) -> replayQueue
 *  replayQueue -> ReplayThread -> outputQueue
 *  outputQueue -> ReplicaOutputThread (does write) -> writes to network
 *
 * This three-thread organization has the following benefits over a single
 * thread replay model:
 *
 * 1) It makes the heartbeat mechanism used to determine whether the HA
 * sockets are in use more reliable. This is because a heartbeat response
 * cannot be blocked by lock contention in the replay thread, since a
 * heartbeat can be sent spontaneously (without an explicit heartbeat request
 * from the feeder) by the ReplicaOutputThread, if a heartbeat has not been
 * sent during a heartbeat interval period.
 *
 * 2) The CPU load in the replay thread is reduced by offloading the
 * network-specific aspects of the processing to different threads. It's
 * important to keep the CPU load in this thread at a minimum so we can use
 * a simple single-thread replay scheme.
 *
 * 3) It prevents replay thread stalls via input and output buffering in the
 * two threads on either side of it.
 *
 * With jdk 1.7 we could eliminate the use of these threads and switch over to
 * the new async I/O APIs, but that involves a lot more code restructuring.
 */
public class Replica {
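
    /*
     * A minimal sketch of the hand-off between the three threads described
     * above (illustrative only; the real fields and methods appear further
     * down in this class):
     *
     *   // RepNode thread (network read):
     *   //   Message m = protocol.read(replicaFeederChannel);
     *   //   replayQueue.offer(m, pollIntervalNs, TimeUnit.NANOSECONDS);
     *
     *   // ReplayThread:
     *   //   Message m = replayQueue.poll(pollIntervalNs, TimeUnit.NANOSECONDS);
     *   //   replay.replayEntry(...);  // may queue acks for the output thread
     *
     *   // ReplicaOutputThread:
     *   //   drains the queued acks/heartbeat responses and writes them to
     *   //   the network.
     */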

    /* The Node to which the Replica belongs. */
    private final RepNode repNode;
    private final RepImpl repImpl;

    /* The replay component of the Replica */
    private final Replay replay;

    /* The exception that provoked the replica exit. */
    private Exception shutdownException = null;

    /*
     * It's non-null when the loop is active.
     */
    private NamedChannelWithTimeout replicaFeederChannel = null;

    /* The consistency component. */
    private final ConsistencyTracker consistencyTracker;

    /**
     * The latest txn-ending (commit or abort) VLSN that we have on this
     * replica.
     */
    private volatile VLSN txnEndVLSN;

    /*
     * A test delay introduced in the replica loop to simulate a loaded
     * replica. The replica waits this amount of time before processing each
     * message.
     */
    private int testDelayMs = 0;

    /* For testing only - mimic a network partition. */
    private boolean dontProcessStream = false;

    /* Number of times to retry on a network connection failure. */
    private static final int NETWORK_RETRIES = 2;

    /*
     * Service unavailable retries. These are typically the result of service
     * requests being made before the node is ready to provide them. For
     * example, the feeder service is only available after a node has
     * transitioned to becoming the master.
     */
    private static final int SERVICE_UNAVAILABLE_RETRIES = 10;

    /*
     * The number of ms to wait between the above retries, allowing time for
     * the master to assume its role, and start listening on its port.
     */
    private static final int CONNECT_RETRY_SLEEP_MS = 1000;

    /*
     * The protocol instance if one is currently in use by the Replica.
     */
    private Protocol protocol = null;

    /*
     * Protocol statistics aggregated across all past protocol instantiations.
     * It does not include the statistics for the current Protocol object in
     * use. A node can potentially go through the Replica state multiple times
     * during its lifetime. This instance aggregates replica statistics
     * across all transitions into and out of the Replica state.
     */
    private final StatGroup aggProtoStats;

    /*
     * Holds the exception that is thrown to indicate that an election is
     * needed before a hard recovery can proceed. It's set to a non-null value
     * when the need for a hard recovery is first discovered and is
     * subsequently cleared after an election is held and before the next
     * attempt at a syncup with the newly elected master. The election ensures
     * that the master being used for an actual rollback is current and is not
     * an isolated master that is out of date, due to a network partition that
     * has since been resolved.
     */
    private HardRecoveryElectionException hardRecoveryElectionException;

    /* For testing only. */
    private TestHook<Object> replicaFeederSyncupHook;
    private final com.sleepycat.je.utilint.TestHook<Message> replayHook;
    static private com.sleepycat.je.utilint.TestHook<Message> initialReplayHook;

    /*
     * A cache of DatabaseImpls for the Replay to speed up DbTree.getId().
     * Cleared/invalidated by a heartbeat or if je.rep.dbIdCacheOpCount
     * operations have gone by, or if any replay operations on Name LNs are
     * executed.
     */
    private final DbCache dbCache;

    /**
     * The message queue used for communications between the network read
     * thread and the replay thread.
     */
    private final BlockingQueue<Message> replayQueue;

    /*
     * The replica output thread. It's only maintained here as an IV, rather
     * than as a local variable inside doRunReplicaLoopInternalWork(), to
     * facilitate unit tests, and is non-null only for the duration of the
     * method.
     */
    private volatile ReplicaOutputThread replicaOutputThread;

    private final Logger logger;

    /**
     * The number of times a message entry could not be inserted into
     * the queue within the poll period and had to be retried.
     */
    private final LongStat nMessageQueueOverflows;

    Replica(RepNode repNode, Replay replay) {
        this.repNode = repNode;
        this.repImpl = repNode.getRepImpl();
        DbConfigManager configManager = repNode.getConfigManager();
        dbCache = new DbCache(repImpl.getDbTree(),
                              configManager.getInt
                              (RepParams.REPLAY_MAX_OPEN_DB_HANDLES),
                              configManager.getDuration
                              (RepParams.REPLAY_DB_HANDLE_TIMEOUT));

        consistencyTracker = new ConsistencyTracker();
        this.replay = replay;
        logger = LoggerUtils.getLogger(getClass());
        aggProtoStats =
            new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME,
                          BinaryProtocolStatDefinition.GROUP_DESC);
        nMessageQueueOverflows = replay.getMessageQueueOverflows();
        testDelayMs =
            repNode.getConfigManager().getInt(RepParams.TEST_REPLICA_DELAY);
        replayHook = initialReplayHook;

        /* Set up the replay queue. */
        final int replayQueueSize = repNode.getConfigManager().
            getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);

        replayQueue = new ArrayBlockingQueue<Message>(replayQueueSize);
    }
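
    /*
     * Note on sizing (derived from the constructor above): the DbCache is
     * bounded by REPLAY_MAX_OPEN_DB_HANDLES and aged out via
     * REPLAY_DB_HANDLE_TIMEOUT, the replay queue capacity comes from
     * REPLICA_MESSAGE_QUEUE_SIZE, and TEST_REPLICA_DELAY is a test-only
     * knob. All four values are read from RepParams via the
     * DbConfigManager.
     */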

    /**
     * Shutdown the Replica, free any threads that may have been waiting for
     * the replica to reach some degree of consistency. This method is only
     * invoked as part of the repnode shutdown.
     *
     * If the shutdown is being executed from a different thread, it attempts
     * to interrupt the thread by first shutting down the channel it may be
     * waiting on for input from the feeder. The replica thread should notice
     * the channel shutdown and/or the shutdown state of the rep node itself.
     * The caller will use harsher methods, like an interrupt, if the rep node
     * thread (Replica or Feeder) is still active.
     */
    public void shutdown() {
        if (!repNode.isShutdown()) {
            throw EnvironmentFailureException.unexpectedState
                ("Rep node must have initiated the shutdown.");
        }
        consistencyTracker.shutdown();
        if (Thread.currentThread() == repNode) {
            return;
        }

        /*
         * Perform the actions to provoke a "soft" shutdown.
         *
         * Since the replica shares the RepNode thread, it will take care of
         * the actual thread shutdown itself.
         */

        /*
         * Shutdown the channel as an attempt to interrupt just the socket
         * read/write operation.
         */
        RepUtils.shutdownChannel(replicaFeederChannel);

        /*
         * Clear the latch in case the replica loop is waiting for the outcome
         * of an election.
         */
        repNode.getVLSNFreezeLatch().clearLatch();
    }

    /**
     * For unit testing only!
     */
    public void setTestDelayMs(int testDelayMs) {
        this.testDelayMs = testDelayMs;
    }

    public int getTestDelayMs() {
        return testDelayMs;
    }

    /**
     * For unit testing only!
     */
    public void setDontProcessStream() {
        dontProcessStream = true;
    }

    public VLSN getTxnEndVLSN() {
        return txnEndVLSN;
    }

    public Replay replay() {
        return replay;
    }

    public DbCache getDbCache() {
        return dbCache;
    }

    public ConsistencyTracker getConsistencyTracker() {
        return consistencyTracker;
    }

    DataChannel getReplicaFeederChannel() {
        return replicaFeederChannel.getChannel();
    }

    Protocol getProtocol() {
        return protocol;
    }

    /**
     * Returns the last commit VLSN at the master, as known at the replica.
     *
     * @return the commit VLSN
     */
    public long getMasterTxnEndVLSN() {
        return consistencyTracker.getMasterTxnEndVLSN();
    }

    /**
     * For test use only.
     */
    public ReplicaOutputThread getReplicaOutputThread() {
        return replicaOutputThread;
    }

    /**
     * The core control loop when the node is serving as a Replica. Note that
     * if a Replica is also serving the role of a feeder, it will run
     * additional feeder loops in separate threads. The loop exits when it
     * encounters one of the following possible conditions:
     *
     * 1) The connection to the master can no longer be maintained, due to
     * connectivity issues, or because the master has explicitly shutdown its
     * connections due to an election.
     *
     * 2) The node becomes aware of a change in master, that is, assertSync()
     * fails.
     *
     * 3) The loop is interrupted, which is interpreted as a request to
     * shutdown the replication node as a whole.
     *
     * 4) It fails to establish its node information in the master as it
     * attempts to join the replication group for the first time.
     *
     * Normal exit from this run loop results in the rep node retrying an
     * election and continuing in its new role as determined by the outcome of
     * the election. A thrown exception, on the other hand, results in the rep
     * node as a whole terminating its operation and no longer participating
     * in the replication group, that is, it enters the DETACHED state.
     *
     * Note that the in/out streams are handled synchronously on the replica,
     * while they are handled asynchronously by the Feeder.
     *
     * @throws InterruptedException
     * @throws RestoreFailedException
     * @throws DatabaseException if the environment cannot be closed/for a
     * re-init
     * @throws GroupShutdownException
     */
    void runReplicaLoop()
        throws InterruptedException,
               DatabaseException,
               GroupShutdownException {

        Class<? extends RetryException> retryExceptionClass = null;
        int retryCount = 0;
        try {

            while (true) {
                try {
                    runReplicaLoopInternal();
                    /* Normal exit */
                    break;
                } catch (RetryException e) {
                    if (!repNode.getMasterStatus().inSync()) {
                        LoggerUtils.fine(logger, repImpl,
                                         "Retry terminated, out of sync.");
                        break;
                    }
                    if ((e.getClass() == retryExceptionClass) ||
                        (e.retries == 0)) {
                        if (++retryCount >= e.retries) {
                            /* Exit replica retry elections */
                            LoggerUtils.info
                                (logger, repImpl,
                                 "Failed to recover from exception: " +
                                 e.getMessage() + ", despite " + e.retries +
                                 " retries.\n" +
                                 LoggerUtils.getStackTrace(e));
                            break;
                        }
                    } else {
                        retryCount = 0;
                        retryExceptionClass = e.getClass();
                    }
                    LoggerUtils.info(logger, repImpl, "Retry #: " +
                                     retryCount + "/" + e.retries +
                                     " Will retry replica loop after " +
                                     e.retrySleepMs + "ms. ");
                    Thread.sleep(e.retrySleepMs);
                    if (!repNode.getMasterStatus().inSync()) {
                        break;
                    }
                }
            }
        } finally {
            /*
             * Reset the rep node ready latch unless the replica is not ready
             * because it's going to hold an election before proceeding with
             * hard recovery and joining the group.
             */
            if (hardRecoveryElectionException == null) {
                repNode.resetReadyLatch(shutdownException);
            }
        }
        /* Exit; use elections to try a different master. */
    }
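
    /*
     * Retry accounting for the loop above: retryCount accumulates only while
     * consecutive failures raise the same RetryException subclass (or when
     * the exception's retry budget is zero); a failure of a different class
     * resets the count. The loop gives up, and falls back to elections, once
     * retryCount reaches the exception's own retry budget, or as soon as the
     * master is detected to be out of sync.
     */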

    private void runReplicaLoopInternal()
        throws RestartRequiredException,
               InterruptedException,
               RetryException,
               InsufficientLogException {

        shutdownException = null;
        LoggerUtils.info(logger, repImpl,
                         "Replica loop started with master: " +
                         repNode.getMasterStatus().getNodeMasterNameId());
        if (testDelayMs > 0) {
            LoggerUtils.info(logger, repImpl,
                             "Test delay of: " + testDelayMs + "ms." +
                             " after each message sent");
        }
        try {
            initReplicaLoop();
            doRunReplicaLoopInternalWork();
        } catch (RestartRequiredException rre) {
            shutdownException = rre;
            throw rre;
        } catch (ClosedByInterruptException closedByInterruptException) {
            if (repNode.isShutdown()) {
                LoggerUtils.info(logger, repImpl,
                                 "Replica loop interrupted for shutdown.");
                return;
            }
            LoggerUtils.warning(logger, repImpl,
                                "Replica loop unexpected interrupt.");
            throw new InterruptedException
                (closedByInterruptException.getMessage());
        } catch (IOException e) {

            /*
             * Master may have changed with the master shutting down its
             * connection as a result. Normal course of events, log it and
             * return to the outer node level loop.
             */
            LoggerUtils.info(logger, repImpl,
                             "Replica IO exception: " + e.getMessage() +
                             "\n" + LoggerUtils.getStackTrace(e));
        } catch (RetryException e) {
            /* Propagate it outwards. Node does not need to shutdown. */
            throw e;
        } catch (GroupShutdownException e) {
            shutdownException = e;
            throw e;
        } catch (RuntimeException e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                               "Replica unexpected exception " + e +
                               " " + LoggerUtils.getStackTrace(e));
            throw e;
        } catch (MasterSyncException e) {
            /* Expected change in masters from an election. */
            LoggerUtils.info(logger, repImpl, e.getMessage());
        } catch (HardRecoveryElectionException e) {
            /*
             * Exit the replica loop so that elections can be held and the
             * master confirmed.
             */
            hardRecoveryElectionException = e;
            LoggerUtils.info(logger, repImpl, e.getMessage());
        } catch (Exception e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                               "Replica unexpected exception " + e +
                               " " + LoggerUtils.getStackTrace(e));
            throw EnvironmentFailureException.unexpectedException(e);
        } finally {
            loopExitCleanup();
        }
    }

    protected void doRunReplicaLoopInternalWork()
        throws Exception {

        final int timeoutMs = repNode.getConfigManager().
            getDuration(RepParams.REPLICA_TIMEOUT);
        replicaFeederChannel.setTimeoutMs(timeoutMs);

        replayQueue.clear();
        repImpl.getReplay().reset();

        replicaOutputThread = new ReplicaOutputThread(repImpl);
        replicaOutputThread.start();

        final ReplayThread replayThread = new ReplayThread();
        replayThread.start();
        long maxPending = 0;

        try {
            while (true) {
                Message message = protocol.read(replicaFeederChannel);

                if (repNode.isShutdownOrInvalid() || (message == null)) {
                    return;
                }

                while (!replayQueue.
                       offer(message,
                             ReplayThread.QUEUE_POLL_INTERVAL_NS,
                             TimeUnit.NANOSECONDS)) {
                    /* Offer timed out. */
                    if (!replayThread.isAlive()) {
                        return;
                    }
                    /* Retry the offer */
                    nMessageQueueOverflows.increment();
                }

                final int pending = replayQueue.size();
                if (pending > maxPending) {
                    maxPending = pending;
                    if ((maxPending % 100) == 0) {
                        /* Prune logging information. */
                        LoggerUtils.info(logger, repImpl,
                                         "Max pending replay log items:" +
                                         maxPending);
                    }
                }
            }
        } catch (IOException ioe) {
            /*
             * Make sure messages in the queue are processed. Ensure, in
             * particular, that shutdown requests are processed and not
             * ignored due to the IOException resulting from a closed
             * connection.
             */
            replayThread.exitRequest = ReplayExitType.SOFT;
        } finally {

            if (replayThread.exitRequest == ReplayExitType.SOFT) {
                /*
                 * Drain all queued messages, exceptions may be generated
                 * in the process. They logically precede IO exceptions.
                 */
                replayThread.join();
            }

            try {

                if (replayThread.exception != null) {
                    /* Replay thread is dead or exiting. */
                    throw replayThread.exception;
                }

                if (replicaOutputThread.getException() != null) {
                    throw replicaOutputThread.getException();
                }
            } finally {

                /* Ensure thread has exited in all circumstances */
                replayThread.exitRequest = ReplayExitType.IMMEDIATE;
                replayThread.join();

                replicaOutputThread.shutdownThread(logger);
                replicaOutputThread = null;
            }
        }
    }
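
    /*
     * Shutdown ordering in doRunReplicaLoopInternalWork() above: an
     * IOException in the read loop requests a SOFT exit so that messages
     * already queued, in particular a shutdown request, are still replayed.
     * The finally clause then surfaces any pending replay/output thread
     * exception and finishes with an IMMEDIATE exit plus join/shutdown, so
     * both helper threads are guaranteed to have terminated on the way out.
     */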

    /**
     * Process the shutdown message from the master and return the
     * GroupShutdownException that must be thrown to exit the Replica loop.
     *
     * @return the GroupShutdownException
     */
    private GroupShutdownException processShutdown(ShutdownRequest shutdown)
        throws IOException {

        /*
         * Acknowledge the shutdown message right away, since the checkpoint
         * operation can take a long time to complete. Long enough to exceed
         * the feeder timeout on the master. The master only needs to know
         * that the replica has received the message.
         */
        replay.queueAck(ReplicaOutputThread.SHUTDOWN_ACK);

        /*
         * Turn off network timeouts on the replica, since we don't want the
         * replica to timeout the connection. The connection itself is no
         * longer used past this point and will be reclaimed as part of normal
         * replica exit cleanup.
         */
        replicaFeederChannel.setTimeoutMs(Integer.MAX_VALUE);

        /*
         * TODO: Share the following code with the standalone Environment
         * shutdown, or better yet, call EnvironmentImpl.doClose here.
         */

        /*
         * Begin shutdown of the daemons before checkpointing. Cleaning during
         * the checkpoint is wasted and slows down the checkpoint, plus it may
         * cause additional checkpoints.
         */
        repNode.getRepImpl().requestShutdownDaemons();

        /*
         * Now start a potentially long running checkpoint.
         */
        LoggerUtils.info(logger, repImpl, "Checkpoint initiated.");
        CheckpointConfig config = new CheckpointConfig();
        config.setForce(true);
        config.setMinimizeRecoveryTime(true);
        repNode.getRepImpl().invokeCheckpoint(config, "Group Shutdown");
        /* Force final shutdown of the daemons. */
        repNode.getRepImpl().shutdownDaemons();
        LoggerUtils.info(logger, repImpl, "Checkpoint completed.");

        return new GroupShutdownException(logger,
                                          repNode,
                                          shutdown.getShutdownTimeMs());
    }

    /**
     * Initialize for replica loop entry, which involves completing the
     * following steps successfully:
     *
     * 1) The replica feeder handshake.
     * 2) The replica feeder syncup.
     * 3) Processing the first heartbeat request from the feeder.
     */
    private void initReplicaLoop()
        throws IOException,
               ConnectRetryException,
               DatabaseException,
               ProtocolException,
               InterruptedException,
               HardRecoveryElectionException {

        createReplicaFeederChannel();
        ReplicaFeederHandshake handshake =
            new ReplicaFeederHandshake(repNode, replicaFeederChannel);
        protocol = handshake.execute();
        repNode.notifyReplicaConnected();

        final boolean hardRecoveryNeedsElection;

        if (hardRecoveryElectionException != null) {
            LoggerUtils.info(logger, repImpl,
                             "Replica syncup after election to verify master:" +
                             hardRecoveryElectionException.getMaster() +
                             " elected master:" +
                             repNode.getMasterStatus().getNodeMasterNameId());
            hardRecoveryNeedsElection = false;
        } else {
            hardRecoveryNeedsElection = true;
        }
        hardRecoveryElectionException = null;

        ReplicaFeederSyncup syncup =
            new ReplicaFeederSyncup(repNode, replay, replicaFeederChannel,
                                    protocol, hardRecoveryNeedsElection);
        syncup.execute(repNode.getCBVLSNTracker());

        txnEndVLSN = syncup.getMatchedVLSN();
        long matchedTxnEndTime = syncup.getMatchedVLSNTime();
        consistencyTracker.reinit(txnEndVLSN.getSequence(),
                                  matchedTxnEndTime);
        Protocol.Heartbeat heartbeat =
            protocol.read(replicaFeederChannel.getChannel(),
                          Protocol.Heartbeat.class);
        processHeartbeat(heartbeat);
        long replicaDelta = consistencyTracker.getMasterTxnEndVLSN() -
            consistencyTracker.lastReplayedVLSN.getSequence();
        LoggerUtils.info(logger, repImpl, String.format
                         ("Replica initialization completed. Replica VLSN: %s "
                          + " Heartbeat master commit VLSN: %,d " +
                          "VLSN delta: %,d",
                          consistencyTracker.lastReplayedVLSN,
                          consistencyTracker.getMasterTxnEndVLSN(),
                          replicaDelta));

        /*
         * The replica is ready for business, indicate that the node is
         * ready by counting down the latch and releasing any waiters.
         */
        repNode.getReadyLatch().countDown();
    }

    /**
     * Process a heartbeat message. It queues a response and updates
     * the consistency tracker with the information in the heartbeat.
     *
     * @param heartbeat
     * @throws IOException
     */
    private void processHeartbeat(Heartbeat heartbeat)
        throws IOException {

        replay.queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
        consistencyTracker.trackHeartbeat(heartbeat);
    }

    /**
     * Performs the cleanup actions upon exit from the internal replica loop.
     */
    private void loopExitCleanup() {

        if (shutdownException != null) {
            if (shutdownException instanceof RetryException) {
                LoggerUtils.info(logger, repImpl,
                                 "Retrying connection to feeder. Message: " +
                                 shutdownException.getMessage());
            } else if (shutdownException instanceof GroupShutdownException) {
                LoggerUtils.info(logger, repImpl,
                                 "Exiting inner Replica loop." +
                                 " Master requested shutdown.");
            } else {
                LoggerUtils.warning
                    (logger, repImpl,
                     "Exiting inner Replica loop with exception " +
                     shutdownException + "\n" +
                     LoggerUtils.getStackTrace(shutdownException));
            }
        } else {
            LoggerUtils.info(logger, repImpl, "Exiting inner Replica loop.");
        }

        clearDbTreeCache();
        RepUtils.shutdownChannel(replicaFeederChannel);

        if (consistencyTracker != null) {
            consistencyTracker.logStats();
        }

        /* Sum up statistics for the loop. */
        if (protocol != null) {
            aggProtoStats.addAll(protocol.getStats(StatsConfig.DEFAULT));
        }
        protocol = null;

        /*
         * If this is a secondary node, then null out its ID to allow the
         * next feeder connection to assign it a new one
         */
        if (repNode.getNodeType().isSecondary()) {
            repNode.getNameIdPair().revertToNull();
        }
    }

    /*
     * Clear the DatabaseId -> DatabaseImpl cache used to speed up DbTree
     * lookup operations.
     */
    void clearDbTreeCache() {
        dbCache.clear();
    }

    /**
     * Invoked when this node transitions to the master state. Aborts all
     * inflight replay transactions outstanding from a previous state as a
     * Replica, because they were initiated by a different master and will
     * never complete. Also, release any Replica transactions that were
     * waiting on consistency policy requirements.
     */
    void masterTransitionCleanup()
        throws DatabaseException {

        hardRecoveryElectionException = null;
        replay.abortOldTxns();
        consistencyTracker.forceTripLatches
            (new MasterStateException(repNode.getRepImpl().
                                      getStateChangeEvent()));
    }

    /**
     * Invoked when this node seamlessly changes roles from master to replica
     * without a recovery. The ability to do this transition without a
     * recovery is desirable because it's a faster transition, and avoids the
     * GC overhead of releasing the JE cache, and the I/O overhead of
     * recreating the in-memory btree.
     * <p>
     * The two key cases where this happens are:
     * A) a network partition occurs, and the group elects a new master. The
     * orphaned master did not crash and its environment is still valid, and
     * when it regains contact with the group, it discovers that it has been
     * deposed. It transitions into a replica status.
     * <p>
     * B) a master transfer request moves mastership from this node to another
     * member of the group. This node's environment is still valid, and it
     * transitions to replica state.
     * <p>
     * The transition from master to replica requires resetting state so all
     * is as expected for a Replica. There are two categories of work:
     * - network connections: shutting down feeder connections and
     * reinitializing feeder infrastructure so that a future replica->master
     * transition will work.
     * - resetting transaction state. All MasterTxns must be transformed
     * into ReplayTxns, bearing the same transaction id and holding the same
     * locks.
     * <p>
     * Note: since non-masters can't commit txns, the inflight MasterTxns are
     * destined to be aborted in the future. An alternative to resetting
     * transaction state would be to mark them in some way so that the later
     * HA syncup/replay ignores operations pertaining to these ill-fated txns.
     * We didn't choose that approach because the simplicity of the replay is
     * a plus; it is almost entirely ignorant of the semantics of the
     * replication stream. Also, replays have potential for complexity, either
     * because syncups could restart if masters change or become unavailable,
     * or because there may be future performance optimizations in that area.
     * <p>
     * Resetting transaction state is tricky because the MasterTxn is
     * accessible to the application code. While the Replay thread is
     * attempting to transform the MasterTxn, application threads may be
     * attempting to commit or abort the transactions. Note that application
     * threads will not be trying to add locks, because the node will be in
     * UNKNOWN state, and writes will be prohibited by the MasterTxn.
     * <p>
     * MasterTransfers do impose a blocking period on transaction commits and
     * aborts, but even there, windows exist in the post-block period where
     * the application may try to abort the transaction. Network partitions
     * impose no blocking at all, and have a wider window during which the
     * application and RepNode thread must be coordinated. Here's a diagram
     * of the time periods of concern:
     * <p>
     * t1 - master transfer request issued (only when master transfer)
     * t2 - user txns which attempt to abort or commit are blocked on
     *      RepImpl.blockTxnLatch (only when mt)
     * t3 - node detects that it has transitioned to UNKNOWN and lost
     *      master status. MasterTxns are now stopped from acquiring
     *      locks or committing and will throw UnknownMasterException.
     * t4 - feeder connections shutdown
     * t5 - node begins conversion to replica state
     * t6 - blockTxnLatch released (only when master transfer)
     * t7 - existing MasterTxns converted into ReplayTxns, locks moved into
     *      new ReplayTxns. Blocked txns must be released before this
     *      conversion, because the application thread is holding the
     *      txn mutex, and conversion needs to take that mutex.
     * <p>
     * At any time during this process, the application threads may attempt to
     * abort or commit outstanding txns, or acquire read or write locks. After
     * t3, any attempts to lock, abort or commit will throw an
     * UnknownMasterException or ReplicaWriteException, and in the normal
     * course of events, the txn would internally abort. But once t5 is
     * reached, we want to prevent any changes to the number of write locks in
     * the txn so as to prevent interference with the conversion of the master
     * txns and any danger of converting only part of a txn. We set the
     * volatile, transient MasterTxn.freeze field at t5 to indicate that there
     * should be no change to the contents of the transaction. When freeze is
     * true, any attempts to abort or commit the transaction will throw
     * Unknown/ReplicaWriteException, and the txn will be put into MUST_ABORT
     * state, but the existing locks will be unchanged.
     * <p>
     * In a network partition, it's possible that the txn will be aborted or
     * committed locally before t5. In that case, there may be a hard rollback
     * when the node syncs up with the new master, and finds the anomalous
     * abort record. In masterTransfer, the window is smaller, and the
     * blocking latch ensures that no commits can happen between t1-t5. After
     * t5, the node will not be a master, so there can be no commits. Aborts
     * may happen and can cause hard rollbacks, but no data will be lost.
     * <p>
     * The freeze field is similar to the blockTxnLatch, and we considered
     * using the blockTxnLatch to stabilize the txns, but ruled it out
     * because:
     * - the locking hierarchy where the application thread holds the txn
     * mutex while awaiting the block txn latch prevents txn conversion.
     * - the blockTxnLatch is scoped to the MasterTransfer instance, which may
     * not be in play for network partitioning.
     */
    void replicaTransitionCleanup() {

        /*
         * Logically an assert, use an exception rather than a Java assert
         * because we want this check to be enabled at all times. If
         * unexpectedly in master state, invalidate the environment, so we do
         * a recovery and are sure to cleanup.
         */
        if (repImpl.getState() == State.MASTER) {
            throw EnvironmentFailureException.unexpectedState(repImpl,
                "Should not be in MASTER state when converting from master " +
                "to replica state");
        }

        /*
         * Find all MasterTxns, and convert them to ReplayTxns. The set of
         * existing MasterTxns cannot increase at this point, because the node
         * is not in MASTER state. Freeze all txns and prevent change.
         */
        Set<MasterTxn> existingMasterTxns = repImpl.getExistingMasterTxns();
        LoggerUtils.info(logger, repImpl,
                         "Transitioning node to replica state, " +
                         existingMasterTxns.size() + " txns to clean up");

        /* Prevent aborts on all MasterTxns; hold their contents steady. */
        for (MasterTxn masterTxn : existingMasterTxns) {
            masterTxn.freeze();
        }

        /*
         * Unblock any transactions that are stuck in their commit processing,
         * awaiting the release of the master transfer block. Such
         * transactions hold a mutex on the transaction, and the mutex would
         * block any of the lock stealing that will occur below. Note that if
         * we are doing this transition because of a network partition, there
         * will be no blocked transactions.
         */
        repImpl.unblockTxnCompletion();

        for (MasterTxn masterTxn : existingMasterTxns) {

            /*
             * Convert this masterTxn to a ReplayTxn and move any existing
             * write locks to it. Unfreeze and then abort the masterTxn.
             */
            ReplayTxn replayTxn =
                masterTxn.convertToReplayTxnAndClose(logger,
                                                     repImpl.getReplay());

            if (replayTxn == null) {
                LoggerUtils.info(logger, repImpl, "Master Txn " +
                                 masterTxn.getId() +
                                 " has no locks, nothing to transfer");
            } else {
                repImpl.getTxnManager().registerTxn(replayTxn);
                LoggerUtils.info(logger, repImpl,
                                 "state for replay transaction " +
                                 replayTxn.getId() + " = " +
                                 replayTxn.getState());
            }
        }

        /*
         * We're done with the transition, clear any active master transfers,
         * if they exist.
         */
        repNode.clearActiveTransfer();
    }

    /**
     * Returns a channel used by the Replica to connect to the Feeder. The
     * socket is configured with a read timeout that's a multiple of the
     * heartbeat interval to help detect, or initiate a change in master.
     *
     * @throws IOException
     * @throws ConnectRetryException
     */
    private void createReplicaFeederChannel()
        throws IOException, ConnectRetryException {

        DataChannel dataChannel = null;

        final DbConfigManager configManager = repNode.getConfigManager();
        final int timeoutMs = configManager.
            getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);

        final int receiveBufferSize =
            configManager.getInt(RepParams.REPLICA_RECEIVE_BUFFER_SIZE);

        try {
            final int openTimeout = configManager.
                getDuration(RepParams.REPSTREAM_OPEN_TIMEOUT);

            /*
             * Note that soTimeout is not set since it's a blocking channel
             * and setSoTimeout has no effect on a blocking nio channel.
             *
             * Push responses out rapidly, they are small (heartbeat or commit
             * response) and need timely delivery to the master.
             * (tcpNoDelay = true)
             */
            final ConnectOptions connectOpts = new ConnectOptions().
                setTcpNoDelay(true).
                setReceiveBufferSize(receiveBufferSize).
                setOpenTimeout(openTimeout).
                setBlocking(true);

            dataChannel =
                repImpl.getChannelFactory().
                connect(repNode.getMasterStatus().getNodeMaster(),
                        connectOpts);

            replicaFeederChannel =
                new NamedChannelWithTimeout(repNode, dataChannel, timeoutMs);

            ServiceDispatcher.doServiceHandshake
                (dataChannel, FeederManager.FEEDER_SERVICE);
        } catch (ConnectException e) {

            /*
             * A network problem, or the node went down between the time we
             * learned it was the master and we tried to connect.
             */
            throw new ConnectRetryException(e.getMessage(),
                                            NETWORK_RETRIES,
                                            CONNECT_RETRY_SLEEP_MS);
        } catch (ServiceConnectFailedException e) {

            /*
             * The feeder may not have established the Feeder Service
             * as yet. For example, the transition to the master may not have
             * been completed. Wait longer.
             */
            if (e.getResponse() == Response.UNKNOWN_SERVICE) {
                throw new ConnectRetryException(e.getMessage(),
                                                SERVICE_UNAVAILABLE_RETRIES,
                                                CONNECT_RETRY_SLEEP_MS);
            }
            throw EnvironmentFailureException.unexpectedException(e);
        }
    }

    /**
     * Returns the replay statistics associated with the Replica.
     *
     * @return the statistics.
     */
    public StatGroup getReplayStats(StatsConfig config) {
        return replay.getStats(config);
    }

    /* Get the protocol statistics for this replica. */
    public StatGroup getProtocolStats(StatsConfig config) {
        StatGroup protoStats = aggProtoStats.cloneGroup(config.getClear());

        /* Guard against concurrent modification. */
        Protocol prot = this.protocol;
        if (prot != null) {
            /* These statistics are not yet a part of the agg statistics. */
            protoStats.addAll(prot.getStats(config));
        }

        return protoStats;
    }

    /* Get the consistency tracker stats for this replica. */
    public StatGroup getTrackerStats(StatsConfig config) {
        return consistencyTracker.getStats(config);
    }

    /* Reset the stats associated with this Replica. */
    public void resetStats() {
        replay.resetStats();
        aggProtoStats.clear();
        if (protocol != null) {
            protocol.resetStats();
        }
        consistencyTracker.resetStats();
    }
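
    /*
     * The replay, protocol and tracker stat groups above are merged into the
     * overall replicated environment statistics; applications normally
     * retrieve them via ReplicatedEnvironment.getRepStats(StatsConfig)
     * rather than through this class directly.
     */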

    /**
     * Defines the possible types of exits that can be requested from the
     * ReplayThread.
     */
    private enum ReplayExitType {
        IMMEDIATE, /* An immediate exit; ignore queued requests. */
        SOFT       /* Process pending requests in queue, then exit. */
    }

    /**
     * The thread responsible for the replay of messages delivered over the
     * replication stream. Reading and replay are done in separate threads for
     * two reasons:
     *
     * 1) It allows the two activities to make independent progress. The
     *    network can be read and messages assembled even if the replay
     *    activity has stalled.
     *
     * 2) The two threads permit use of two cores to perform the replay, thus
     *    making it less likely that the cpu is the replay bottleneck.
     *
     * The inputs and outputs of this thread are schematically described as:
     *
     *  replayQueue -> ReplayThread -> outputQueue
     *
     * It's the second component of the three thread structure outlined in the
     * Replica's class level comment.
     */
    class ReplayThread extends StoppableThread {

        /**
         * Thread exit exception. It's non-null if the thread exited due to an
         * exception. It's the responsibility of the main replica thread to
         * propagate the exception across the thread boundary in this case.
         */
        volatile private Exception exception;

        /**
         * Set asynchronously when a shutdown is being requested.
         */
        volatile ReplayExitType exitRequest = null;

        /* The queue poll interval, 1 second */
        private final static long QUEUE_POLL_INTERVAL_NS = 1000000000l;

        protected ReplayThread() {
            super(repImpl, "ReplayThread");
        }

        @Override
        protected int initiateSoftShutdown() {
            /* Use immediate, since the stream will continue to be read. */
            exitRequest = ReplayExitType.IMMEDIATE;
            return 0;
        }

        @Override
        public void run() {

            LoggerUtils.info(logger, repImpl,
                             "Replay thread started. Message queue size:" +
                             replayQueue.remainingCapacity());

            final int dbTreeCacheClearingOpCount =
                repNode.getDbTreeCacheClearingOpCount();

            long opCount = 0;

            try {
                while (true) {

                    final long pollIntervalNs =
                        replay.getPollIntervalNs(QUEUE_POLL_INTERVAL_NS);

                    final Message message =
                        replayQueue.poll(pollIntervalNs,
                                         TimeUnit.NANOSECONDS);

                    if ((exitRequest == ReplayExitType.IMMEDIATE) ||
                        ((exitRequest == ReplayExitType.SOFT) &&
                         (message == null)) ||
                        repNode.isShutdownOrInvalid()) {

                        if (exitRequest == ReplayExitType.SOFT) {
                            replay.flushPendingAcks(Long.MAX_VALUE);
                        }
                        return;
                    }

                    final long startNs = System.nanoTime();
                    replay.flushPendingAcks(startNs);

                    repNode.getMasterStatus().assertSync();

                    if (message == null) {
                        /* Timeout on poll. */
                        continue;
                    }
                    assert TestHookExecute.doHookIfSet(replayHook, message);

                    final MessageOp messageOp = message.getOp();

                    if (messageOp == Protocol.SHUTDOWN_REQUEST) {
                        throw processShutdown((ShutdownRequest) message);
                    }

                    if (messageOp == Protocol.HEARTBEAT) {
                        processHeartbeat((Protocol.Heartbeat) message);
                        dbCache.tick();
                    } else {
                        /* For testing only! */
                        if (dontProcessStream) {
                            LoggerUtils.info(logger, repImpl,
                                             "Not processing " + message);
                            continue;
                        }

                        replay.replayEntry(startNs, (Protocol.Entry) message);

                        /*
                         * Note: the consistency tracking is more obscure than
                         * it needs to be, because the commit/abort VLSN is
                         * set in Replay.replayEntry() and is then used below.
                         * An alternative would be to promote the following
                         * conditional to a level above, so commit/abort
                         * operations get their own replay method which does
                         * the consistency tracking.
                         */
                        if (((Protocol.Entry) message).isTxnEnd()) {
                            txnEndVLSN = replay.getLastReplayedVLSN();
                            consistencyTracker.trackTxnEnd();
                        }
                        consistencyTracker.trackVLSN();
                    }

                    if (testDelayMs > 0) {
                        Thread.sleep(testDelayMs);
                    }

                    if (opCount++ % dbTreeCacheClearingOpCount == 0) {
                        clearDbTreeCache();
                    }
                }
            } catch (Exception e) {
                exception = e;

                /*
                 * Bring it to the attention of the main thread by freeing
                 * up the "offer" wait right away.
                 */
                replayQueue.clear();

                /*
                 * Get the attention of the main replica thread in case it's
                 * waiting in a read on the socket channel.
                 */
                LoggerUtils.info(logger, repImpl,
                                 "closing replicaFeederChannel = " +
                                 replicaFeederChannel);
                RepUtils.shutdownChannel(replicaFeederChannel);

                LoggerUtils.info(logger, repImpl,
                                 "Replay thread exiting with exception:" +
                                 e.getMessage());
            }
        }

        @Override
        protected Logger getLogger() {
            return logger;
        }
    }
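
    /*
     * Rough flow for a reader blocking on consistency (a sketch, not literal
     * code): a transaction begun on this replica with, say, a
     * CommitPointConsistencyPolicy or TimeConsistencyPolicy ends up waiting
     * in the tracker below, approximately:
     *
     *   // policy.ensureConsistency(env) at txn begin / joinGroup
     *   //   -> consistencyTracker.awaitVLSN(vlsn, policy)  // commit point
     *   //   -> consistencyTracker.lagAwait(policy)         // time lag
     *
     * The waits are released by trackTxnEnd()/trackVLSN()/trackHeartbeat()
     * as replay makes progress, or by forceTripLatches() on shutdown or a
     * master transition.
     */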

    /**
     * Tracks the consistency of this replica wrt the Master. It provides the
     * mechanisms that will cause a beginTransaction() or a joinGroup() to
     * wait until the specified consistency policy is satisfied.
     */
    public class ConsistencyTracker {
        private final long NULL_VLSN_SEQUENCE = VLSN.NULL_VLSN.getSequence();

        /*
         * Initialized by the Feeder handshake and updated by commit replays.
         * All access to lastReplayedXXXX must be synchronized on the
         * ConsistencyTracker itself.
         */
        private long lastReplayedTxnVLSN = NULL_VLSN_SEQUENCE;
        private VLSN lastReplayedVLSN = VLSN.NULL_VLSN;
        private long masterTxnEndTime = 0l;

        /* Updated by heartbeats */
        private volatile long masterTxnEndVLSN;
        private volatile long masterNow = 0l;

        private final StatGroup stats =
            new StatGroup(ReplicaStatDefinition.GROUP_NAME,
                          ReplicaStatDefinition.GROUP_DESC);

        private final LongStat nLagConsistencyWaits =
            new LongStat(stats, N_LAG_CONSISTENCY_WAITS);

        private final LongStat nLagConsistencyWaitMs =
            new LongStat(stats, N_LAG_CONSISTENCY_WAIT_MS);

        private final LongStat nVLSNConsistencyWaits =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAITS);

        private final LongStat nVLSNConsistencyWaitMs =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAIT_MS);

        private final OrderedLatches vlsnLatches =
            new OrderedLatches(repNode.getRepImpl()) {

                /*
                 * Note that this assumes that NULL_VLSN is -1, and that
                 * the vlsns ascend.
                 */
                @Override
                boolean tripPredicate(long keyVLSN, long tripVLSN) {
                    return keyVLSN <= tripVLSN;
                }
            };

        private final OrderedLatches lagLatches =
            new OrderedLatches(repNode.getRepImpl()) {
                @Override
                boolean tripPredicate(long keyLag, long currentLag) {
                    return currentLag <= keyLag;
                }
            };

        /**
         * Invoked each time after a replica syncup so that the Replica
         * can re-establish its consistency vis-a-vis the master and what
         * part of the replication stream it considers as having been
         * replayed.
         *
         * @param matchedTxnVLSN the replica state corresponds to this txn
         * @param matchedTxnEndTime the time at which this txn was committed
         * or aborted on the master
         */
        void reinit(long matchedTxnVLSN, long matchedTxnEndTime) {
            this.lastReplayedVLSN = new VLSN(matchedTxnVLSN);
            this.lastReplayedTxnVLSN = matchedTxnVLSN;
            this.masterTxnEndTime = matchedTxnEndTime;
        }

        public long getMasterTxnEndVLSN() {
            return masterTxnEndVLSN;
        }

        void close() {
            logStats();
        }

        void logStats() {
            if (logger.isLoggable(Level.INFO)) {
                LoggerUtils.info
                    (logger, repImpl,
                     "Replica stats - Lag waits: " +
                     nLagConsistencyWaits.get() +
                     " Lag wait time: " + nLagConsistencyWaitMs.get() +
                     "ms. " +
                     " VLSN waits: " + nVLSNConsistencyWaits.get() +
                     " VLSN wait time: " + nVLSNConsistencyWaitMs.get() +
                     "ms.");
            }
        }

        /**
         * Calculates the time lag in ms at the Replica.
         */
        private long currentLag() {
            if (masterNow == 0l) {

                /*
                 * Have not seen a heartbeat, can't determine the time lag in
                 * its absence. It's the first message sent by the feeder
                 * after completion of the handshake.
                 */
                return Integer.MAX_VALUE;
            }

            long lag;
            if (lastReplayedTxnVLSN < masterTxnEndVLSN) {
                lag = System.currentTimeMillis() - masterTxnEndTime;
            } else if (lastReplayedTxnVLSN == masterTxnEndVLSN) {

                /*
                 * The lag is determined by the transactions (if any) that are
                 * further downstream, assume the worst.
                 */
                lag = System.currentTimeMillis() - masterNow;
            } else {
                /* Commit leapfrogged the heartbeat. */
                lag = System.currentTimeMillis() - masterNow;
            }
            return lag;
        }

        /**
         * Frees all the threads that are waiting on latches.
         *
         * @param exception the exception to be thrown to explain the reason
         * behind the latches being forced.
         */
        synchronized void forceTripLatches(DatabaseException exception) {
            assert (exception != null);
            vlsnLatches.trip(Long.MAX_VALUE, exception);
            lagLatches.trip(0, exception);
        }

        synchronized void trackTxnEnd() {
            Replay.TxnInfo lastReplayedTxn = replay.getLastReplayedTxn();
            lastReplayedTxnVLSN = lastReplayedTxn.getTxnVLSN().getSequence();
            masterTxnEndTime = lastReplayedTxn.getMasterTxnEndTime();

            if ((lastReplayedTxnVLSN > masterTxnEndVLSN) &&
                (masterTxnEndTime >= masterNow)) {
                masterTxnEndVLSN = lastReplayedTxnVLSN;
                masterNow = masterTxnEndTime;
            }

            /*
             * Advances both the replica VLSN and commit time; trip qualifying
             * latches in both sets.
             */
            vlsnLatches.trip(lastReplayedTxnVLSN, null);
            lagLatches.trip(currentLag(), null);
        }

        synchronized void trackVLSN() {
            lastReplayedVLSN = replay.getLastReplayedVLSN();
            vlsnLatches.trip(lastReplayedVLSN.getSequence(), null);
        }

        synchronized void trackHeartbeat(Protocol.Heartbeat heartbeat) {
            masterTxnEndVLSN = heartbeat.getCurrentTxnEndVLSN();
            masterNow = heartbeat.getMasterNow();
            /* Trip just the time lag latches. */
            lagLatches.trip(currentLag(), null);
        }

        public void lagAwait(TimeConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {

            long currentLag = currentLag();
            long lag =
                consistencyPolicy.getPermissibleLag(TimeUnit.MILLISECONDS);
            if (currentLag <= lag) {
                return;
            }
            long waitStart = System.currentTimeMillis();
            ExceptionAwareCountDownLatch waitLagLatch =
                lagLatches.getOrCreate(lag);
            await(waitLagLatch, consistencyPolicy);
            nLagConsistencyWaits.increment();
            nLagConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Wait until the log record identified by VLSN has gone by.
         */
        public void awaitVLSN(long vlsn,
                              ReplicaConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {

            long waitStart = System.currentTimeMillis();

            ExceptionAwareCountDownLatch waitVLSNLatch = null;

            synchronized (this) {
                final long compareVLSN =
                    (consistencyPolicy instanceof
                     CommitPointConsistencyPolicy) ?
                    lastReplayedTxnVLSN :
                    lastReplayedVLSN.getSequence();
                if (vlsn <= compareVLSN) {
                    return;
                }
                waitVLSNLatch = vlsnLatches.getOrCreate(vlsn);
            }
            await(waitVLSNLatch, consistencyPolicy);
            /* Stats after the await, so the counts and times are related. */
            nVLSNConsistencyWaits.increment();
            nVLSNConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Wait on the given countdown latch and generate the appropriate
         * exception upon timeout.
         *
         * @throws InterruptedException
         */
        private void await(ExceptionAwareCountDownLatch consistencyLatch,
                           ReplicaConsistencyPolicy consistencyPolicy)
            throws ReplicaConsistencyException,
                   DatabaseException,
                   InterruptedException {

            if (!consistencyLatch.awaitOrException
                (consistencyPolicy.getTimeout(TimeUnit.MILLISECONDS),
                 TimeUnit.MILLISECONDS)) {
                /* Timed out. */
                final boolean detached =
                    repNode.getRepImpl().getState().isDetached();
                throw new ReplicaConsistencyException(consistencyPolicy,
                                                      detached);
            }
        }

        private StatGroup getStats(StatsConfig config) {
            return stats.cloneGroup(config.getClear());
        }

        private void resetStats() {
            stats.clear();
        }

        /**
         * Shutdown the consistency tracker. This is typically done as part
         * of the shutdown of a replication node. It counts down all open
         * latches, so the threads waiting on them can make progress. It's
         * the responsibility of the waiting threads to check whether the
         * latch countdown was due to a shutdown, and take appropriate action.
         */
        public void shutdown() {
            final Exception savedShutdownException =
                repNode.getSavedShutdownException();

            /*
             * Don't wrap in another level of EnvironmentFailureException
             * if we have one in hand already. It can confuse any catch
             * handlers which are expecting a specific exception e.g.
             * RollBackException while waiting for read consistency.
             */
            final EnvironmentFailureException latchException =
                (savedShutdownException instanceof
                 EnvironmentFailureException) ?

                ((EnvironmentFailureException) savedShutdownException) :

                EnvironmentFailureException.unexpectedException
                    ("Node: " + repNode.getNameIdPair() + " was shut down.",
                     savedShutdownException);

            forceTripLatches(latchException);
        }
    }

    /**
     * Manages a set of ordered latches. They are ordered by the key value.
     */
    private abstract class OrderedLatches {

        final EnvironmentImpl envImpl;

        final SortedMap<Long, ExceptionAwareCountDownLatch> latchMap =
            new TreeMap<Long, ExceptionAwareCountDownLatch>();

        abstract boolean tripPredicate(long key, long tripValue);

        OrderedLatches(EnvironmentImpl envImpl) {
            this.envImpl = envImpl;
        }

        synchronized ExceptionAwareCountDownLatch getOrCreate(Long key) {
            ExceptionAwareCountDownLatch latch = latchMap.get(key);
            if (latch == null) {
                latch = new ExceptionAwareCountDownLatch(envImpl, 1);
                latchMap.put(key, latch);
            }
            return latch;
        }

        /**
         * Trip all latches until the first latch that will not trip.
         *
         * @param tripValue
         * @param exception the exception to be thrown by the waiter upon
         * exit from the await. It can be null if no exception need be
         * thrown.
         */
        synchronized void trip(long tripValue,
                               DatabaseException exception) {
            while (latchMap.size() > 0) {
                Long key = latchMap.firstKey();
                if (!tripPredicate(key, tripValue)) {
                    /* It will fail on the rest as well. */
                    return;
                }
                /* Set the waiters free. */
                ExceptionAwareCountDownLatch latch = latchMap.remove(key);
                latch.releaseAwait(exception);
            }
        }
    }
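
    /*
     * Illustration of the two latch orderings used above: vlsnLatches is
     * keyed by the VLSN a waiter needs replayed, so trip(replayedVLSN) frees
     * every waiter whose key is <= the replayed VLSN; lagLatches is keyed by
     * a waiter's permissible lag, so trip(currentLag) frees every waiter
     * whose key is >= the current lag. In both cases trip() walks the sorted
     * map from its first key and stops at the first key whose predicate
     * fails.
     */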

    /**
     * Thrown to indicate that the Replica must retry connecting to the same
     * master, after some period of time.
     */
    @SuppressWarnings("serial")
    static abstract class RetryException extends Exception {

        final int retries;
        final int retrySleepMs;

        RetryException(String message,
                       int retries,
                       int retrySleepMs) {
            super(message);
            this.retries = retries;
            this.retrySleepMs = retrySleepMs;
        }

        @Override
        public String getMessage() {
            return "Failed after retries: " + retries +
                   " with retry interval: " + retrySleepMs + "ms.";
        }
    }

    @SuppressWarnings("serial")
    static class ConnectRetryException extends RetryException {

        ConnectRetryException(String message,
                              int retries,
                              int retrySleepMs) {
            super(message, retries, retrySleepMs);
        }
    }

    /**
     * Indicates that an election is needed before the hard recovery can
     * proceed. Please see SR 20572 for a motivating scenario and
     * NetworkPartitionHealingTest for an example.
     */
    @SuppressWarnings("serial")
    public static class HardRecoveryElectionException extends Exception {

        final NameIdPair masterNameIdPair;
        final VLSN lastTxnEnd;
        final VLSN matchpointVLSN;

        public HardRecoveryElectionException(NameIdPair masterNameIdPair,
                                             VLSN lastTxnEnd,
                                             VLSN matchpointVLSN) {

            this.masterNameIdPair = masterNameIdPair;
            this.lastTxnEnd = lastTxnEnd;
            this.matchpointVLSN = matchpointVLSN;
        }

        /**
         * The master that needs to be verified with an election.
         */
        public NameIdPair getMaster() {
            return masterNameIdPair;
        }

        @Override
        public String getMessage() {
            return "Need election preceding hard recovery to verify master:" +
                   masterNameIdPair +
                   " last txn end:" + lastTxnEnd +
                   " matchpoint VLSN:" + matchpointVLSN;
        }
    }

    /**
     * Sets a test hook for installation into Replica class instances to be
     * created in the future. This is needed when the test hook must be
     * installed before the {@code ReplicatedEnvironment} handle constructor
     * returns, so that a test may influence the replay of the sync-up
     * transaction backlog.
     */
    static public void setInitialReplayHook
        (com.sleepycat.je.utilint.TestHook<Message> hook) {
        initialReplayHook = hook;
    }

    /**
     * Set a test hook which is executed when the ReplicaFeederSyncup
     * finishes. This differs from the static method
     * ReplicaFeederSyncup.setGlobalSyncupHook in that it sets the hook for a
     * specific node, whereas the other method is static and sets it globally.
     *
     * This method is required when a test is trying to set the hook for only
     * one node, and the node already exists. The other method is useful when
     * a test is trying to set the hook before a node exists.
     */
    public void setReplicaFeederSyncupHook(TestHook<Object> syncupHook) {
        replicaFeederSyncupHook = syncupHook;
    }

    public TestHook<Object> getReplicaFeederSyncupHook() {
        return replicaFeederSyncupHook;
    }
}