1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.impl.node;
9 
10 import static com.sleepycat.je.log.LogEntryType.LOG_NAMELN_TRANSACTIONAL;
11 import static com.sleepycat.je.log.LogEntryType.LOG_TXN_ABORT;
12 import static com.sleepycat.je.log.LogEntryType.LOG_TXN_COMMIT;
13 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.LATEST_COMMIT_LAG_MS;
14 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.MAX_COMMIT_PROCESSING_NANOS;
15 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.MIN_COMMIT_PROCESSING_NANOS;
16 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_ABORTS;
17 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMITS;
18 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_ACKS;
19 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_NO_SYNCS;
20 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_SYNCS;
21 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_WRITE_NO_SYNCS;
22 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_ELAPSED_TXN_TIME;
23 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMITS;
24 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_MAX_EXCEEDED;
25 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_TIMEOUTS;
26 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_TXNS;
27 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_LNS;
28 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_MESSAGE_QUEUE_OVERFLOWS;
29 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_NAME_LNS;
30 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.TOTAL_COMMIT_LAG_MS;
31 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.TOTAL_COMMIT_PROCESSING_NANOS;
32 import static java.util.concurrent.TimeUnit.MILLISECONDS;
33 import static java.util.concurrent.TimeUnit.NANOSECONDS;
34 
35 import java.io.File;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.Date;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.ArrayBlockingQueue;
47 import java.util.concurrent.BlockingQueue;
48 import java.util.logging.Level;
49 import java.util.logging.Logger;
50 
51 import com.sleepycat.je.Cursor;
52 import com.sleepycat.je.DatabaseConfig;
53 import com.sleepycat.je.DatabaseEntry;
54 import com.sleepycat.je.DatabaseException;
55 import com.sleepycat.je.DatabaseNotFoundException;
56 import com.sleepycat.je.DbInternal;
57 import com.sleepycat.je.Durability.SyncPolicy;
58 import com.sleepycat.je.EnvironmentFailureException;
59 import com.sleepycat.je.LockMode;
60 import com.sleepycat.je.OperationStatus;
61 import com.sleepycat.je.StatsConfig;
62 import com.sleepycat.je.TransactionConfig;
63 import com.sleepycat.je.config.EnvironmentParams;
64 import com.sleepycat.je.dbi.CursorImpl.SearchMode;
65 import com.sleepycat.je.dbi.DatabaseId;
66 import com.sleepycat.je.dbi.DatabaseImpl;
67 import com.sleepycat.je.dbi.DbConfigManager;
68 import com.sleepycat.je.dbi.DbTree.TruncateDbResult;
69 import com.sleepycat.je.dbi.DbType;
70 import com.sleepycat.je.dbi.EnvironmentFailureReason;
71 import com.sleepycat.je.dbi.PutMode;
72 import com.sleepycat.je.dbi.TriggerManager;
73 import com.sleepycat.je.log.DbOpReplicationContext;
74 import com.sleepycat.je.log.FileManager;
75 import com.sleepycat.je.log.LogEntryType;
76 import com.sleepycat.je.log.LogManager;
77 import com.sleepycat.je.log.ReplicationContext;
78 import com.sleepycat.je.log.entry.DbOperationType;
79 import com.sleepycat.je.log.entry.LNLogEntry;
80 import com.sleepycat.je.log.entry.LogEntry;
81 import com.sleepycat.je.log.entry.NameLNLogEntry;
82 import com.sleepycat.je.log.entry.SingleItemEntry;
83 import com.sleepycat.je.recovery.RecoveryInfo;
84 import com.sleepycat.je.recovery.RollbackTracker;
85 import com.sleepycat.je.rep.LogFileRewriteListener;
86 import com.sleepycat.je.rep.SyncupProgress;
87 import com.sleepycat.je.rep.impl.RepGroupDB;
88 import com.sleepycat.je.rep.impl.RepImpl;
89 import com.sleepycat.je.rep.impl.RepParams;
90 import com.sleepycat.je.rep.stream.InputWireRecord;
91 import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException;
92 import com.sleepycat.je.rep.stream.Protocol;
93 import com.sleepycat.je.rep.txn.ReplayTxn;
94 import com.sleepycat.je.rep.utilint.LongMaxZeroStat;
95 import com.sleepycat.je.rep.utilint.LongMinZeroStat;
96 import com.sleepycat.je.rep.vlsn.VLSNRange;
97 import com.sleepycat.je.tree.LN;
98 import com.sleepycat.je.tree.NameLN;
99 import com.sleepycat.je.txn.RollbackEnd;
100 import com.sleepycat.je.txn.RollbackStart;
101 import com.sleepycat.je.txn.Txn;
102 import com.sleepycat.je.txn.TxnAbort;
103 import com.sleepycat.je.txn.TxnCommit;
104 import com.sleepycat.je.utilint.DbLsn;
105 import com.sleepycat.je.utilint.LoggerUtils;
106 import com.sleepycat.je.utilint.LongMaxStat;
107 import com.sleepycat.je.utilint.LongMinStat;
108 import com.sleepycat.je.utilint.LongStat;
109 import com.sleepycat.je.utilint.NanoTimeUtil;
110 import com.sleepycat.je.utilint.StatGroup;
111 import com.sleepycat.je.utilint.VLSN;
112 import com.sleepycat.utilint.StringUtils;
113 
114 /**
115  * Replays log records from the replication stream, and manages the
116  * transactions for those records.
117  *
118  * The Replay module has a lifetime equivalent to the environment owned by
119  * this replicator. Its lifetime is longer than the feeder/replica stream.
120  * For example, suppose this is nodeX:
121  *
122  * t1 - Node X is a replica, node A is master. Replay X is alive
123  * t2 - Node X is a replica, node B takes over as master. X's Replay module
124  *      is still alive and has the same set of active txns. It doesn't matter
125  *      to X that the master has changed.
126  * t3 - Node X becomes the master. Now its Replay unit is cleared, because
127  *      anything managed by the Replay is defunct.
128  */
129 public class Replay {
130 
    /* Status strings used for rollback logging. */
    private static final String RBSTATUS_START =
        "Started Rollback";
    private static final String RBSTATUS_NO_ACTIVE =
        "No active txns, nothing to rollback";
    private static final String RBSTATUS_RANGE_EQUALS =
        "End of range equals matchpoint, nothing to rollback";
    private static final String RBSTATUS_LOG_RBSTART =
        "Logged RollbackStart entry";
    private static final String RBSTATUS_MEM_ROLLBACK =
        "Finished in-memory rollback";
    private static final String RBSTATUS_INVISIBLE =
        "Finished invisible setting";
    private static final String RBSTATUS_FINISH =
        "Finished rollback";

    /*
     * DatabaseEntry objects reused during replay, to minimize allocation in
     * the high performance replay path. Because they are shared and mutable
     * they must not be used by concurrent replay threads (NOTE(review):
     * presumably replay runs on a single thread -- confirm).
     *
     * delDataEntry is configured as a zero-length partial entry in the
     * constructor, so that deletion replay does not fetch the old data.
     */
    final DatabaseEntry replayKeyEntry = new DatabaseEntry();
    final DatabaseEntry replayDataEntry = new DatabaseEntry();
    final DatabaseEntry delDataEntry = new DatabaseEntry();

    /* The replicated environment whose replication stream is replayed. */
    private final RepImpl repImpl;

    /**
     *  If a commit replay operation takes more than this threshold, it's
     *  logged. This information helps determine whether ack timeouts on the
     *  master are due to a slow replica, or the network. The value is taken
     *  from RepParams.REPLICA_ACK_TIMEOUT.
     */
    private final long ackTimeoutLogThresholdNs;

    /**
     * ActiveTxns is a collection of txn objects used for applying replicated
     * transactions. This collection should be empty if the node is a master.
     *
     * Note that there is an interesting relationship between ActiveTxns and
     * the txn collection managed by the environment TxnManager. ActiveTxns is
     * effectively a subset of the set of txns held by the
     * TxnManager. ReplayTxns must be sure to register and unregister
     * themselves from ActiveTxns, just as all Txns must register and
     * unregister with the TxnManager's set. One implementation alternative to
     * having an ActiveTxns map here is to search the TxnManager set (which is
     * a set, not a map) for a given ReplayTxn. Another is to subclass
     * TxnManager so that replicated nodes have their own replayTxn map, just
     * as XATxns have an XID->Txn map.
     *
     * Both alternatives seemed too costly in terms of performance or elaborate
     * in terms of code to do for the current function. It seems clearer to
     * make the ActiveTxns a map in the one place that it is currently
     * used. This choice may change over time, and should be reevaluated if the
     * implementation changes.
     *
     * The ActiveTxns key is the transaction id. These transactions are closed
     * when:
     * - the replay unit executes a commit received over the replication stream
     * - the replay unit executes an abort received over the replication stream
     * - the replication node realizes that it has just become the new master,
     *   and was not previously the master.
     *
     * Note that the Replay class has a lifetime that is longer than that of a
     * RepNode. This means in particular, that transactions may be left open,
     * and will be resumed when a replica switches from one master to another,
     * creating a new RepNode in the process. Because of that, part of the
     * transaction may be implemented by the rep stream from one master and
     * another part by another.
     *
     * The map is synchronized, so simple get/put operations do not require
     * additional synchronization.  However, iteration requires synchronization
     * and copyActiveTxns can be used in most cases.
     */
    private final Map<Long, ReplayTxn> activeTxns;

    /*
     * The entry representing the last replayed txn commit or abort. Supports
     * the replica's notion of consistency.
     */
    private volatile TxnInfo lastReplayedTxn = null;

    /*
     * The VLSN of the last replayed entry of any kind. Supports
     * PointConsistencyPolicy.
     */
    private volatile VLSN lastReplayedVLSN = null;

    /*
     * The sync policy to be used in the absence of an ACK request. The replica
     * in this case has some latitude about how it syncs the commit.
     */
    private final SyncPolicy noAckSyncPolicy = SyncPolicy.NO_SYNC;

    /**
     *  The RepParams.REPLAY_LOGGING_THRESHOLD configured logging threshold,
     *  in nanoseconds. Replay of any single entry that takes longer than this
     *  is logged.
     */
    private final long replayLoggingThresholdNs;

    /*
     * State that is reinitialized by the reset() method each time a replay
     * loop is started with a new feeder.
     */

    /**
     *  All writes (predominantly acks, queued as txn ids) are queued here, so
     *  they do not block the replay thread.
     */
    private final BlockingQueue<Long> outputQueue;

    /**
     * Holds the state associated with group commits.
     */
    private final GroupCommit groupCommit;

    /* Maintains the statistics associated with stream replay. */
    private final StatGroup statistics;
    private final LongStat nCommits;
    private final LongStat nCommitAcks;
    private final LongStat nCommitSyncs;
    private final LongStat nCommitNoSyncs;
    private final LongStat nCommitWriteNoSyncs;
    private final LongStat nAborts;
    private final LongStat nNameLNs;
    private final LongStat nLNs;
    private final LongStat nElapsedTxnTime;
    private final LongStat nMessageQueueOverflows;
    private final LongMinStat minCommitProcessingNanos;
    private final LongMaxStat maxCommitProcessingNanos;
    private final LongStat totalCommitProcessingNanos;
    private final LongStat totalCommitLagMs;
    private final LongStat latestCommitLagMs;

    private final Logger logger;
Replay(RepImpl repImpl, @SuppressWarnings(R) NameIdPair nameIdPair)262     public Replay(RepImpl repImpl,
263                   @SuppressWarnings("unused") NameIdPair nameIdPair) {
264 
265         /*
266          * This should have already been caught in
267          * ReplicatedEnvironment.setupRepConfig, but it is checked here anyway
268          * as an added sanity check. [#17643]
269          */
270         if (repImpl.isReadOnly()) {
271             throw EnvironmentFailureException.unexpectedState
272                 ("Replay created with readonly ReplicatedEnvironment");
273         }
274 
275         this.repImpl = repImpl;
276         final DbConfigManager configManager = repImpl.getConfigManager();
277 
278         ackTimeoutLogThresholdNs = MILLISECONDS.toNanos(configManager.
279             getDuration(RepParams.REPLICA_ACK_TIMEOUT));
280 
281         /**
282          * The factor of 2 below is somewhat arbitrary. It should be > 1 X so
283          * that the ReplicaOutputThread can completely process the buffered
284          * messages in the face of a network drop and 2X to allow for
285          * additional headroom and minimize the chances that the replay might
286          * be blocked due to the limited queue length.
287          */
288         final int outputQueueSize = 2 *
289             configManager.getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE);
290         outputQueue = new ArrayBlockingQueue<Long>(outputQueueSize);
291 
292         /*
293          * The Replay module manages all write transactions and mimics a
294          * writing application thread. When the node comes up, it populates
295          * the activeTxn collection with ReplayTxns that were resurrected
296          * at recovery time.
297          */
298         activeTxns = Collections.synchronizedMap(
299             new HashMap<Long, ReplayTxn>());
300 
301         /*
302          * Configure the data entry used for deletion to avoid fetching the
303          * old data during deletion replay.
304          */
305         delDataEntry.setPartial(0, 0, true);
306 
307         logger = LoggerUtils.getLogger(getClass());
308         statistics = new StatGroup(ReplayStatDefinition.GROUP_NAME,
309                                    ReplayStatDefinition.GROUP_DESC);
310 
311         groupCommit = new GroupCommit(configManager);
312 
313         nCommits = new LongStat(statistics, N_COMMITS);
314         nCommitAcks = new LongStat(statistics, N_COMMIT_ACKS);
315         nCommitSyncs = new LongStat(statistics, N_COMMIT_SYNCS);
316         nCommitNoSyncs = new LongStat(statistics, N_COMMIT_NO_SYNCS);
317         nCommitWriteNoSyncs =
318             new LongStat(statistics, N_COMMIT_WRITE_NO_SYNCS);
319         nAborts = new LongStat(statistics, N_ABORTS);
320         nNameLNs = new LongStat(statistics, N_NAME_LNS);
321         nLNs = new LongStat(statistics, N_LNS);
322         nElapsedTxnTime = new LongStat(statistics, N_ELAPSED_TXN_TIME);
323         nMessageQueueOverflows =
324             new LongStat(statistics, N_MESSAGE_QUEUE_OVERFLOWS);
325         minCommitProcessingNanos =
326             new LongMinZeroStat(statistics, MIN_COMMIT_PROCESSING_NANOS);
327         maxCommitProcessingNanos =
328             new LongMaxZeroStat(statistics, MAX_COMMIT_PROCESSING_NANOS);
329         totalCommitProcessingNanos =
330             new LongStat(statistics, TOTAL_COMMIT_PROCESSING_NANOS);
331         totalCommitLagMs = new LongStat(statistics, TOTAL_COMMIT_LAG_MS);
332         latestCommitLagMs = new LongStat(statistics, LATEST_COMMIT_LAG_MS);
333 
334         replayLoggingThresholdNs = MILLISECONDS.toNanos(configManager.
335            getDuration(RepParams.REPLAY_LOGGING_THRESHOLD));
336     }
337 
getOutputQueue()338     public BlockingQueue<Long> getOutputQueue() {
339         return outputQueue;
340     }
341 
342     /**
343      * Reinitialize for replay from a new feeder
344      */
reset()345     public void reset() {
346         outputQueue.clear();
347     }
348 
getMessageQueueOverflows()349     LongStat getMessageQueueOverflows() {
350         return nMessageQueueOverflows;
351     }
352 
353     /**
354      * Actions that must be taken before the recovery checkpoint, whether
355      * the environment is read/write or read/only.
356      */
preRecoveryCheckpointInit(RecoveryInfo recoveryInfo)357     public void preRecoveryCheckpointInit(RecoveryInfo recoveryInfo) {
358         for (Txn txn : recoveryInfo.replayTxns.values()) {
359 
360             /*
361              * ReplayTxns need to know about their owning activeTxn map,
362              * so they can remove themselves at close. We are casting upwards,
363              * because the non-HA code is prohibited from referencing
364              * Replication classes, and the RecoveryInfo.replayTxns collection
365              * doesn't know that it's got ReplayTxns.
366              */
367             ((ReplayTxn) txn).registerWithActiveTxns(activeTxns);
368         }
369         lastReplayedVLSN = repImpl.getVLSNIndex().getRange().getLast();
370     }
371 
getLastReplayedTxn()372     public TxnInfo getLastReplayedTxn() {
373         return lastReplayedTxn;
374     }
375 
getLastReplayedVLSN()376     public VLSN getLastReplayedVLSN() {
377         return lastReplayedVLSN;
378     }
379 
380     /**
381      * When mastership changes, all inflight replay transactions are aborted.
382      * Replay transactions need only be aborted by the node that has become
383      * the new master (who was previously a Replica). The replay transactions
384      * on the other replicas who have not changed roles are
385      * resolved by the abort record issued by said new master.
386      */
abortOldTxns()387     public void abortOldTxns()
388         throws DatabaseException {
389 
390         final int masterNodeId = repImpl.getNodeId();
391         for (ReplayTxn replayTxn : copyActiveTxns().values()) {
392             replayTxn.abort(ReplicationContext.MASTER, masterNodeId);
393         }
394         assert activeTxns.size() == 0 : "Unexpected txns in activeTxns = " +
395             activeTxns;
396     }
397 
updateCommitStats(final boolean needsAck, final SyncPolicy syncPolicy, final long startTimeNanos, final long masterCommitTimeMs, final long replicaCommitTimeMs)398     private void updateCommitStats(final boolean needsAck,
399                                    final SyncPolicy syncPolicy,
400                                    final long startTimeNanos,
401                                    final long masterCommitTimeMs,
402                                    final long replicaCommitTimeMs) {
403 
404         final long now = System.nanoTime();
405         final long commitNanos = now - startTimeNanos;
406 
407         if (commitNanos > ackTimeoutLogThresholdNs &&
408             logger.isLoggable(Level.INFO)) {
409             LoggerUtils.info
410                 (logger, repImpl,
411                  "Replay commit time: " + (commitNanos / 1000000) +
412                  " ms exceeded log threshold: " +
413                  (ackTimeoutLogThresholdNs / 1000000));
414         }
415 
416         nCommits.increment();
417 
418         if (needsAck) {
419             nCommitAcks.increment();
420         }
421 
422         if (syncPolicy == SyncPolicy.SYNC) {
423             nCommitSyncs.increment();
424         } else if (syncPolicy == SyncPolicy.NO_SYNC) {
425             nCommitNoSyncs.increment();
426         } else if (syncPolicy == SyncPolicy.WRITE_NO_SYNC) {
427             nCommitWriteNoSyncs.increment();
428         } else {
429             throw EnvironmentFailureException.unexpectedState
430                 ("Unknown sync policy: " + syncPolicy);
431         }
432 
433         totalCommitProcessingNanos.add(commitNanos);
434         minCommitProcessingNanos.setMin(commitNanos);
435         maxCommitProcessingNanos.setMax(commitNanos);
436 
437         /*
438          * Tally the lag between master and replica commits, even if clock skew
439          * makes the lag appear negative.  The documentation already warns that
440          * the value will be affected by clock skew, so users can adjust for
441          * that, but only if we don't throw the information way.
442          */
443         final long replicaLagMs = replicaCommitTimeMs - masterCommitTimeMs;
444         totalCommitLagMs.add(replicaLagMs);
445         latestCommitLagMs.set(replicaLagMs);
446     }
447 
    /**
     * Apply the operation represented by this log entry on this replica node.
     *
     * Commit and abort entries resolve their ReplayTxn and update the
     * commit/abort statistics. A commit that needs an ack also has the ack
     * buffered for group commit or queued for the feeder. NameLN entries are
     * applied via applyNameLN; all other entries are applied as LNs via
     * applyLN.
     *
     * @param startNs the System.nanoTime() value at which processing of this
     *        entry started; used for commit statistics, ack buffering and
     *        slow-replay logging
     * @param entry the protocol message carrying the replicated log entry
     * @throws EnvironmentFailureException if the entry's VLSN does not follow
     *         the last replayed VLSN
     * @throws DatabaseException if applying the entry fails; the message is
     *         augmented with a description of the wire record
     * @throws IOException if queueing an ack is interrupted
     * @throws InterruptedException presumably from waiting on the VLSN
     *         freeze latch (NOTE(review): confirm)
     * @throws MasterSyncException presumably from the master status sync
     *         check performed before acked commits (NOTE(review): confirm)
     */
    public void replayEntry(long startNs,
                            Protocol.Entry entry)
        throws DatabaseException,
               IOException,
               InterruptedException,
               MasterSyncException {

        final InputWireRecord wireRecord = entry.getWireRecord();
        final LogEntry logEntry = wireRecord.getLogEntry();

        /*
         * Sanity check that the replication stream is in sequence. We want to
         * forestall any possible corruption from replaying invalid entries.
         */
        if (!wireRecord.getVLSN().follows(lastReplayedVLSN)) {
            throw new EnvironmentFailureException
                (repImpl,
                 EnvironmentFailureReason.UNEXPECTED_STATE,
                 "Rep stream not sequential. Current VLSN: " +
                 lastReplayedVLSN +
                 " next log entry VLSN: " + wireRecord.getVLSN());
        }

        if (logger.isLoggable(Level.FINEST)) {
            LoggerUtils.finest(logger, repImpl, "Replaying " + wireRecord);
        }

        /* Look up the entry's txn, creating and registering it if new. */
        final ReplayTxn repTxn = getReplayTxn(logEntry.getTransactionId(), true);
        updateReplicaSequences(logEntry);
        final byte entryType = wireRecord.getEntryType();

        /*
         * Advance the VLSN before applying the entry: the branches below use
         * lastReplayedVLSN as the VLSN of this entry. Note that it stays
         * advanced even if applying the entry throws.
         */
        lastReplayedVLSN = wireRecord.getVLSN();

        try {
            final long txnId = repTxn.getId();

            if (LOG_TXN_COMMIT.equalsType(entryType)) {
                Protocol.Commit commitEntry = (Protocol.Commit) entry;

                /*
                 * Without an ack request the replica is free to use its own
                 * (NO_SYNC) policy; otherwise groupCommit decides the policy
                 * actually used for the master-requested one.
                 */
                final boolean needsAck = commitEntry.getNeedsAck();
                final SyncPolicy txnSyncPolicy =
                    commitEntry.getReplicaSyncPolicy();
                final SyncPolicy implSyncPolicy =
                    needsAck ?
                    groupCommit.getImplSyncPolicy(txnSyncPolicy) :
                    noAckSyncPolicy;

                logReplay(repTxn, needsAck, implSyncPolicy);

                final TxnCommit masterCommit =
                        (TxnCommit) logEntry.getMainItem();

                if (needsAck) {

                    /*
                     * Only wait if the replica is not lagging and the
                     * durability requires it.
                     */
                    repImpl.getRepNode().getVLSNFreezeLatch().awaitThaw();
                    repImpl.getRepNode().getMasterStatus().assertSync();
                }

                repTxn.commit(implSyncPolicy,
                              new ReplicationContext(lastReplayedVLSN),
                              masterCommit.getMasterNodeId());

                final long masterCommitTimeMs =
                        masterCommit.getTime().getTime();
                lastReplayedTxn = new TxnInfo(lastReplayedVLSN,
                                              masterCommitTimeMs);

                updateCommitStats(needsAck, implSyncPolicy, startNs,
                                  masterCommitTimeMs, repTxn.getEndTime());

                /* Respond to the feeder. */
                if (needsAck) {
                    /*
                     * Need an ack, either buffer it, for sync group commit, or
                     * queue it.
                     */
                    if (!groupCommit.bufferAck(startNs, repTxn,
                                               txnSyncPolicy)) {
                        queueAck(txnId);
                    }
                }

                /*
                 * The group refresh and recalculation can be expensive, since
                 * it may require a database read. Do it after the ack.
                 */
                if (repTxn.getRepGroupDbChange() && canRefreshGroup(repTxn)) {
                    repImpl.getRepNode().refreshCachedGroup();
                    repImpl.getRepNode().recalculateGlobalCBVLSN();
                }

                nElapsedTxnTime.add(repTxn.elapsedTime());

            } else if (LOG_TXN_ABORT.equalsType(entryType)) {

                nAborts.increment();
                final TxnAbort masterAbort = (TxnAbort) logEntry.getMainItem();
                final ReplicationContext abortContext =
                    new ReplicationContext(wireRecord.getVLSN());
                if (logger.isLoggable(Level.FINEST)) {
                    LoggerUtils.finest(logger, repImpl,
                                       "abort called for " + txnId +
                                       " masterId=" +
                                       masterAbort.getMasterNodeId() +
                                       " repContext=" + abortContext);
                }
                repTxn.abort(abortContext, masterAbort.getMasterNodeId());
                /* An abort also counts as the last replayed txn. */
                lastReplayedTxn = new TxnInfo(lastReplayedVLSN,
                                              masterAbort.getTime().getTime());
                if (repTxn.getRepGroupDbChange() && canRefreshGroup(repTxn)) {

                    /*
                     * Refresh is the safe thing to do on an abort, since a
                     * refresh may have been held back from an earlier commit
                     * due to this active transaction.
                     */
                    repImpl.getRepNode().refreshCachedGroup();
                }
                nElapsedTxnTime.add(repTxn.elapsedTime());

            } else if (LOG_NAMELN_TRANSACTIONAL.equalsType(entryType)) {

                /*
                 * A database-level operation. The replica's DbTree cache is
                 * cleared first -- presumably because the operation may
                 * create, rename or remove databases (NOTE(review): confirm).
                 */
                repImpl.getRepNode().getReplica().clearDbTreeCache();
                nNameLNs.increment();
                applyNameLN(repTxn, wireRecord);

            } else {
                nLNs.increment();
                /* A data operation. */
                assert wireRecord.getLogEntry() instanceof LNLogEntry;
                applyLN(repTxn, wireRecord);
            }

            /* Remember the last VLSN applied by this txn. */
            repTxn.setLastAppliedVLSN(lastReplayedVLSN);

        } catch (DatabaseException e) {
            /* Identify the offending entry, then propagate. */
            e.addErrorMessage("Problem seen replaying entry " + wireRecord);
            throw e;
        } finally {
            /* Report slow replays, whether or not the replay succeeded. */
            final long elapsedNs = System.nanoTime() - startNs;
            if (elapsedNs > replayLoggingThresholdNs) {
                LoggerUtils.info(logger, repImpl,
                                 "Replay time for entry type:" +
                                 LogEntryType.findType(entryType) + " " +
                                 NANOSECONDS.toMillis(elapsedNs) + "ms " +
                                 "exceeded threshold:" +
                                 NANOSECONDS.
                                     toMillis(replayLoggingThresholdNs) +
                                 "ms");
            }
        }
    }
