/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
 *
 */

package com.sleepycat.je.rep.impl.node;

import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAITS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_LAG_CONSISTENCY_WAIT_MS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAITS;
import static com.sleepycat.je.rep.impl.node.ReplicaStatDefinition.N_VLSN_CONSISTENCY_WAIT_MS;

import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.ReplicaConsistencyPolicy;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.dbi.DbConfigManager;
import com.sleepycat.je.dbi.EnvironmentImpl;
import com.sleepycat.je.rep.CommitPointConsistencyPolicy;
import com.sleepycat.je.rep.GroupShutdownException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.RestartRequiredException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import com.sleepycat.je.rep.impl.RepImpl;
import com.sleepycat.je.rep.impl.RepParams;
import com.sleepycat.je.rep.net.DataChannel;
import com.sleepycat.je.rep.net.DataChannelFactory.ConnectOptions;
import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException;
import com.sleepycat.je.rep.stream.Protocol;
import com.sleepycat.je.rep.stream.Protocol.Heartbeat;
import com.sleepycat.je.rep.stream.Protocol.ShutdownRequest;
import com.sleepycat.je.rep.stream.ReplicaFeederHandshake;
import com.sleepycat.je.rep.stream.ReplicaFeederSyncup;
import com.sleepycat.je.rep.stream.ReplicaFeederSyncup.TestHook;
import com.sleepycat.je.rep.txn.MasterTxn;
import com.sleepycat.je.rep.txn.ReplayTxn;
import com.sleepycat.je.rep.utilint.BinaryProtocol.Message;
import com.sleepycat.je.rep.utilint.BinaryProtocol.MessageOp;
import com.sleepycat.je.rep.utilint.BinaryProtocol.ProtocolException;
import com.sleepycat.je.rep.utilint.BinaryProtocolStatDefinition;
import com.sleepycat.je.rep.utilint.NamedChannelWithTimeout;
import com.sleepycat.je.rep.utilint.RepUtils;
import com.sleepycat.je.rep.utilint.RepUtils.ExceptionAwareCountDownLatch;
import com.sleepycat.je.rep.utilint.ServiceDispatcher;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.Response;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongStat;
import com.sleepycat.je.utilint.StatGroup;
import com.sleepycat.je.utilint.StoppableThread;
import com.sleepycat.je.utilint.TestHookExecute;
import com.sleepycat.je.utilint.VLSN;

/**
 * The Replica class is the locus of the replay operations and replica
 * transaction consistency tracking and management operations at a replica
 * node.
 *
 * A single instance of this class is created when the replication node is
 * created and exists for the lifetime of the replication node, although it is
 * only really used when the node is operating as a Replica.
 *
 * Note that the Replica (like the FeederManager) does not have its own
 * independent thread of control; it runs in the RepNode's thread. To make the
 * network I/O as async as possible, and to avoid stalls during network I/O,
 * the input and output are done in separate threads. The overall thread
 * and queue organization is as sketched below:
 *
 *  read from network -> RepNodeThread (does read)        -> replayQueue
 *  replayQueue       -> ReplayThread                      -> outputQueue
 *  outputQueue       -> ReplicaOutputThread (does write)  -> writes to network
 *
 * This three thread organization has the following benefits over a single
 * thread replay model:
 *
 * 1) It makes the heartbeat mechanism used to determine whether the HA
 * sockets are in use more reliable. This is because a heartbeat response
 * cannot be blocked by lock contention in the replay thread, since a
 * heartbeat can be sent spontaneously (without an explicit heartbeat request
 * from the feeder) by the ReplicaOutputThread, if a heartbeat had not been
 * sent during a heartbeat interval period.
 *
 * 2) The cpu load in the replay thread is reduced by offloading the
 * network-specific aspects of the processing to different threads. It's
 * important to keep the CPU load in this thread at a minimum so we can use
 * a simple single thread replay scheme.
 *
 * 3) It prevents replay thread stalls by buffering input and output in the
 * two threads on either side of it.
 *
 * With jdk 1.7 we could eliminate the use of these threads and switch over to
 * the new async I/O APIs, but that involves a lot more code restructuring.
 */
public class Replica {

    /* The Node to which the Replica belongs. */
    private final RepNode repNode;
    private final RepImpl repImpl;

    /* The replay component of the Replica */
    private final Replay replay;

    /* The exception that provoked the replica exit. */
    private Exception shutdownException = null;

    /*
     * It's non-null when the replica loop is active.
     */
    private NamedChannelWithTimeout replicaFeederChannel = null;

    /* The consistency component. */
    private final ConsistencyTracker consistencyTracker;

    /**
     * The latest txn-ending (commit or abort) VLSN that we have on this
     * replica.
     */
    private volatile VLSN txnEndVLSN;

    /*
     * A test delay introduced in the replica loop to simulate a loaded
     * replica. The replica waits this amount of time before processing each
     * message.
     */
    private int testDelayMs = 0;

    /* For testing only - mimic a network partition. */
    private boolean dontProcessStream = false;

    /* Number of times to retry on a network connection failure. */
    private static final int NETWORK_RETRIES = 2;

    /*
     * Service unavailable retries. These are typically the result of service
     * requests being made before the node is ready to provide them. For
     * example, the feeder service is only available after a node has
     * transitioned to becoming the master.
     */
    private static final int SERVICE_UNAVAILABLE_RETRIES = 10;

    /*
     * The number of ms to wait between the above retries, allowing time for
     * the master to assume its role, and start listening on its port.
     */
    private static final int CONNECT_RETRY_SLEEP_MS = 1000;

    /*
     * The protocol instance if one is currently in use by the Replica.
     */
    private Protocol protocol = null;

    /*
     * Protocol statistics aggregated across all past protocol instantiations.
     * It does not include the statistics for the current Protocol object in
     * use. A node can potentially go through the Replica state multiple times
     * during its lifetime. This instance aggregates replica statistics
     * across all transitions into and out of the Replica state.
     */
    private final StatGroup aggProtoStats;

    /*
     * Holds the exception that is thrown to indicate that an election is
     * needed before a hard recovery can proceed. It's set to a non-null value
     * when the need for a hard recovery is first discovered and is
     * subsequently cleared after an election is held and before the next
     * attempt at a syncup with the newly elected master. The election ensures
     * that the master being used for an actual rollback is current and is not
     * an isolated master that is out of date, due to a network partition that
     * has since been resolved.
     */
    private HardRecoveryElectionException hardRecoveryElectionException;

    /* For testing only. */
    private TestHook<Object> replicaFeederSyncupHook;
    private final com.sleepycat.je.utilint.TestHook<Message> replayHook;
    static private com.sleepycat.je.utilint.TestHook<Message> initialReplayHook;

    /*
     * A cache of DatabaseImpls for the Replay to speed up DbTree.getId().
     * Cleared/invalidated by a heartbeat or if je.rep.dbIdCacheOpCount
     * operations have gone by, or if any replay operations on Name LNs are
     * executed.
     */
    private final DbCache dbCache;

    /**
     * The message queue used for communications between the network read
     * thread and the replay thread.
     */
    private final BlockingQueue<Message> replayQueue;

    /*
     * The replica output thread. It's only maintained here as an instance
     * variable, rather than as a local variable inside
     * doRunReplicaLoopInternalWork(), to facilitate unit tests; it is
     * non-null only for the duration of the method.
     */
    private volatile ReplicaOutputThread replicaOutputThread;

    private final Logger logger;

    /**
     * The number of times a message entry could not be inserted into
     * the queue within the poll period and had to be retried.
     */
    private final LongStat nMessageQueueOverflows;

    Replica(RepNode repNode, Replay replay) {
        this.repNode = repNode;
        this.repImpl = repNode.getRepImpl();
        DbConfigManager configManager = repNode.getConfigManager();
        dbCache = new DbCache(repImpl.getDbTree(),
                              configManager.getInt
                                  (RepParams.REPLAY_MAX_OPEN_DB_HANDLES),
                              configManager.getDuration
                                  (RepParams.REPLAY_DB_HANDLE_TIMEOUT));

        consistencyTracker = new ConsistencyTracker();
        this.replay = replay;
        logger = LoggerUtils.getLogger(getClass());
        aggProtoStats =
            new StatGroup(BinaryProtocolStatDefinition.GROUP_NAME,
                          BinaryProtocolStatDefinition.GROUP_DESC);
        nMessageQueueOverflows = replay.getMessageQueueOverflows();
        testDelayMs =
            repNode.getConfigManager().getInt(RepParams.TEST_REPLICA_DELAY);
        replayHook = initialReplayHook;

        /* Set up the replay queue. */
        final int replayQueueSize = repNode.getConfigManager().
            getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);

        replayQueue = new ArrayBlockingQueue<Message>(replayQueueSize);
    }

    /**
     * Shuts down the Replica, freeing any threads that may have been waiting
     * for the replica to reach some degree of consistency. This method is
     * only invoked as part of the repnode shutdown.
     *
     * If the shutdown is being executed from a different thread, it attempts
     * to interrupt the thread by first shutting down the channel it may be
     * waiting on for input from the feeder. The replica thread should notice
     * the channel shutdown and/or the shutdown state of the rep node itself.
     * The caller will use harsher methods, like an interrupt, if the rep node
     * thread (Replica or Feeder) is still active.
     */
    public void shutdown() {
        if (!repNode.isShutdown()) {
            throw EnvironmentFailureException.unexpectedState
                ("Rep node must have initiated the shutdown.");
        }
        consistencyTracker.shutdown();
        if (Thread.currentThread() == repNode) {
            return;
        }

        /*
         * Perform the actions to provoke a "soft" shutdown.
         *
         * Since the replica shares the RepNode thread, it will take care of
         * the actual thread shutdown itself.
         */

        /*
         * Shutdown the channel as an attempt to interrupt just the socket
         * read/write operation.
         */
        RepUtils.shutdownChannel(replicaFeederChannel);

        /*
         * Clear the latch in case the replica loop is waiting for the outcome
         * of an election.
         */
        repNode.getVLSNFreezeLatch().clearLatch();
    }

    /**
     * For unit testing only!
     */
    public void setTestDelayMs(int testDelayMs) {
        this.testDelayMs = testDelayMs;
    }

    public int getTestDelayMs() {
        return testDelayMs;
    }

    /**
     * For unit testing only!
     */
    public void setDontProcessStream() {
        dontProcessStream = true;
    }

    public VLSN getTxnEndVLSN() {
        return txnEndVLSN;
    }

    public Replay replay() {
        return replay;
    }

    public DbCache getDbCache() {
        return dbCache;
    }

    public ConsistencyTracker getConsistencyTracker() {
        return consistencyTracker;
    }

    DataChannel getReplicaFeederChannel() {
        return replicaFeederChannel.getChannel();
    }

    Protocol getProtocol() {
        return protocol;
    }

    /**
     * Returns the last commit VLSN at the master, as known at the replica.
     *
     * @return the commit VLSN
     */
    public long getMasterTxnEndVLSN() {
        return consistencyTracker.getMasterTxnEndVLSN();
    }

    /**
     * For test use only.
     */
    public ReplicaOutputThread getReplicaOutputThread() {
        return replicaOutputThread;
    }

    /**
     * The core control loop when the node is serving as a Replica. Note that
     * if a Replica is also serving the role of a feeder, it will run
     * additional feeder loops in separate threads. The loop exits when it
     * encounters one of the following possible conditions:
     *
     * 1) The connection to the master can no longer be maintained, due to
     * connectivity issues, or because the master has explicitly shut down its
     * connections due to an election.
     *
     * 2) The node becomes aware of a change in master, that is, assertSync()
     * fails.
     *
     * 3) The loop is interrupted, which is interpreted as a request to shut
     * down the replication node as a whole.
     *
     * 4) It fails to establish its node information in the master as it
     * attempts to join the replication group for the first time.
     *
     * Normal exit from this run loop results in the rep node retrying an
     * election and continuing in its new role as determined by the outcome of
     * the election. A thrown exception, on the other hand, results in the rep
     * node as a whole terminating its operation and no longer participating in
     * the replication group, that is, it enters the DETACHED state.
     *
     * Note that the in/out streams are handled synchronously on the replica,
     * while they are handled asynchronously by the Feeder.
     *
     * @throws InterruptedException
     * @throws DatabaseException if the environment cannot be closed for a
     * re-init
     * @throws GroupShutdownException
     */
    void runReplicaLoop()
        throws InterruptedException,
               DatabaseException,
               GroupShutdownException {

        Class<? extends RetryException> retryExceptionClass = null;
        int retryCount = 0;
        try {

            while (true) {
                try {
                    runReplicaLoopInternal();
                    /* Normal exit */
                    break;
                } catch (RetryException e) {
                    if (!repNode.getMasterStatus().inSync()) {
                        LoggerUtils.fine(logger, repImpl,
                                         "Retry terminated, out of sync.");
                        break;
                    }
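                    /*
                     * Track consecutive retries of the same exception type; a
                     * retry exception of a different type restarts the count.
                     */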
                    if ((e.getClass() == retryExceptionClass) ||
                        (e.retries == 0)) {
                        if (++retryCount >= e.retries) {
                            /* Exit the replica loop and retry elections. */
                            LoggerUtils.info
                                (logger, repImpl,
                                 "Failed to recover from exception: " +
                                 e.getMessage() + ", despite " + e.retries +
                                 " retries.\n" +
                                 LoggerUtils.getStackTrace(e));
                            break;
                        }
                    } else {
                        retryCount = 0;
                        retryExceptionClass = e.getClass();
                    }
                    LoggerUtils.info(logger, repImpl, "Retry #: " +
                                     retryCount + "/" + e.retries +
                                     " Will retry replica loop after " +
                                     e.retrySleepMs + "ms. ");
                    Thread.sleep(e.retrySleepMs);
                    if (!repNode.getMasterStatus().inSync()) {
                        break;
                    }
                }
            }
        } finally {
            /*
             * Reset the rep node ready latch unless the replica is not ready
             * because it's going to hold an election before proceeding with
             * hard recovery and joining the group.
             */
            if (hardRecoveryElectionException == null) {
                repNode.resetReadyLatch(shutdownException);
            }
        }
        /* On exit, use elections to try a different master. */
    }

    private void runReplicaLoopInternal()
        throws RestartRequiredException,
               InterruptedException,
               RetryException,
               InsufficientLogException {

        shutdownException = null;
        LoggerUtils.info(logger, repImpl,
                         "Replica loop started with master: " +
                         repNode.getMasterStatus().getNodeMasterNameId());
        if (testDelayMs > 0) {
            LoggerUtils.info(logger, repImpl,
                             "Test delay of: " + testDelayMs + "ms." +
                             " after each message sent");
        }
        try {
            initReplicaLoop();
            doRunReplicaLoopInternalWork();
        } catch (RestartRequiredException rre) {
            shutdownException = rre;
            throw rre;
        } catch (ClosedByInterruptException closedByInterruptException) {
            if (repNode.isShutdown()) {
                LoggerUtils.info(logger, repImpl,
                                 "Replica loop interrupted for shutdown.");
                return;
            }
            LoggerUtils.warning(logger, repImpl,
                                "Replica loop unexpected interrupt.");
            throw new InterruptedException
                (closedByInterruptException.getMessage());
        } catch (IOException e) {

            /*
             * The master may have changed, with the master shutting down its
             * connection as a result. This is a normal course of events; log
             * it and return to the outer node level loop.
             */
            LoggerUtils.info(logger, repImpl,
                             "Replica IO exception: " + e.getMessage() +
                             "\n" + LoggerUtils.getStackTrace(e));
        } catch (RetryException e) {
            /* Propagate it outwards. Node does not need to shut down. */
            throw e;
        } catch (GroupShutdownException e) {
            shutdownException = e;
            throw e;
        } catch (RuntimeException e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                               "Replica unexpected exception " + e +
                               " " + LoggerUtils.getStackTrace(e));
            throw e;
        } catch (MasterSyncException e) {
            /* Expected change in masters from an election. */
            LoggerUtils.info(logger, repImpl, e.getMessage());
        } catch (HardRecoveryElectionException e) {
            /*
             * Exit the replica loop so that elections can be held and the
             * master confirmed.
             */
            hardRecoveryElectionException = e;
            LoggerUtils.info(logger, repImpl, e.getMessage());
        } catch (Exception e) {
            shutdownException = e;
            LoggerUtils.severe(logger, repImpl,
                               "Replica unexpected exception " + e +
                               " " + LoggerUtils.getStackTrace(e));
            throw EnvironmentFailureException.unexpectedException(e);
        } finally {
            loopExitCleanup();
        }
    }

    protected void doRunReplicaLoopInternalWork()
        throws Exception {

        final int timeoutMs = repNode.getConfigManager().
                getDuration(RepParams.REPLICA_TIMEOUT);
        replicaFeederChannel.setTimeoutMs(timeoutMs);

        replayQueue.clear();
        repImpl.getReplay().reset();

        replicaOutputThread = new ReplicaOutputThread(repImpl);
        replicaOutputThread.start();

        final ReplayThread replayThread = new ReplayThread();
        replayThread.start();
        long maxPending = 0;

        try {
            while (true) {
                Message message = protocol.read(replicaFeederChannel);

                if (repNode.isShutdownOrInvalid() || (message == null)) {
                    return;
                }

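                /*
                 * Queue the message for the replay thread, blocking (with
                 * periodic liveness checks on the replay thread) while the
                 * queue is full.
                 */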
                while (!replayQueue.
                        offer(message,
                              ReplayThread.QUEUE_POLL_INTERVAL_NS,
                              TimeUnit.NANOSECONDS)) {
                    /* Offer timed out. */
                    if (!replayThread.isAlive()) {
                        return;
                    }
                    /* Retry the offer */
                    nMessageQueueOverflows.increment();
                }

                final int pending = replayQueue.size();
                if (pending > maxPending) {
                    maxPending = pending;
                    if ((maxPending % 100) == 0) {
                        /* Log only when the max hits a multiple of 100. */
                        LoggerUtils.info(logger, repImpl,
                                         "Max pending replay log items:" +
                                          maxPending);
                    }
                }
            }
        } catch (IOException ioe) {
            /*
             * Make sure messages in the queue are processed. Ensure, in
             * particular, that shutdown requests are processed and not ignored
             * due to the IOException resulting from a closed connection.
             */
            replayThread.exitRequest = ReplayExitType.SOFT;
        } finally {

            if (replayThread.exitRequest == ReplayExitType.SOFT) {
                /*
                 * Drain all queued messages; exceptions may be generated
                 * in the process. They logically precede IO exceptions.
                 */
                replayThread.join();
            }

            try {

                if (replayThread.exception != null) {
                    /* Replay thread is dead or exiting. */
                    throw replayThread.exception;
                }

                if (replicaOutputThread.getException() != null) {
                    throw replicaOutputThread.getException();
                }
            } finally {

                /* Ensure thread has exited in all circumstances */
                replayThread.exitRequest = ReplayExitType.IMMEDIATE;
                replayThread.join();

                replicaOutputThread.shutdownThread(logger);
                replicaOutputThread = null;
            }
        }
    }

    /**
     * Process the shutdown message from the master and return the
     * GroupShutdownException that must be thrown to exit the Replica loop.
     *
     * @return the GroupShutdownException
     */
    private GroupShutdownException processShutdown(ShutdownRequest shutdown)
        throws IOException {

        /*
         * Acknowledge the shutdown message right away, since the checkpoint
         * operation can take a long time to complete. Long enough to exceed
         * the feeder timeout on the master. The master only needs to know that
         * the replica has received the message.
         */
        replay.queueAck(ReplicaOutputThread.SHUTDOWN_ACK);

        /*
         * Turn off network timeouts on the replica, since we don't want the
         * replica to time out the connection. The connection itself is no
         * longer used past this point and will be reclaimed as part of normal
         * replica exit cleanup.
         */
        replicaFeederChannel.setTimeoutMs(Integer.MAX_VALUE);

        /*
         * TODO: Share the following code with the standalone Environment
         * shutdown, or better yet, call EnvironmentImpl.doClose here.
         */

        /*
         * Begin shutdown of the daemons before checkpointing.  Cleaning during
         * the checkpoint is wasted and slows down the checkpoint, plus it may
         * cause additional checkpoints.
         */
        repNode.getRepImpl().requestShutdownDaemons();

        /*
         * Now start a potentially long running checkpoint.
         */
        LoggerUtils.info(logger, repImpl, "Checkpoint initiated.");
        CheckpointConfig config = new CheckpointConfig();
        config.setForce(true);
        config.setMinimizeRecoveryTime(true);
        repNode.getRepImpl().invokeCheckpoint(config, "Group Shutdown");
        /* Force final shutdown of the daemons. */
        repNode.getRepImpl().shutdownDaemons();
        LoggerUtils.info(logger, repImpl, "Checkpoint completed.");

        return new GroupShutdownException(logger,
                                          repNode,
                                          shutdown.getShutdownTimeMs());
    }

    /**
     * Initialize for replica loop entry, which involves completing the
     * following steps successfully:
     *
     * 1) The replica feeder handshake.
     * 2) The replica feeder syncup.
     * 3) Processing the first heartbeat request from the feeder.
     */
    private void initReplicaLoop()
        throws IOException,
               ConnectRetryException,
               DatabaseException,
               ProtocolException,
               InterruptedException,
               HardRecoveryElectionException {

        createReplicaFeederChannel();
        ReplicaFeederHandshake handshake =
            new ReplicaFeederHandshake(repNode, replicaFeederChannel);
        protocol = handshake.execute();
        repNode.notifyReplicaConnected();

        final boolean hardRecoveryNeedsElection;

        if (hardRecoveryElectionException != null) {
            LoggerUtils.info(logger, repImpl,
                             "Replica syncup after election to verify master:"+
                             hardRecoveryElectionException.getMaster() +
                             " elected master:" +
                             repNode.getMasterStatus().getNodeMasterNameId());
            hardRecoveryNeedsElection = false;
        } else {
            hardRecoveryNeedsElection = true;
        }
        hardRecoveryElectionException = null;

        ReplicaFeederSyncup syncup =
            new ReplicaFeederSyncup(repNode, replay, replicaFeederChannel,
                                    protocol, hardRecoveryNeedsElection);
        syncup.execute(repNode.getCBVLSNTracker());

        txnEndVLSN = syncup.getMatchedVLSN();
        long matchedTxnEndTime = syncup.getMatchedVLSNTime();
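        /*
         * Re-base the consistency tracker at the syncup matchpoint before
         * processing the first heartbeat from the feeder.
         */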
        consistencyTracker.reinit(txnEndVLSN.getSequence(),
                                  matchedTxnEndTime);
        Protocol.Heartbeat heartbeat =
            protocol.read(replicaFeederChannel.getChannel(),
                          Protocol.Heartbeat.class);
        processHeartbeat(heartbeat);
        long replicaDelta = consistencyTracker.getMasterTxnEndVLSN() -
            consistencyTracker.lastReplayedVLSN.getSequence();
        LoggerUtils.info(logger, repImpl, String.format
                         ("Replica initialization completed. Replica VLSN: %s "
                          + " Heartbeat master commit VLSN: %,d " +
                          "VLSN delta: %,d",
                          consistencyTracker.lastReplayedVLSN,
                          consistencyTracker.getMasterTxnEndVLSN(),
                          replicaDelta));

        /*
         * The replica is ready for business; indicate that the node is
         * ready by counting down the latch and releasing any waiters.
         */
        repNode.getReadyLatch().countDown();
    }

    /**
     * Process a heartbeat message. It queues a response and updates
     * the consistency tracker with the information in the heartbeat.
     *
     * @param heartbeat
     * @throws IOException
     */
    private void processHeartbeat(Heartbeat heartbeat)
        throws IOException {

        replay.queueAck(ReplicaOutputThread.HEARTBEAT_ACK);
        consistencyTracker.trackHeartbeat(heartbeat);
    }

    /**
     * Performs the cleanup actions upon exit from the internal replica loop.
     */
    private void loopExitCleanup() {

        if (shutdownException != null) {
            if (shutdownException instanceof RetryException) {
                LoggerUtils.info(logger, repImpl,
                                 "Retrying connection to feeder. Message: " +
                                 shutdownException.getMessage());
            } else if (shutdownException instanceof GroupShutdownException) {
                LoggerUtils.info(logger, repImpl,
                                 "Exiting inner Replica loop." +
                                 " Master requested shutdown.");
            } else {
                LoggerUtils.warning
                    (logger, repImpl,
                     "Exiting inner Replica loop with exception " +
                     shutdownException + "\n" +
                     LoggerUtils.getStackTrace(shutdownException));
            }
        } else {
            LoggerUtils.info(logger, repImpl, "Exiting inner Replica loop.");
        }

        clearDbTreeCache();
        RepUtils.shutdownChannel(replicaFeederChannel);

        if (consistencyTracker != null) {
            consistencyTracker.logStats();
        }

        /* Sum up statistics for the loop. */
        if (protocol != null) {
            aggProtoStats.addAll(protocol.getStats(StatsConfig.DEFAULT));
        }
        protocol = null;

        /*
         * If this is a secondary node, then null out its ID to allow the
         * next feeder connection to assign it a new one.
         */
        if (repNode.getNodeType().isSecondary()) {
            repNode.getNameIdPair().revertToNull();
        }
    }

    /*
     * Clear the DatabaseId -> DatabaseImpl cache used to speed up DbTree
     * lookup operations.
     */
    void clearDbTreeCache() {
        dbCache.clear();
    }

    /**
     * Invoked when this node transitions to the master state. Aborts all
     * inflight replay transactions outstanding from a previous state as a
     * Replica, because they were initiated by a different master and will
     * never complete. Also releases any Replica transactions that were
     * waiting on consistency policy requirements.
     */
    void masterTransitionCleanup()
        throws DatabaseException {
        hardRecoveryElectionException = null;
        replay.abortOldTxns();
        consistencyTracker.forceTripLatches
            (new MasterStateException(repNode.getRepImpl().
                                      getStateChangeEvent()));
    }

    /**
     * Invoked when this node seamlessly changes roles from master to replica
     * without a recovery. The ability to do this transition without a recovery
     * is desirable because it's a faster transition, and avoids the GC
     * overhead of releasing the JE cache, and the I/O overhead of recreating
     * the in-memory btree.
     * <p>
     * The two key cases where this happens are:
     * A) a network partition occurs, and the group elects a new master. The
     * orphaned master did not crash and its environment is still valid, and
     * when it regains contact with the group, it discovers that it has been
     * deposed. It transitions into replica status.
     * <p>
     * B) a master transfer request moves mastership from this node to another
     * member of the group. This node's environment is still valid, and it
     * transitions to replica state.
     * <p>
     * The transition from master to replica requires resetting state so all
     * is as expected for a Replica. There are two categories of work:
     * - network connections: shutting down feeder connections and
     * reinitializing feeder infrastructure so that a future replica->master
     * transition will work.
     * - resetting transaction state. All MasterTxns must be transformed
     * into ReplayTxns, bearing the same transaction id and holding the same
     * locks.
     * <p>
     * Note: since non-masters can't commit txns, the inflight MasterTxns are
     * destined to be aborted in the future. An alternative to resetting
     * transaction state would be to mark them in some way so that the later HA
     * syncup/replay ignores operations pertaining to these ill-fated txns. We
     * didn't choose that approach because the simplicity of the replay is a
     * plus; it is almost entirely ignorant of the semantics of the replication
     * stream. Also, replays have potential for complexity, either because
     * syncups could restart if masters change or become unavailable, or
     * because there may be future performance optimizations in that area.
     * <p>
     * Resetting transaction state is tricky because the MasterTxn is
     * accessible to the application code. While the Replay thread is
     * attempting to transform the MasterTxn, application threads may be
     * attempting to commit or abort the transactions. Note that application
     * threads will not be trying to add locks, because the node will be in
     * UNKNOWN state, and writes will be prohibited by the MasterTxn.
     * <p>
     * MasterTransfers do impose a blocking period on transaction commits and
     * aborts, but even there, windows exist in the post-block period where
     * the application may try to abort the transaction. Network partitions
     * impose no form of blocking, and have a wider window during which the
     * application and RepNode thread must be coordinated. Here's a diagram of
     * the time periods of concern:
     * <p>
     * t1 - master transfer request issued (only when master transfer)
     * t2 - user txns which attempt to abort or commit are blocked on
     *      RepImpl.blockTxnLatch (only when mt)
     * t3 - node detects that it has transitioned to UNKNOWN and lost
     *      master status. MasterTxns are now stopped from acquiring
     *      locks or committing and will throw UnknownMasterException.
     * t4 - feeder connections shutdown
     * t5 - node begins conversion to replica state
     * t6 - blockTxnLatch released (only when master transfer)
     * t7 - existing MasterTxns converted into ReplayTxns, locks moved into
     *      new ReplayTxns. Blocked txns must be released before this
     *      conversion, because the application thread is holding the
     *      txn mutex, and conversion needs to take that mutex.
     * <p>
     * At any time during this process, the application threads may attempt to
     * abort or commit outstanding txns, or acquire read or write locks. After
     * t3, any attempts to lock, abort or commit will throw an
     * UnknownMasterException or ReplicaWriteException, and in the normal
     * course of events, the txn would internally abort. But once t5 is
     * reached, we want to prevent any changes to the number of write locks in
     * the txn so as to prevent interference with the conversion of the master
     * txns and any danger of converting only part of a txn. We set the
     * volatile, transient MasterTxn.freeze field at t5 to indicate that there
     * should be no change to the contents of the transaction. When freeze is
     * true, any attempts to abort or commit the transaction will throw
     * Unknown/ReplicaWriteException, and the txn will be put into MUST_ABORT
     * state, but the existing locks will be unchanged.
     * <p>
     * In a network partition, it's possible that the txn will be aborted or
     * committed locally before t5. In that case, there may be a hard rollback
     * when the node syncs up with the new master, and finds the anomalous
     * abort record. In masterTransfer, the window is smaller, and the blocking
     * latch ensures that no commits can happen between t1-t5. After t5, the
     * node will not be a master, so there can be no commits. Aborts may happen
     * and can cause hard rollbacks, but no data will be lost.
     * <p>
     * The freeze field is similar to the blockTxnLatch, and we considered
     * using the blockTxnLatch to stabilize the txns, but ruled it out because:
     * - the locking hierarchy where the application thread holds the txn
     *   mutex while awaiting the block txn latch prevents txn conversion.
     * - the blockTxnLatch is scoped to the MasterTransfer instance, which may
     *   not be in play for network partitioning.
     */
    void replicaTransitionCleanup() {

        /*
         * Logically an assert; use an exception rather than a Java assert
         * because we want this check to be enabled at all times. If
         * unexpectedly in master state, invalidate the environment, so we do a
         * recovery and are sure to clean up.
         */
        if (repImpl.getState() == State.MASTER) {
            throw EnvironmentFailureException.unexpectedState(repImpl,
                "Should not be in MASTER state when converting from master " +
                "to replica state");
        }

        /*
         * Find all MasterTxns, and convert them to ReplayTxns.  The set of
         * existing MasterTxns cannot increase at this point, because the node
         * is not in MASTER state. Freeze all txns and prevent change.
         */
        Set<MasterTxn> existingMasterTxns = repImpl.getExistingMasterTxns();
        LoggerUtils.info(logger, repImpl,
                         "Transitioning node to replica state, " +
                         existingMasterTxns.size() + " txns to clean up");

        /* Prevent aborts on all MasterTxns; hold their contents steady */
        for (MasterTxn masterTxn: existingMasterTxns) {
            masterTxn.freeze();
        }

        /*
         * Unblock any transactions that are stuck in their commit processing,
         * awaiting the release of the master transfer block. Such
         * transactions hold a mutex on the transaction, and the mutex would
         * block any of the lock stealing that will occur below. Note that if
         * we are doing this transition because of a network partition, there
         * will be no blocked transactions.
         */
        repImpl.unblockTxnCompletion();

        for (MasterTxn masterTxn: existingMasterTxns) {

            /*
             * Convert this masterTxn to a ReplayTxn and move any existing
             * write locks to it. Unfreeze and then abort the masterTxn.
             */
            ReplayTxn replayTxn =
                masterTxn.convertToReplayTxnAndClose(logger,
                                                     repImpl.getReplay());

            if (replayTxn == null) {
                LoggerUtils.info(logger, repImpl, "Master Txn " +
                                 masterTxn.getId() +
                                 " has no locks, nothing to transfer");
            } else {
                repImpl.getTxnManager().registerTxn(replayTxn);
                LoggerUtils.info(logger, repImpl,
                                 "state for replay transaction " +
                                 replayTxn.getId() + " = " +
                                 replayTxn.getState());
            }
        }

        /*
         * We're done with the transition; clear any active master transfers,
         * if they exist.
         */
        repNode.clearActiveTransfer();
    }

    /**
     * Creates the channel used by the Replica to connect to the Feeder. The
     * socket is configured with a read timeout that's a multiple of the
     * heartbeat interval to help detect, or initiate, a change in master.
     *
     * @throws IOException
     * @throws ConnectRetryException
     */
    private void createReplicaFeederChannel()
        throws IOException, ConnectRetryException {

        DataChannel dataChannel = null;

        final DbConfigManager configManager = repNode.getConfigManager();
        final int timeoutMs = configManager.
            getDuration(RepParams.PRE_HEARTBEAT_TIMEOUT);

        final int receiveBufferSize =
                configManager.getInt(RepParams.REPLICA_RECEIVE_BUFFER_SIZE);

        try {
            final int openTimeout = configManager.
                getDuration(RepParams.REPSTREAM_OPEN_TIMEOUT);

            /*
             * Note that soTimeout is not set since it's a blocking channel and
             * setSoTimeout has no effect on a blocking nio channel.
             *
             * Push responses out rapidly; they are small (heartbeat or commit
             * response) and need timely delivery to the master.
             * (tcpNoDelay = true)
             */

            final ConnectOptions connectOpts = new ConnectOptions().
                setTcpNoDelay(true).
                setReceiveBufferSize(receiveBufferSize).
                setOpenTimeout(openTimeout).
                setBlocking(true);

            dataChannel =
                repImpl.getChannelFactory().
                connect(repNode.getMasterStatus().getNodeMaster(),
                        connectOpts);

            replicaFeederChannel =
                new NamedChannelWithTimeout(repNode, dataChannel, timeoutMs);

            ServiceDispatcher.doServiceHandshake
                (dataChannel, FeederManager.FEEDER_SERVICE);
        } catch (ConnectException e) {

            /*
             * A network problem, or the node went down between the time we
             * learned it was the master and we tried to connect.
             */
            throw new ConnectRetryException(e.getMessage(),
                                            NETWORK_RETRIES,
                                            CONNECT_RETRY_SLEEP_MS);
        } catch (ServiceConnectFailedException e) {

            /*
             * The feeder may not have established the Feeder Service
             * as yet. For example, the transition to the master may not have
             * been completed. Wait longer.
             */
            if (e.getResponse() == Response.UNKNOWN_SERVICE) {
                throw new ConnectRetryException(e.getMessage(),
                                                SERVICE_UNAVAILABLE_RETRIES,
                                                CONNECT_RETRY_SLEEP_MS);
            }
            throw EnvironmentFailureException.unexpectedException(e);
        }
    }

    /**
     * Returns the replay statistics associated with the Replica.
     *
     * @return the statistics.
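     * <p>
     * Note: these internal statistics ultimately surface through the public
     * API. An illustrative sketch only (not code from this class; repEnv is
     * assumed to be an open ReplicatedEnvironment handle):
     * <pre>
     *   ReplicatedEnvironmentStats repStats =
     *       repEnv.getRepStats(StatsConfig.DEFAULT);
     * </pre>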
     */
    public StatGroup getReplayStats(StatsConfig config) {
        return replay.getStats(config);
    }

    /* Get the protocol statistics for this replica. */
    public StatGroup getProtocolStats(StatsConfig config) {
        StatGroup protoStats = aggProtoStats.cloneGroup(config.getClear());

        /* Guard against concurrent modification. */
        Protocol prot = this.protocol;
        if (prot != null) {
            /* These statistics are not yet a part of the agg statistics. */
            protoStats.addAll(prot.getStats(config));
        }

        return protoStats;
    }

    /* Get the consistency tracker stats for this replica. */
    public StatGroup getTrackerStats(StatsConfig config) {
        return consistencyTracker.getStats(config);
    }

    /* Reset the stats associated with this Replica. */
    public void resetStats() {
        replay.resetStats();
        aggProtoStats.clear();
        if (protocol != null) {
            protocol.resetStats();
        }
        consistencyTracker.resetStats();
    }

    /**
     * Defines the possible types of exits that can be requested from the
     * ReplayThread.
     */
    private enum ReplayExitType {
        IMMEDIATE, /* An immediate exit; ignore queued requests. */
        SOFT       /* Process pending requests in the queue, then exit. */
    }

    /**
     * The thread responsible for the replay of messages delivered over the
     * replication stream. Reading and replay are done in separate threads for
     * two reasons:
     *
     * 1) It allows the two activities to make independent progress. The
     * network can be read and messages assembled even if the replay activity
     * has stalled.
     *
     * 2) The two threads permit use of two cores to perform the replay, thus
     * making it less likely that cpu is the replay bottleneck.
     *
     * The inputs and outputs of this thread are schematically described as:
     *
     * replayQueue -> ReplayThread -> outputQueue
     *
     * It's the second component of the three thread structure outlined in the
     * Replica's class level comment.
     */
    class ReplayThread extends StoppableThread {

        /**
         * Thread exit exception. It's non-null if the thread exited due to an
         * exception; in that case it's the responsibility of the main replica
         * thread to propagate the exception across the thread boundary.
         */
        private volatile Exception exception;

        /**
         * Set asynchronously when a shutdown is being requested.
         */
        volatile ReplayExitType exitRequest = null;

        /* The queue poll interval, 1 second */
        private final static long QUEUE_POLL_INTERVAL_NS = 1000000000L;

        protected ReplayThread() {
            super(repImpl, "ReplayThread");
        }

        @Override
        protected int initiateSoftShutdown() {
            /* Use immediate, since the stream will continue to be read. */
            exitRequest = ReplayExitType.IMMEDIATE;
            return 0;
        }

        @Override
        public void run() {

            LoggerUtils.info(logger, repImpl,
                             "Replay thread started. Message queue size:" +
                              replayQueue.remainingCapacity());

            final int dbTreeCacheClearingOpCount =
                repNode.getDbTreeCacheClearingOpCount();

            long opCount = 0;

            try {
                while (true) {

                    final long pollIntervalNs =
                        replay.getPollIntervalNs(QUEUE_POLL_INTERVAL_NS);

                    final Message message =
                        replayQueue.poll(pollIntervalNs,
                                         TimeUnit.NANOSECONDS);

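                    /*
                     * Exit if an immediate shutdown was requested, if a soft
                     * shutdown was requested and the queue has been drained,
                     * or if the node itself is shutting down or is invalid.
                     */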
                    if ((exitRequest == ReplayExitType.IMMEDIATE) ||
                        ((exitRequest == ReplayExitType.SOFT) &&
                         (message == null)) ||
                         repNode.isShutdownOrInvalid()) {

                        if (exitRequest == ReplayExitType.SOFT) {
                            replay.flushPendingAcks(Long.MAX_VALUE);
                        }
                        return;
                    }

                    final long startNs = System.nanoTime();
                    replay.flushPendingAcks(startNs);

                    repNode.getMasterStatus().assertSync();

                    if (message == null) {
                        /* Timeout on poll. */
                        continue;
                    }
                    assert TestHookExecute.doHookIfSet(replayHook, message);

                    final MessageOp messageOp = message.getOp();

                    if (messageOp == Protocol.SHUTDOWN_REQUEST) {
                        throw processShutdown((ShutdownRequest) message);
                    }

                    if (messageOp == Protocol.HEARTBEAT) {
                        processHeartbeat((Protocol.Heartbeat) message);
                        dbCache.tick();
                    } else {
                        /* For testing only! */
                        if (dontProcessStream) {
                            LoggerUtils.info(logger, repImpl,
                                             "Not processing " + message);
                            continue;
                        }

                        replay.replayEntry(startNs, (Protocol.Entry) message);

                        /*
                         * Note: the consistency tracking is more obscure than
                         * it needs to be, because the commit/abort VLSN is set
                         * in Replay.replayEntry() and is then used below. An
                         * alternative would be to promote the following
                         * conditional to a level above, so commit/abort
                         * operations get their own replay method which does
                         * the consistency tracking.
                         */
                        if (((Protocol.Entry) message).isTxnEnd()) {
                            txnEndVLSN = replay.getLastReplayedVLSN();
                            consistencyTracker.trackTxnEnd();
                        }
                        consistencyTracker.trackVLSN();
                    }

                    if (testDelayMs > 0) {
                        Thread.sleep(testDelayMs);
                    }

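                    /*
                     * Periodically clear the cached DatabaseImpl handles; see
                     * the dbCache field comment for the invalidation policy.
                     */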
1228                     if (opCount++ % dbTreeCacheClearingOpCount == 0) {
1229                         clearDbTreeCache();
1230                     }
1231                 }
1232             } catch (Exception e) {
1233                 exception = e;
1234                 /*
1235                  * Bring it to the attention of the main thread by freeing
1236                  * up the "offer" wait right away.
1237                  */
1238                 replayQueue.clear();
1239 
1240                 /*
1241                  * Get the attention of the main replica thread in case it's
1242                  * waiting in a read on the socket channel.
1243                  */
1244                 LoggerUtils.info(logger, repImpl,
1245                                  "closing replicaFeederChannel = " +
1246                                  replicaFeederChannel);
1247                 RepUtils.shutdownChannel(replicaFeederChannel);
1248 
1249                 LoggerUtils.info(logger, repImpl,
1250                                  "Replay thread exiting with exception:" +
1251                                   e.getMessage());
1252             }
1253         }

        @Override
        protected Logger getLogger() {
            return logger;
        }
    }

    /**
     * Tracks the consistency of this replica with respect to the master. It
     * provides the mechanisms that cause a beginTransaction() or a
     * joinGroup() to wait until the specified consistency policy is
     * satisfied.
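     *
     * <p>For context, these waits are normally triggered by an application
     * transaction that carries a consistency policy. A minimal sketch of
     * such a trigger (imports elided; {@code repEnv} is a hypothetical
     * {@code ReplicatedEnvironment} handle):
     * <pre>{@code
     * // Require the replica to be within 2 seconds of the master, waiting
     * // up to 10 seconds for it to catch up.
     * ReplicaConsistencyPolicy policy =
     *     new TimeConsistencyPolicy(2, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
     * TransactionConfig txnConfig = new TransactionConfig();
     * txnConfig.setConsistencyPolicy(policy);
     *
     * // beginTransaction() waits, via this tracker, until the policy is
     * // satisfied, or throws ReplicaConsistencyException on timeout.
     * Transaction txn = repEnv.beginTransaction(null, txnConfig);
     * }</pre>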
     */
    public class ConsistencyTracker {
        private final long NULL_VLSN_SEQUENCE = VLSN.NULL_VLSN.getSequence();

        /*
         * Initialized by the Feeder handshake and updated by commit replays.
         * All access to lastReplayedXXXX must be synchronized on the
         * ConsistencyTracker itself.
         */
        private long lastReplayedTxnVLSN = NULL_VLSN_SEQUENCE;
        private VLSN lastReplayedVLSN = VLSN.NULL_VLSN;
        private long masterTxnEndTime = 0L;

        /* Updated by heartbeats */
        private volatile long masterTxnEndVLSN;
        private volatile long masterNow = 0L;

        private final StatGroup stats =
            new StatGroup(ReplicaStatDefinition.GROUP_NAME,
                          ReplicaStatDefinition.GROUP_DESC);

        private final LongStat nLagConsistencyWaits =
            new LongStat(stats, N_LAG_CONSISTENCY_WAITS);

        private final LongStat nLagConsistencyWaitMs =
            new LongStat(stats, N_LAG_CONSISTENCY_WAIT_MS);

        private final LongStat nVLSNConsistencyWaits =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAITS);

        private final LongStat nVLSNConsistencyWaitMs =
            new LongStat(stats, N_VLSN_CONSISTENCY_WAIT_MS);

        private final OrderedLatches vlsnLatches =
            new OrderedLatches(repNode.getRepImpl()) {
                /*
                 * Note that this assumes that NULL_VLSN is -1, and that
                 * the vlsns ascend.
                 */
                @Override
                boolean tripPredicate(long keyVLSN, long tripVLSN) {
                    return keyVLSN <= tripVLSN;
                }
            };

        private final OrderedLatches lagLatches =
            new OrderedLatches(repNode.getRepImpl()) {
                @Override
                boolean tripPredicate(long keyLag, long currentLag) {
                    return currentLag <= keyLag;
                }
            };

        /**
         * Invoked after each replica syncup so that the Replica can
         * re-establish its consistency vis-a-vis the master and the part of
         * the replication stream it considers as having been replayed.
         *
         * @param matchedTxnVLSN the replica state corresponds to this txn
         * @param matchedTxnEndTime the time at which this txn was committed or
         * aborted on the master
         */
        void reinit(long matchedTxnVLSN, long matchedTxnEndTime) {
            this.lastReplayedVLSN = new VLSN(matchedTxnVLSN);
            this.lastReplayedTxnVLSN = matchedTxnVLSN;
            this.masterTxnEndTime = matchedTxnEndTime;
        }

        public long getMasterTxnEndVLSN() {
            return masterTxnEndVLSN;
        }

        void close() {
            logStats();
        }

        void logStats() {
            if (logger.isLoggable(Level.INFO)) {
                LoggerUtils.info
                    (logger, repImpl,
                     "Replica stats - Lag waits: " +
                     nLagConsistencyWaits.get() +
                     " Lag wait time: " + nLagConsistencyWaitMs.get() + "ms." +
                     " VLSN waits: " + nVLSNConsistencyWaits.get() +
                     " VLSN wait time: " + nVLSNConsistencyWaitMs.get() +
                     "ms.");
            }
        }

        /**
         * Calculates the time lag in ms at the Replica.
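         *
         * <p>Illustrative (hypothetical) numbers: if a heartbeat reported
         * masterTxnEndVLSN=120 at time masterNow=T, and the last replayed
         * commit has VLSN 100 with masterTxnEndTime=T-500ms, the replica is
         * behind and the lag is currentTimeMillis() - (T-500ms). Once VLSN
         * 120 has been replayed, the lag drops back to
         * currentTimeMillis() - T, i.e. the age of the latest heartbeat.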
         */
        private long currentLag() {
            if (masterNow == 0L) {

                /*
                 * Have not seen a heartbeat; the time lag cannot be
                 * determined in its absence, since the heartbeat is the
                 * first message sent by the feeder after completion of the
                 * handshake.
                 */
                return Integer.MAX_VALUE;
            }

            long lag;
            if (lastReplayedTxnVLSN < masterTxnEndVLSN) {
                lag = System.currentTimeMillis() - masterTxnEndTime;
            } else if (lastReplayedTxnVLSN == masterTxnEndVLSN) {

                /*
                 * The lag is determined by the transactions (if any) that are
                 * further downstream; assume the worst.
                 */
                lag = System.currentTimeMillis() - masterNow;
            } else {
                /* The commit leapfrogged the heartbeat. */
                lag = System.currentTimeMillis() - masterNow;
            }
            return lag;
        }

        /**
         * Frees all the threads that are waiting on latches.
         *
         * @param exception the exception to be thrown to explain the reason
         * behind the latches being forced.
         */
        synchronized void forceTripLatches(DatabaseException exception) {
            assert (exception != null);
            vlsnLatches.trip(Long.MAX_VALUE, exception);
            lagLatches.trip(0, exception);
        }

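        /**
         * Invoked after a commit or abort has been replayed. Records the
         * VLSN and master end time of that transaction, then trips any
         * qualifying VLSN and lag latches.
         */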
        synchronized void trackTxnEnd() {
            Replay.TxnInfo lastReplayedTxn = replay.getLastReplayedTxn();
            lastReplayedTxnVLSN = lastReplayedTxn.getTxnVLSN().getSequence();
            masterTxnEndTime = lastReplayedTxn.getMasterTxnEndTime();

            if ((lastReplayedTxnVLSN > masterTxnEndVLSN) &&
                (masterTxnEndTime >= masterNow)) {
                masterTxnEndVLSN = lastReplayedTxnVLSN;
                masterNow = masterTxnEndTime;
            }

            /*
             * The txn end advances both the replica VLSN and the commit
             * time, so trip the qualifying latches in both sets.
             */
            vlsnLatches.trip(lastReplayedTxnVLSN, null);
            lagLatches.trip(currentLag(), null);
        }

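        /**
         * Invoked after each replayed log entry to record its VLSN and trip
         * any VLSN latches that are now satisfied.
         */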
        synchronized void trackVLSN() {
            lastReplayedVLSN = replay.getLastReplayedVLSN();
            vlsnLatches.trip(lastReplayedVLSN.getSequence(), null);
        }

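        /**
         * Invoked for each heartbeat to record the master's progress and
         * clock, then trips any time lag latches that are now satisfied.
         */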
        synchronized void trackHeartbeat(Protocol.Heartbeat heartbeat) {
            masterTxnEndVLSN = heartbeat.getCurrentTxnEndVLSN();
            masterNow = heartbeat.getMasterNow();
            /* Trip just the time lag latches. */
            lagLatches.trip(currentLag(), null);
        }

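        /**
         * Waits until the replica's current time lag is within the
         * permissible lag of the given policy, or the policy's timeout
         * expires, in which case a ReplicaConsistencyException is thrown.
         */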
        public void lagAwait(TimeConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {

            long currentLag = currentLag();
            long lag =
                consistencyPolicy.getPermissibleLag(TimeUnit.MILLISECONDS);
            if (currentLag <= lag) {
                return;
            }
            long waitStart = System.currentTimeMillis();
            ExceptionAwareCountDownLatch waitLagLatch =
                lagLatches.getOrCreate(lag);
            await(waitLagLatch, consistencyPolicy);
            nLagConsistencyWaits.increment();
            nLagConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Waits until the log record identified by the VLSN has been
         * replayed.
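         *
         * <p>This is the wait behind commit-point consistency. An
         * illustrative sketch of the application-level usage that leads here
         * (imports elided; {@code masterEnv} and {@code replicaEnv} are
         * hypothetical handles):
         * <pre>{@code
         * // On the master: commit and capture the commit point.
         * Transaction mtxn = masterEnv.beginTransaction(null, null);
         * // ... writes ...
         * mtxn.commit();
         * CommitToken token = mtxn.getCommitToken();
         *
         * // On the replica: wait until that commit point has been replayed.
         * TransactionConfig config = new TransactionConfig();
         * config.setConsistencyPolicy(
         *     new CommitPointConsistencyPolicy(token, 10, TimeUnit.SECONDS));
         * Transaction rtxn = replicaEnv.beginTransaction(null, config);
         * }</pre>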
         */
        public void awaitVLSN(long vlsn,
                              ReplicaConsistencyPolicy consistencyPolicy)
            throws InterruptedException,
                   ReplicaConsistencyException,
                   DatabaseException {

            long waitStart = System.currentTimeMillis();

            ExceptionAwareCountDownLatch waitVLSNLatch = null;

            synchronized (this) {
                final long compareVLSN =
                    (consistencyPolicy instanceof CommitPointConsistencyPolicy) ?
                    lastReplayedTxnVLSN :
                    lastReplayedVLSN.getSequence();
                if (vlsn <= compareVLSN) {
                    return;
                }
                waitVLSNLatch = vlsnLatches.getOrCreate(vlsn);
            }
            await(waitVLSNLatch, consistencyPolicy);
            /* Stats after the await, so the counts and times are related. */
            nVLSNConsistencyWaits.increment();
            nVLSNConsistencyWaitMs.add(System.currentTimeMillis() - waitStart);
        }

        /**
         * Wait on the given countdown latch and generate the appropriate
         * exception upon timeout.
         *
         * @throws InterruptedException
         */
        private void await(ExceptionAwareCountDownLatch consistencyLatch,
                           ReplicaConsistencyPolicy consistencyPolicy)
            throws ReplicaConsistencyException,
                   DatabaseException,
                   InterruptedException {

            if (!consistencyLatch.awaitOrException
                 (consistencyPolicy.getTimeout(TimeUnit.MILLISECONDS),
                  TimeUnit.MILLISECONDS)) {
                /* Timed out. */
                final boolean detached =
                    repNode.getRepImpl().getState().isDetached();
                throw new ReplicaConsistencyException(consistencyPolicy,
                                                      detached);
            }
        }

        private StatGroup getStats(StatsConfig config) {
            return stats.cloneGroup(config.getClear());
        }

        private void resetStats() {
            stats.clear();
        }

        /**
         * Shuts down the consistency tracker. This is typically done as part
         * of the shutdown of a replication node. It counts down all open
         * latches, so the threads waiting on them can make progress. It's
         * the responsibility of the waiting threads to check whether the
         * latch countdown was due to a shutdown, and take appropriate action.
         */
        public void shutdown() {
            final Exception savedShutdownException =
                repNode.getSavedShutdownException();

            /*
             * Don't wrap in another level of EnvironmentFailureException
             * if we have one in hand already. It can confuse any catch
             * handlers which are expecting a specific exception, e.g.
             * RollbackException, while waiting for read consistency.
             */
            final EnvironmentFailureException latchException =
                (savedShutdownException instanceof
                 EnvironmentFailureException) ?

                ((EnvironmentFailureException) savedShutdownException) :

                EnvironmentFailureException.unexpectedException
                    ("Node: " + repNode.getNameIdPair() + " was shut down.",
                     savedShutdownException);

            forceTripLatches(latchException);
        }
    }

    /**
     * Manages a set of ordered latches. They are ordered by the key value.
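     *
     * <p>A minimal sketch of the intended use, with hypothetical values (the
     * real subclasses are the vlsnLatches and lagLatches fields above):
     * <pre>{@code
     * OrderedLatches latches = new OrderedLatches(envImpl) {
     *     boolean tripPredicate(long key, long tripValue) {
     *         return key <= tripValue;   // VLSN-style ordering
     *     }
     * };
     *
     * // Waiter: blocks (up to the timeout) until trip() is called with a
     * // value that satisfies the predicate for key 100, or an exception is
     * // posted via trip(..., exception).
     * latches.getOrCreate(100L).awaitOrException(10, TimeUnit.SECONDS);
     *
     * // Replayer: releases every waiter whose key satisfies the predicate.
     * latches.trip(100L, null);
     * }</pre>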
     */
    private abstract class OrderedLatches {

        final EnvironmentImpl envImpl;

        final SortedMap<Long, ExceptionAwareCountDownLatch> latchMap =
            new TreeMap<Long, ExceptionAwareCountDownLatch>();

        abstract boolean tripPredicate(long key, long tripValue);

        OrderedLatches(EnvironmentImpl envImpl) {
            this.envImpl = envImpl;
        }

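        /**
         * Returns the latch registered under the given key, creating and
         * registering a new single-count latch if none exists yet.
         */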
        synchronized ExceptionAwareCountDownLatch getOrCreate(Long key) {
            ExceptionAwareCountDownLatch latch = latchMap.get(key);
            if (latch == null) {
                latch = new ExceptionAwareCountDownLatch(envImpl, 1);
                latchMap.put(key, latch);
            }
            return latch;
        }

        /**
         * Trips all latches up to the first latch that will not trip.
         *
         * @param tripValue the value the latch keys are tested against
         * @param exception the exception to be thrown by the waiter upon
         * exit from the await. It can be null if no exception need be thrown.
         */
        synchronized void trip(long tripValue,
                               DatabaseException exception) {
            while (latchMap.size() > 0) {
                Long key = latchMap.firstKey();
                if (!tripPredicate(key, tripValue)) {
                    /* It will fail on the rest as well. */
                    return;
                }
                /* Set the waiters free. */
                ExceptionAwareCountDownLatch latch = latchMap.remove(key);
                latch.releaseAwait(exception);
            }
        }
    }

    /**
     * Thrown to indicate that the Replica must retry connecting to the same
     * master, after some period of time.
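     *
     * <p>A minimal sketch of how a caller is expected to react, assuming a
     * hypothetical {@code connectToMaster()} helper and an enclosing method
     * that handles InterruptedException:
     * <pre>{@code
     * int attempts = 0;
     * while (true) {
     *     try {
     *         connectToMaster();
     *         break;
     *     } catch (ConnectRetryException e) {
     *         if (++attempts > e.retries) {
     *             throw e;
     *         }
     *         Thread.sleep(e.retrySleepMs);
     *     }
     * }
     * }</pre>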
     */
    @SuppressWarnings("serial")
    abstract static class RetryException extends Exception {
        final int retries;
        final int retrySleepMs;

        RetryException(String message,
                       int retries,
                       int retrySleepMs) {
            super(message);
            this.retries = retries;
            this.retrySleepMs = retrySleepMs;
        }

        @Override
        public String getMessage() {
            return "Failed after retries: " + retries +
                   " with retry interval: " + retrySleepMs + "ms.";
        }
    }

    @SuppressWarnings("serial")
    static class ConnectRetryException extends RetryException {

        ConnectRetryException(String message,
                              int retries,
                              int retrySleepMs) {
            super(message, retries, retrySleepMs);
        }
    }

    /**
     * Indicates that an election is needed before the hard recovery can
     * proceed. Please see SR 20572 for a motivating scenario and
     * NetworkPartitionHealingTest for an example.
     */
    @SuppressWarnings("serial")
    public static class HardRecoveryElectionException extends Exception {

        final NameIdPair masterNameIdPair;
        final VLSN lastTxnEnd;
        final VLSN matchpointVLSN;

        public HardRecoveryElectionException(NameIdPair masterNameIdPair,
                                             VLSN lastTxnEnd,
                                             VLSN matchpointVLSN) {

            this.masterNameIdPair = masterNameIdPair;
            this.lastTxnEnd = lastTxnEnd;
            this.matchpointVLSN = matchpointVLSN;
        }

        /**
         * The master that needs to be verified with an election.
         */
        public NameIdPair getMaster() {
            return masterNameIdPair;
        }

        @Override
        public String getMessage() {
            return "Need election preceding hard recovery to verify master: " +
                   masterNameIdPair +
                   " last txn end: " + lastTxnEnd +
                   " matchpoint VLSN: " + matchpointVLSN;
        }
    }

    /**
     * Sets a test hook for installation into Replica class instances to be
     * created in the future.  This is needed when the test hook must be
     * installed before the {@code ReplicatedEnvironment} handle constructor
     * returns, so that a test may influence the replay of the sync-up
     * transaction backlog.
     */
    public static void setInitialReplayHook(
        com.sleepycat.je.utilint.TestHook<Message> hook) {
        initialReplayHook = hook;
    }

    /**
     * Sets a test hook which is executed when the ReplicaFeederSyncup
     * finishes. This differs from ReplicaFeederSyncup.setGlobalSyncupHook in
     * that it sets the hook for a specific node, whereas that method is
     * static and sets the hook globally.
     *
     * This method is required when a test needs to set the hook for only one
     * node, and that node already exists. The static method is useful when a
     * test needs to set the hook before a node exists.
     */
    public void setReplicaFeederSyncupHook(TestHook<Object> syncupHook) {
        replicaFeederSyncupHook = syncupHook;
    }

    public TestHook<Object> getReplicaFeederSyncupHook() {
        return replicaFeederSyncupHook;
    }
}