608 
609     /**
610      * Queue the request ack for an async ack write to the network.
611      */
queueAck(final long txnId)612     void queueAck(final long txnId) throws IOException {
613         try {
614             outputQueue.put(txnId);
615         } catch (InterruptedException ie) {
616             /*
617              * Have the higher levels treat it like an IOE and
618              * exit the thread.
619              */
620             throw new IOException("Ack I/O interrupted", ie);
621         }
622     }
623 
624     /**
625      * Logs information associated with the replay of the txn commit
626      */
logReplay(ReplayTxn repTxn, boolean needsAck, SyncPolicy syncPolicy)627     private void logReplay(ReplayTxn repTxn,
628                            boolean needsAck,
629                            SyncPolicy syncPolicy) {
630 
631         if (!logger.isLoggable(Level.FINE)) {
632             return;
633         }
634 
635         if (needsAck) {
636             LoggerUtils.fine(logger, repImpl,
637                              "Replay: got commit for txn=" + repTxn.getId() +
638                              ", ack needed, replica sync policy=" +
639                              syncPolicy +
640                              " vlsn=" + lastReplayedVLSN);
641         } else {
642             LoggerUtils.fine(logger, repImpl,
643                              "Replay: got commit for txn=" + repTxn.getId() +
644                              " ack not needed" +
645                              " vlsn=" + lastReplayedVLSN);
646         }
647     }
648 
649     /**
650      * Returns true if there are no other activeTxns that have also modified
651      * the membership database and are still open, since they could potentially
652      * hold write locks that would block the read locks acquired during the
653      * refresh operation.
654      *
655      * @param txn the current txn being committed or aborted
656      *
657      * @return true if there are no open transactions that hold locks on the
658      * membership database.
659      */
canRefreshGroup(ReplayTxn txn)660     private boolean canRefreshGroup(ReplayTxn txn) {
661 
662         /*
663          * Use synchronized rather than copyActiveTxns, since this is called
664          * during replay and there is no nested locking to worry about.
665          */
666         synchronized (activeTxns) {
667             for (ReplayTxn atxn : activeTxns.values()) {
668                 if (atxn == txn) {
669                     continue;
670                 }
671                 if (atxn.getRepGroupDbChange()) {
672                     return false;
673                 }
674             }
675         }
676         return true;
677     }
678 
679     /**
680      * Update this replica's node, txn and database sequences with any ids in
681      * this log entry. We can call update, even if the replay id doesn't
682      * represent a new lowest-id point, or if the apply is not successful,
683      * because the apply checks that the replay id is < the sequence on the
684      * replica. We just want to ensure that if this node becomes the master,
685      * its sequences are in sync with what came before in the replication
686      * stream, and ids are not incorrectly reused.
687      */
updateReplicaSequences(LogEntry logEntry)688     private void updateReplicaSequences(LogEntry logEntry) {
689 
690         /* For now, we assume all replay entries have a txn id. */
691         repImpl.getTxnManager().updateFromReplay(logEntry.getTransactionId());
692 
693         /* If it's a database operation, update the database id. */
694         if (logEntry instanceof NameLNLogEntry) {
695             NameLNLogEntry nameLogEntry = (NameLNLogEntry) logEntry;
696             nameLogEntry.postFetchInit(false /*isDupDb*/);
697             NameLN nameLN = (NameLN) nameLogEntry.getLN();
698             repImpl.getDbTree().updateFromReplay(nameLN.getId());
699         }
700     }
701 
702     /**
703      * Obtain a ReplayTxn to represent the incoming operation.
704      */
getReplayTxn(long txnId, boolean registerTxnImmediately)705     public ReplayTxn getReplayTxn(long txnId, boolean registerTxnImmediately)
706         throws DatabaseException {
707 
708         ReplayTxn useTxn = null;
709         synchronized (activeTxns) {
710             useTxn = activeTxns.get(txnId);
711             if (useTxn == null) {
712 
713                 /*
714                  * Durability will be explicitly specified when
715                  * ReplayTxn.commit is called, so TransactionConfig.DEFAULT is
716                  * fine.
717                  */
718                 if (registerTxnImmediately) {
719                     useTxn = new ReplayTxn(repImpl, TransactionConfig.DEFAULT,
720                                            txnId, activeTxns, logger);
721                 } else {
722                     useTxn = new ReplayTxn(repImpl, TransactionConfig.DEFAULT,
723                                            txnId, activeTxns, logger) {
724                             @Override
725                             protected
726                             boolean registerImmediately() {
727                                 return false;
728                             }
729                     };
730                 }
731             }
732         }
733         return useTxn;
734     }
735 
736     /**
737      * Replays the NameLN.
738      *
739      * Note that the operations: remove, rename and truncate need to establish
740      * write locks on the database. Any open handles are closed by this
741      * operation by virtue of the ReplayTxn's importunate property.  The
742      * application will receive a LockPreemptedException if it subsequently
743      * accesses the database handle.
744      */
applyNameLN(ReplayTxn repTxn, InputWireRecord wireRecord)745     private void applyNameLN(ReplayTxn repTxn,
746                              InputWireRecord wireRecord)
747         throws DatabaseException {
748 
749         NameLNLogEntry nameLNEntry = (NameLNLogEntry) wireRecord.getLogEntry();
750         final NameLN nameLN = (NameLN) nameLNEntry.getLN();
751 
752         String databaseName = StringUtils.fromUTF8(nameLNEntry.getKey());
753 
754         final DbOpReplicationContext repContext =
755             new DbOpReplicationContext(wireRecord.getVLSN(), nameLNEntry);
756 
757         DbOperationType opType = repContext.getDbOperationType();
758         DatabaseImpl dbImpl = null;
759         try {
760             switch (opType) {
761                 case CREATE:
762                 {
763                     DatabaseConfig dbConfig =
764                         repContext.getCreateConfig().getReplicaConfig(repImpl);
765 
766                     dbImpl = repImpl.getDbTree().createReplicaDb
767                       (repTxn, databaseName, dbConfig, nameLN, repContext);
768 
769                     /*
770                      * We rely on the RepGroupDB.DB_ID value, so make sure
771                      * it's what we expect for this internal replicated
772                      * database.
773                      */
774                     if ((dbImpl.getId().getId() == RepGroupDB.DB_ID) &&
775                         !DbType.REP_GROUP.getInternalName().equals
776                         (databaseName)) {
777                         throw EnvironmentFailureException.unexpectedState
778                             ("Database: " +
779                              DbType.REP_GROUP.getInternalName() +
780                              " is associated with id: " +
781                              dbImpl.getId().getId() +
782                              " and not the reserved database id: " +
783                              RepGroupDB.DB_ID);
784                     }
785 
786                     TriggerManager.runOpenTriggers(repTxn, dbImpl, true);
787                     break;
788                 }
789 
790                 case REMOVE: {
791                     dbImpl = repImpl.getDbTree().getDb(nameLN.getId());
792                     try {
793                         repImpl.getDbTree().removeReplicaDb
794                             (repTxn, databaseName, nameLN.getId(), repContext);
795                         TriggerManager.runRemoveTriggers(repTxn, dbImpl);
796                     } catch (DatabaseNotFoundException e) {
797                         throw EnvironmentFailureException.unexpectedState
798                             ("Database: " + dbImpl.getName() +
799                              " Id: " + nameLN.getId() +
800                              " not found on the Replica.");
801                     }
802                     break;
803                 }
804 
805                 case TRUNCATE: {
806                     dbImpl = repImpl.getDbTree().getDb
807                         (repContext.getTruncateOldDbId());
808                     try {
809                         TruncateDbResult result =
810                         repImpl.getDbTree().truncateReplicaDb
811                             (repTxn, databaseName, false, nameLN, repContext);
812                         TriggerManager.runTruncateTriggers(repTxn, result.newDb);
813                     } catch (DatabaseNotFoundException e) {
814                         throw EnvironmentFailureException.unexpectedState
815                             ("Database: " + dbImpl.getName() +
816                              " Id: " + nameLN.getId() +
817                              " not found on the Replica.");
818                     }
819 
820                     break;
821                 }
822 
823                 case RENAME: {
824                     dbImpl = repImpl.getDbTree().getDb(nameLN.getId());
825                     try {
826                         dbImpl =
827                         repImpl.getDbTree().renameReplicaDb
828                             (repTxn, dbImpl.getName(), databaseName, nameLN,
829                              repContext);
830                         TriggerManager.runRenameTriggers(repTxn, dbImpl,
831                                                          databaseName);
832                     } catch (DatabaseNotFoundException e) {
833                         throw EnvironmentFailureException.unexpectedState
834                             ("Database rename from: " + dbImpl.getName() +
835                              " to " + databaseName +
836                              " failed, name not found on the Replica.");
837                     }
838                     break;
839                 }
840 
841                 case UPDATE_CONFIG: {
842                     /* Get the replicated database configurations. */
843                     DatabaseConfig dbConfig =
844                         repContext.getCreateConfig().getReplicaConfig(repImpl);
845 
846                     /* Update the NameLN and write it to the log. */
847                     dbImpl = repImpl.getDbTree().getDb(nameLN.getId());
848                     final String dbName = dbImpl.getName();
849                     repImpl.getDbTree().updateNameLN
850                         (repTxn, dbName, repContext);
851 
852                     /* Set the new configurations to DatabaseImpl. */
853                     dbImpl.setConfigProperties
854                         (repTxn, dbName, dbConfig, repImpl);
855 
856                     repImpl.getDbTree().modifyDbRoot(dbImpl);
857 
858                     break;
859                 }
860 
861                 default:
862                     throw EnvironmentFailureException.unexpectedState
863                         ("Illegal database op type of " + opType.toString() +
864                          " from " + wireRecord + " database=" + databaseName);
865             }
866         } finally {
867             if (dbImpl != null) {
868                 repImpl.getDbTree().releaseDb(dbImpl);
869             }
870         }
871     }
872 
applyLN( final ReplayTxn repTxn, final InputWireRecord wireRecord)873     private void applyLN(
874         final ReplayTxn repTxn,
875         final InputWireRecord wireRecord)
876         throws DatabaseException {
877 
878         final LNLogEntry<?> lnEntry = (LNLogEntry<?>) wireRecord.getLogEntry();
879         final DatabaseId dbId = lnEntry.getDbId();
880 
881         /*
882          * If this is a change to the rep group db, remember at commit time,
883          * and refresh this node's group metadata.
884          */
885         if (dbId.getId() == RepGroupDB.DB_ID) {
886             repTxn.noteRepGroupDbChange();
887         }
888 
889         /*
890          * Note that we don't have to worry about serializable isolation when
891          * applying a replicated txn; serializable isolation in only an issue
892          * for txns that take read locks, and a replicated txn consists only of
893          * write operations.
894          */
895         final DatabaseImpl dbImpl =
896             repImpl.getRepNode().getReplica().getDbCache().get(dbId, repTxn);
897 
898         lnEntry.postFetchInit(dbImpl);
899 
900         final ReplicationContext repContext =
901             new ReplicationContext(wireRecord.getVLSN());
902 
903         final Cursor cursor = DbInternal.makeCursor(
904             dbImpl, repTxn, null /*cursorConfig*/);
905 
906         try {
907             OperationStatus status;
908             final LN ln = lnEntry.getLN();
909 
910             if (ln.isDeleted()) {
911 
912                 /*
913                  * Perform an exact search by key. Use read-uncommitted and
914                  * partial data entry to avoid reading old data.
915                  */
916                 replayKeyEntry.setData(lnEntry.getKey());
917 
918                 status = DbInternal.searchForReplay(
919                     cursor, replayKeyEntry, delDataEntry,
920                     LockMode.READ_UNCOMMITTED, SearchMode.SET);
921 
922                 if (status == OperationStatus.SUCCESS) {
923                     status = DbInternal.deleteInternal(cursor, repContext);
924                 }
925             } else {
926                 replayKeyEntry.setData(lnEntry.getKey());
927                 replayDataEntry.setData(ln.getData());
928 
929                 DbConfigManager configManager = repImpl.getConfigManager();
930                 boolean blindInsertions = configManager.getBoolean(
931                     EnvironmentParams.BIN_DELTA_BLIND_OPS);
932 
933                 /*
934                  * Let RL be the logrec being replayed here. Let R and T be
935                  * the record and the txn associated with RL.
936                  *
937                  * We say that RL is replayed "blindly" if the search for
938                  * R's key in the tree lands on a BIN-delta, this delta does
939                  * not contain R's key, and we don't mutate the delta to a
940                  * full BIN to check if R is indeed in the tree or not;
941                  * instead we just insert R in the delta.
942                  *
943                  * RL can be applied blindly only if RL is a "pure" insertion,
944                  * i.e. RL is an insertion and R did not exist prior to T.
945                  *
946                  * A non-pure insertion (where R existed before T, it was
947                  * deleted by T, and then reinserted by T) cannot be applied
948                  * blindly, because if it were, it would generate a logrec
949                  * with abortLSN == NULL, and if T were aborted, undoing the
950                  * logrec with the NULL abortLSN would cause the loss of the
951                  * pre-T version of R. So, to replay a non-pure insertion,
952                  * we must check if a slot for R exists in the tree already,
953                  * and if so, generate a new logrec with an abortLSN pointing
954                  * to the pre-T version of R.
955                  *
956                  * Updates and deletes cannot be replayed blindly either,
957                  * because we wouldn't be able to generate logrecs with the
958                  * correct abortLsn, nor count the previous version of R as
959                  * obsolete.
960                  *
961                  * The condition lnEntry.getAbortLsn() == DbLsn.NULL_LSN ||
962                  * lnEntry.getAbortKnownDeleted() guarantee that LN is a pure
963                  * insertion.
964                  */
965                 PutMode mode;
966                 if (blindInsertions &&
967                     lnEntry.getLogType().equals(
968                         LogEntryType.LOG_INS_LN_TRANSACTIONAL) &&
969                     (lnEntry.getAbortLsn() == DbLsn.NULL_LSN ||
970                      lnEntry.getAbortKnownDeleted())) {
971                     mode = PutMode.BLIND_INSERTION;
972                 } else {
973                     mode = PutMode.OVERWRITE;
974                 }
975 
976                 status = DbInternal.putForReplay(cursor,
977                                                  replayKeyEntry,
978                                                  replayDataEntry,
979                                                  ln,
980                                                  mode,
981                                                  repContext);
982             }
983 
984             if (status != OperationStatus.SUCCESS) {
985                 throw new EnvironmentFailureException
986                     (repImpl,
987                      EnvironmentFailureReason.LOG_INCOMPLETE,
988                      "Replicated operation could  not be applied. Status= " +
989                      status + ' ' + wireRecord);
990             }
991         } finally {
992             cursor.close();
993         }
994     }
995 
996     /**
997      * Go through all active txns and rollback up to but not including the log
998      * entry represented by the matchpoint VLSN.
999      *
1000      * Effectively truncate these rolled back log entries by making them
1001      * invisible. Flush the log first, to make sure these log entries are out
1002      * of the log buffers and are on disk, so we can reliably find them through
1003      * the FileManager.
1004      *
1005      * Rollback steps are described in
1006      * https://sleepycat.oracle.com/trac/wiki/Logging#Recoverysteps. In
1007      * summary,
1008      *
1009      * 1. Log and fsync a new RollbackStart record
1010      * 2. Do the rollback in memory. There is no need to explicitly
1011      *    log INs made dirty by the rollback operation.
1012      * 3. Do invisibility masking by overwriting LNs.
1013      * 4. Fsync all overwritten log files at this point.
1014      * 5. Write a RollbackEnd record, for ease of debugging
1015      *
     * Note that application read txns can continue to run during syncup.
     * Reader txns cannot access records that are being rolled back, because
     * they are in txns that are not committed, i.e., they are write locked.
1019      * The rollback interval never includes committed txns, and we do a hard
1020      * recovery if it would include them.
1021      */
rollback(VLSN matchpointVLSN, long matchpointLsn)1022     public void rollback(VLSN matchpointVLSN, long matchpointLsn) {
1023 
1024         String rollbackStatus = RBSTATUS_START;
1025 
1026         final Map<Long, ReplayTxn> localActiveTxns = copyActiveTxns();
1027         try {
1028             if (localActiveTxns.size() == 0) {
1029                 /* no live read/write txns, nothing to do. */
1030                 rollbackStatus = RBSTATUS_NO_ACTIVE;
1031                 return;
1032             }
1033 
1034             VLSNRange range = repImpl.getVLSNIndex().getRange();
1035             if (range.getLast().equals(matchpointVLSN)) {
1036                 /* nothing to roll back. */
1037                 rollbackStatus = RBSTATUS_RANGE_EQUALS;
1038                 return;
1039             }
1040 
1041             repImpl.setSyncupProgress(SyncupProgress.DO_ROLLBACK);
1042 
1043             /*
1044              * Stop the log file backup service, since the files will be in an
1045              * inconsistent state while the rollback is in progress.
1046              */
1047             repImpl.getRepNode().shutdownNetworkBackup();
1048 
1049             /*
1050              * Set repImpl's isRollingBack to true, and invalidate all the in
1051              * progress DbBackup.
1052              */
1053             repImpl.setBackupProhibited(true);
1054             repImpl.invalidateBackups(DbLsn.getFileNumber(matchpointLsn));
1055 
1056             /*
1057              * 1. Log RollbackStart. The fsync guarantees that this marker will
1058              * be present in the log for recovery. It also ensures that all log
1059              * entries will be flushed to disk and the TxnChain will not have
1060              * to worry about entries that are in log buffers when constructing
1061              * the rollback information.
1062              */
1063             LogManager logManager = repImpl.getLogManager();
1064             LogEntry rollbackStart = SingleItemEntry.create(
1065                 LogEntryType.LOG_ROLLBACK_START,
1066                 new RollbackStart(
1067                     matchpointVLSN, matchpointLsn, localActiveTxns.keySet()));
1068             long rollbackStartLsn =
1069                 logManager.logForceFlush(rollbackStart,
1070                                          true, // fsyncRequired,
1071                                          ReplicationContext.NO_REPLICATE);
1072             rollbackStatus = RBSTATUS_LOG_RBSTART;
1073 
1074             /*
1075              * 2. Do rollback in memory. Undo any operations that were logged
1076              * after the matchpointLsn, and save the LSNs for those log
1077              * entries.. There should be something to undo, because we checked
1078              * earlier that there were log entries after the matchpoint.
1079              */
1080             List<Long> rollbackLsns = new ArrayList<Long>();
1081             for (ReplayTxn replayTxn : localActiveTxns.values()) {
1082                 Collection<Long> txnRollbackLsns =
1083                     replayTxn.rollback(matchpointLsn);
1084 
1085                 /*
1086                  * Txns that were entirely rolled back should have been removed
1087                  * from the activeTxns map.
1088                  */
1089                 assert checkRemoved(replayTxn) :
1090                     "Should have removed " + replayTxn;
1091 
1092                 rollbackLsns.addAll(txnRollbackLsns);
1093             }
1094             rollbackStatus = RBSTATUS_MEM_ROLLBACK;
1095             assert rollbackLsns.size() != 0 : dumpActiveTxns(matchpointLsn);
1096 
1097             /*
1098              * 3 & 4 - Mark the rolled back log entries as invisible.  But
1099              * before doing so, invoke any registered rewrite listeners, so the
1100              * application knows that existing log files will be modified.
1101              *
1102              * After all are done, fsync the set of files. By waiting, some may
1103              * have made it out on their own.
1104              */
1105             LogFileRewriteListener listener = repImpl.getLogRewriteListener();
1106             if (listener != null) {
1107                 listener.rewriteLogFiles(getFileNames(rollbackLsns));
1108             }
1109             RollbackTracker.makeInvisible(repImpl, rollbackLsns);
1110             rollbackStatus = RBSTATUS_INVISIBLE;
1111 
1112             /*
1113              * 5. Log RollbackEnd. Flush it so that we can use it to optimize
1114              * recoveries later on. If the RollbackEnd exists, we can skip the
1115              * step of re-making LNs invisible.
1116              */
1117             logManager.logForceFlush(
1118                 SingleItemEntry.create(LogEntryType.LOG_ROLLBACK_END,
1119                                        new RollbackEnd(matchpointLsn,
1120                                                        rollbackStartLsn)),
1121                  true, // fsyncRequired
1122                  ReplicationContext.NO_REPLICATE);
1123 
1124             /*
1125              * Restart the backup service only if all the steps of the
1126              * rollback were successful.
1127              */
1128             repImpl.getRepNode().restartNetworkBackup();
1129             repImpl.setBackupProhibited(false);
1130             rollbackStatus = RBSTATUS_FINISH;
1131         } finally {
1132 
1133             /* Reset the lastReplayedVLSN so it's correct when we resume. */
1134             lastReplayedVLSN = matchpointVLSN;
1135             LoggerUtils.info(logger, repImpl,
1136                              "Rollback to matchpoint " + matchpointVLSN +
1137                              " at " + DbLsn.getNoFormatString(matchpointLsn) +
1138                              " status=" + rollbackStatus);
1139         }
1140     }
1141 
1142     /* For debugging support */
dumpActiveTxns(long matchpointLsn)1143     private String dumpActiveTxns(long matchpointLsn) {
1144         StringBuilder sb = new StringBuilder();
1145         sb.append("matchpointLsn=");
1146         sb.append(DbLsn.getNoFormatString(matchpointLsn));
1147         for (ReplayTxn replayTxn : copyActiveTxns().values()) {
1148             sb.append("txn id=").append(replayTxn.getId());
1149             sb.append(" locks=").append(replayTxn.getWriteLockIds());
1150             sb.append("lastLogged=");
1151             sb.append(DbLsn.getNoFormatString(replayTxn.getLastLsn()));
1152             sb.append("\n");
1153         }
1154 
1155         return sb.toString();
1156     }
1157 
getFileNames(List<Long> lsns)1158     private Set<File> getFileNames(List<Long> lsns) {
1159         Set<Long> fileNums = new HashSet<Long>();
1160         Set<File> files = new HashSet<File>();
1161 
1162         for (long lsn : lsns) {
1163             fileNums.add(DbLsn.getFileNumber(lsn));
1164         }
1165         for (long fileNum : fileNums) {
1166             files.add(new File(FileManager.getFileName(fileNum)));
1167         }
1168         return files;
1169     }
1170 
checkRemoved(ReplayTxn txn)1171     private boolean checkRemoved(ReplayTxn txn) {
1172         if (txn.isClosed()) {
1173             if (activeTxns.containsKey(txn.getId())) {
1174                 return false;
1175             }
1176         }
1177 
1178         return true;
1179     }
1180 
1181     /**
1182      * Make a copy of activeTxns to avoid holding its mutex while iterating.
1183      * Can be used whenever the cost of the HashMap copy is not significant.
1184      */
copyActiveTxns()1185     private Map<Long, ReplayTxn> copyActiveTxns() {
1186         synchronized (activeTxns) {
1187             return new HashMap<Long, ReplayTxn>(activeTxns);
1188         }
1189     }
1190 
1191     /**
1192      * Release all transactions, database handles, etc held by the replay
1193      * unit. The Replicator is closing down and Replay will not be invoked
1194      * again.
1195      */
close()1196     public void close() {
1197 
1198         for (ReplayTxn replayTxn : copyActiveTxns().values()) {
1199             try {
1200                 if (logger.isLoggable(Level.FINE)) {
1201                     LoggerUtils.fine(logger, repImpl,
1202                                      "Unregistering open replay txn: " +
1203                                      replayTxn.getId());
1204                 }
1205                 replayTxn.cleanup();
1206             } catch (DatabaseException e) {
1207                 LoggerUtils.fine(logger, repImpl,
1208                                  "Replay txn: " + replayTxn.getId() +
1209                                  " unregistration failed: " + e.getMessage());
1210             }
1211         }
1212         assert activeTxns.size() == 0;
1213     }
1214 
1215     /**
1216      * Returns a copy of the statistics associated with Replay
1217      */
getStats(StatsConfig config)1218     public StatGroup getStats(StatsConfig config) {
1219         StatGroup ret = statistics.cloneGroup(config.getClear());
1220 
1221         return ret;
1222     }
1223 
resetStats()1224     public void resetStats() {
1225         statistics.clear();
1226     }
1227 
1228     /* For unit tests */
getActiveTxns()1229     public Map<Long, ReplayTxn> getActiveTxns() {
1230         return activeTxns;
1231     }
1232 
dumpState()1233     public String dumpState() {
1234         StringBuilder sb = new StringBuilder();
1235         sb.append("lastReplayedTxn=").append(lastReplayedTxn);
1236         sb.append(" lastReplayedVLSN=").append(lastReplayedVLSN);
1237         sb.append(" numActiveReplayTxns=").append(activeTxns.size());
1238         sb.append("\n");
1239         return sb.toString();
1240     }
1241 
1242     /**
1243      * Write out any pending acknowledgments. See GroupCommit.flushPendingAcks
1244      * for details. This method is invoked after each log entry is read from
1245      * the replication stream.
1246      *
1247      * @param nowNs the time at the reading of the log entry
1248      */
flushPendingAcks(long nowNs)1249     void flushPendingAcks(long nowNs)
1250         throws IOException {
1251 
1252         groupCommit.flushPendingAcks(nowNs);
1253     }
1254 
1255     /**
1256      * See GroupCommit.getPollIntervalNs(long)
1257      */
getPollIntervalNs(long defaultNs)1258     long getPollIntervalNs(long defaultNs) {
1259         return groupCommit.getPollIntervalNs(defaultNs);
1260     }
1261 
1262     /**
1263      * Implements group commit. It's really a substructure of Replay and exists
1264      * mainly for modularity reasons.
1265      * <p>
1266      * Since replay is single threaded, the group commit mechanism works
1267      * differently in the replica than in the master. In the replica, SYNC
1268      * transactions are converted into NO_SYNC transactions and executed
1269      * immediately, but their acknowledgments are delayed until after either
1270      * the REPLICA_GROUP_COMMIT_INTERVAL (the max amount the first transaction
1271      * in the group is delayed) has expired, or the size of the group (as
1272      * specified by REPLICA_MAX_GROUP_COMMIT) has been exceeded.
1273      */
    private class GroupCommit {

        /*
         * Ids of txns whose commit acks are buffered. The size determines the
         * max fsync commits that can be grouped. It comes from
         * REPLICA_MAX_GROUP_COMMIT; a length of zero disables group commit
         * (see isEnabled).
         */
        private final long pendingCommitAcks[];

        /* Number of entries currently in pendingCommitAcks */
        private int nPendingAcks;

        /*
         * If this time limit is reached, the group will be forced to commit.
         * The value is on the System.nanoTime scale.
         * Invariant: nPendingAcks > 0 ==> limitGroupCommitNs > 0
         */
        private long limitGroupCommitNs = 0;

        /* The time interval that an open group is held back. */
        private final long groupCommitIntervalNs;

        private final LongStat nGroupCommitTimeouts;
        private final LongStat nGroupCommitMaxExceeded;
        private final LongStat nGroupCommits;
        private final LongStat nGroupCommitTxns;

        /**
         * Sizes the ack buffer and computes the group interval (converted
         * from ms to ns) from the configuration. Also registers the group
         * commit stats in the enclosing Replay's statistics group.
         */
        private GroupCommit(DbConfigManager configManager) {
            pendingCommitAcks = new long[configManager.
                getInt(RepParams.REPLICA_MAX_GROUP_COMMIT)];

            nPendingAcks = 0;

            final long groupCommitIntervalMs = configManager.
                getDuration(RepParams.REPLICA_GROUP_COMMIT_INTERVAL);

            groupCommitIntervalNs =
                NANOSECONDS.convert(groupCommitIntervalMs, MILLISECONDS);
            nGroupCommitTimeouts =
                new LongStat(statistics, N_GROUP_COMMIT_TIMEOUTS);

            nGroupCommitMaxExceeded =
                new LongStat(statistics, N_GROUP_COMMIT_MAX_EXCEEDED);

            nGroupCommitTxns =
                new LongStat(statistics, N_GROUP_COMMIT_TXNS);

            nGroupCommits =
                new LongStat(statistics, N_GROUP_COMMITS);
        }

        /**
         * Returns true if group commits are enabled at the replica.
         */
        private boolean isEnabled() {
            return pendingCommitAcks.length > 0;
        }

        /**
         * The interval used to poll for incoming log entries. The time is
         * lowered from the defaultNs time, if there are pending
         * acknowledgments.
         *
         * The result may be zero or negative when the group's deadline has
         * already passed. NOTE(review): confirm that callers treat a
         * non-positive timeout as "poll without waiting".
         *
         * @param defaultNs the default poll interval
         *
         * @return the actual poll interval
         */
        private long getPollIntervalNs(long defaultNs) {
            if (nPendingAcks == 0) {
                return defaultNs;
            }
            final long now = System.nanoTime();

            /* Time remaining until the open group must be flushed. */
            final long interval = limitGroupCommitNs - now;
            return Math.min(interval, defaultNs);
        }

        /**
         * Returns the sync policy to be implemented at the replica. If
         * group commit is active, and SYNC is requested it will return
         * NO_SYNC instead to delay the fsync.
         *
         * @param txnSyncPolicy the sync policy as stated in the txn
         *
         * @return the sync policy to be implemented by the replica
         */
        private SyncPolicy getImplSyncPolicy(SyncPolicy txnSyncPolicy) {
            return ((txnSyncPolicy ==  SyncPolicy.SYNC) && isEnabled()) ?
                   SyncPolicy.NO_SYNC : txnSyncPolicy;
        }

        /**
         * Buffers the acknowledgment if the commit calls for a sync, or if
         * there are pending acknowledgments to ensure that acks are sent
         * in order.
         *
         * The first buffered ack starts the group clock. Each later ack
         * triggers flushPendingAcks, which flushes only if the buffer is full
         * or the deadline has passed. If the first ack alone fills the buffer
         * (max group size of 1), this call does not flush it. A later call to
         * flushPendingAcks does.
         *
         * @param nowNs the current time
         * @param ackTxn the txn associated with the ack
         * @param txnSyncPolicy the sync policy as request by the committing
         * txn
         *
         * @return true if the ack has been buffered
         */
        private final boolean bufferAck(long nowNs,
                                        ReplayTxn ackTxn,
                                        SyncPolicy txnSyncPolicy)
            throws IOException {

            if (!isEnabled() ||
                !((txnSyncPolicy == SyncPolicy.SYNC) || (nPendingAcks > 0))) {
                return false;
            }

            pendingCommitAcks[nPendingAcks++] = ackTxn.getId();

            if (nPendingAcks == 1) {
                /* First txn in group, start the clock. */
                limitGroupCommitNs = nowNs + groupCommitIntervalNs;
            } else {
                flushPendingAcks(nowNs);
            }
            return true;
        }

        /**
         * Flush if there are pending acks and either the buffer limit or the
         * group interval has been reached.
         *
         * The order matters: the log is flushed before any ack is queued, so
         * acks go out only for commits that have already been flushed.
         *
         * @param nowNs the current time (passed in to minimize system calls)
         */
        private final void flushPendingAcks(long nowNs)
            throws IOException {

            /* Nothing to do if empty, or not yet full and not yet due. */
            if ((nPendingAcks == 0) ||
                ((nPendingAcks != pendingCommitAcks.length) &&
                 (NanoTimeUtil.compare(nowNs, limitGroupCommitNs) < 0))) {

                return;
            }

            /*
             * Update statistics. A flush that is both due and full counts
             * only as a timeout.
             */
            nGroupCommits.increment();
            nGroupCommitTxns.add(nPendingAcks);
            if (NanoTimeUtil.compare(nowNs, limitGroupCommitNs) >= 0) {
                nGroupCommitTimeouts.increment();
            } else if (nPendingAcks >= pendingCommitAcks.length) {
                nGroupCommitMaxExceeded.increment();
            }

            /* flush log buffer and fsync to disk */
            repImpl.getLogManager().flush();

            /* commits are on disk, send out acknowledgments on the network. */
            for (int i=0; i < nPendingAcks; i++) {
                queueAck(pendingCommitAcks[i]);
                pendingCommitAcks[i] = 0;
            }

            /* Reset to the empty-group state (see the invariant above). */
            nPendingAcks = 0;
            limitGroupCommitNs = 0;
        }
    }
1431 
1432     /**
1433      * Simple helper class to package a Txn vlsn and its associated commit
1434      * time.
1435      */
1436     public static class TxnInfo {
1437         final VLSN txnVLSN;
1438         final long masterTxnEndTime;
1439 
TxnInfo(VLSN txnVLSN, long masterTxnEndTime)1440         private TxnInfo(VLSN txnVLSN, long masterTxnEndTime) {
1441             this.txnVLSN = txnVLSN;
1442             this.masterTxnEndTime = masterTxnEndTime;
1443         }
1444 
getTxnVLSN()1445         public VLSN getTxnVLSN() {
1446             return txnVLSN;
1447         }
1448 
getMasterTxnEndTime()1449         public long getMasterTxnEndTime() {
1450             return masterTxnEndTime;
1451         }
1452 
1453         @Override
toString()1454         public String toString() {
1455             return " VLSN: " + txnVLSN +
1456                 " masterTxnEndTime=" + new Date(masterTxnEndTime);
1457         }
1458     }
1459 }
1460