1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.impl.node; 9 10 import static com.sleepycat.je.log.LogEntryType.LOG_NAMELN_TRANSACTIONAL; 11 import static com.sleepycat.je.log.LogEntryType.LOG_TXN_ABORT; 12 import static com.sleepycat.je.log.LogEntryType.LOG_TXN_COMMIT; 13 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.LATEST_COMMIT_LAG_MS; 14 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.MAX_COMMIT_PROCESSING_NANOS; 15 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.MIN_COMMIT_PROCESSING_NANOS; 16 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_ABORTS; 17 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMITS; 18 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_ACKS; 19 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_NO_SYNCS; 20 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_SYNCS; 21 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_COMMIT_WRITE_NO_SYNCS; 22 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_ELAPSED_TXN_TIME; 23 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMITS; 24 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_MAX_EXCEEDED; 25 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_TIMEOUTS; 26 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_GROUP_COMMIT_TXNS; 27 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_LNS; 28 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_MESSAGE_QUEUE_OVERFLOWS; 29 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.N_NAME_LNS; 30 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.TOTAL_COMMIT_LAG_MS; 31 import static com.sleepycat.je.rep.impl.node.ReplayStatDefinition.TOTAL_COMMIT_PROCESSING_NANOS; 32 import static java.util.concurrent.TimeUnit.MILLISECONDS; 33 import static java.util.concurrent.TimeUnit.NANOSECONDS; 34 35 import java.io.File; 36 import java.io.IOException; 37 import java.util.ArrayList; 38 import java.util.Collection; 39 import java.util.Collections; 40 import java.util.Date; 41 import java.util.HashMap; 42 import java.util.HashSet; 43 import java.util.List; 44 import java.util.Map; 45 import java.util.Set; 46 import java.util.concurrent.ArrayBlockingQueue; 47 import java.util.concurrent.BlockingQueue; 48 import java.util.logging.Level; 49 import java.util.logging.Logger; 50 51 import com.sleepycat.je.Cursor; 52 import com.sleepycat.je.DatabaseConfig; 53 import com.sleepycat.je.DatabaseEntry; 54 import com.sleepycat.je.DatabaseException; 55 import com.sleepycat.je.DatabaseNotFoundException; 56 import com.sleepycat.je.DbInternal; 57 import com.sleepycat.je.Durability.SyncPolicy; 58 import com.sleepycat.je.EnvironmentFailureException; 59 import com.sleepycat.je.LockMode; 60 import com.sleepycat.je.OperationStatus; 61 import com.sleepycat.je.StatsConfig; 62 import com.sleepycat.je.TransactionConfig; 63 import com.sleepycat.je.config.EnvironmentParams; 64 import com.sleepycat.je.dbi.CursorImpl.SearchMode; 65 import com.sleepycat.je.dbi.DatabaseId; 66 import com.sleepycat.je.dbi.DatabaseImpl; 67 import com.sleepycat.je.dbi.DbConfigManager; 68 import com.sleepycat.je.dbi.DbTree.TruncateDbResult; 69 import com.sleepycat.je.dbi.DbType; 70 import com.sleepycat.je.dbi.EnvironmentFailureReason; 71 import com.sleepycat.je.dbi.PutMode; 72 import com.sleepycat.je.dbi.TriggerManager; 73 import com.sleepycat.je.log.DbOpReplicationContext; 74 import com.sleepycat.je.log.FileManager; 75 import com.sleepycat.je.log.LogEntryType; 76 import com.sleepycat.je.log.LogManager; 77 import com.sleepycat.je.log.ReplicationContext; 78 import com.sleepycat.je.log.entry.DbOperationType; 79 import com.sleepycat.je.log.entry.LNLogEntry; 80 import com.sleepycat.je.log.entry.LogEntry; 81 import com.sleepycat.je.log.entry.NameLNLogEntry; 82 import com.sleepycat.je.log.entry.SingleItemEntry; 83 import com.sleepycat.je.recovery.RecoveryInfo; 84 import com.sleepycat.je.recovery.RollbackTracker; 85 import com.sleepycat.je.rep.LogFileRewriteListener; 86 import com.sleepycat.je.rep.SyncupProgress; 87 import com.sleepycat.je.rep.impl.RepGroupDB; 88 import com.sleepycat.je.rep.impl.RepImpl; 89 import com.sleepycat.je.rep.impl.RepParams; 90 import com.sleepycat.je.rep.stream.InputWireRecord; 91 import com.sleepycat.je.rep.stream.MasterStatus.MasterSyncException; 92 import com.sleepycat.je.rep.stream.Protocol; 93 import com.sleepycat.je.rep.txn.ReplayTxn; 94 import com.sleepycat.je.rep.utilint.LongMaxZeroStat; 95 import com.sleepycat.je.rep.utilint.LongMinZeroStat; 96 import com.sleepycat.je.rep.vlsn.VLSNRange; 97 import com.sleepycat.je.tree.LN; 98 import com.sleepycat.je.tree.NameLN; 99 import com.sleepycat.je.txn.RollbackEnd; 100 import com.sleepycat.je.txn.RollbackStart; 101 import com.sleepycat.je.txn.Txn; 102 import com.sleepycat.je.txn.TxnAbort; 103 import com.sleepycat.je.txn.TxnCommit; 104 import com.sleepycat.je.utilint.DbLsn; 105 import com.sleepycat.je.utilint.LoggerUtils; 106 import com.sleepycat.je.utilint.LongMaxStat; 107 import com.sleepycat.je.utilint.LongMinStat; 108 import com.sleepycat.je.utilint.LongStat; 109 import com.sleepycat.je.utilint.NanoTimeUtil; 110 import com.sleepycat.je.utilint.StatGroup; 111 import com.sleepycat.je.utilint.VLSN; 112 import com.sleepycat.utilint.StringUtils; 113 114 /** 115 * Replays log records from the replication stream, and manages the 116 * transactions for those records. 117 * 118 * The Replay module has a lifetime equivalent to the environment owned by 119 * this replicator. Its lifetime is longer than the feeder/replica stream. 120 * For example, suppose this is nodeX: 121 * 122 * t1 - Node X is a replica, node A is master. Replay X is alive 123 * t2 - Node X is a replica, node B takes over as master. X's Replay module 124 * is still alive and has the same set of active txns. It doesn't matter 125 * to X that the master has changed. 126 * t3 - Node X becomes the master. Now its Replay unit is cleared, because 127 * anything managed by the Replay is defunct. 128 */ 129 public class Replay { 130 131 /* These are strings for the rollback logging. */ 132 private static final String RBSTATUS_START = 133 "Started Rollback"; 134 private static final String RBSTATUS_NO_ACTIVE = 135 "No active txns, nothing to rollback"; 136 private static final String RBSTATUS_RANGE_EQUALS = 137 "End of range equals matchpoint, nothing to rollback"; 138 private static final String RBSTATUS_LOG_RBSTART = 139 "Logged RollbackStart entry"; 140 private static final String RBSTATUS_MEM_ROLLBACK = 141 "Finished in-memory rollback"; 142 private static final String RBSTATUS_INVISIBLE = 143 "Finished invisible setting"; 144 private static final String RBSTATUS_FINISH = 145 "Finished rollback"; 146 147 /* 148 * DatabaseEntry objects reused during replay, to minimize allocation in 149 * high performance replay path. 150 */ 151 final DatabaseEntry replayKeyEntry = new DatabaseEntry(); 152 final DatabaseEntry replayDataEntry = new DatabaseEntry(); 153 final DatabaseEntry delDataEntry = new DatabaseEntry(); 154 155 private final RepImpl repImpl; 156 157 /** 158 * If a commit replay operation takes more than this threshold, it's 159 * logged. This information helps determine whether ack timeouts on the 160 * master are due to a slow replica, or the network. 161 */ 162 private final long ackTimeoutLogThresholdNs; 163 164 /** 165 * ActiveTxns is a collection of txn objects used for applying replicated 166 * transactions. This collection should be empty if the node is a master. 167 * 168 * Note that there is an interesting relationship between ActiveTxns and 169 * the txn collection managed by the environment TxnManager. ActiveTxns is 170 * effectively a subset of the set of txns held by the 171 * TxnManager. ReplayTxns must be sure to register and unregister 172 * themselves from ActiveTxns, just as all Txns must register and 173 * unregister with the TxnManager's set. One implementation alternative to 174 * having an ActiveTxns map here is to search the TxnManager set (which is 175 * a set, not a map) for a given ReplayTxn. Another is to subclass 176 * TxnManager so that replicated nodes have their own replayTxn map, just 177 * as XATxns have a XID->Txn map. 178 * 179 * Both alternatives seemed too costly in terms of performance or elaborate 180 * in terms of code to do for the current function. It seems clearer to 181 * make the ActiveTxns a map in the one place that it is currently 182 * used. This choice may change over time, and should be reevaluated if the 183 * implementation changes. 184 * 185 * The ActiveTxns key is the transaction id. These transactions are closed 186 * when: 187 * - the replay unit executes a commit received over the replication stream 188 * - the replay unit executes an abort received over the replication stream 189 * - the replication node realizes that it has just become the new master, 190 * and was not previously the master. 191 * 192 * Note that the Replay class has a lifetime that is longer than that of a 193 * RepNode. This means in particular, that transactions may be left open, 194 * and will be resumed when a replica switches from one master to another, 195 * creating a new RepNode in the process. Because of that, part of the 196 * transaction may be implemented by the rep stream from one master and 197 * another part by another. 198 * 199 * The map is synchronized, so simple get/put operations do not require 200 * additional synchronization. However, iteration requires synchronization 201 * and copyActiveTxns can be used in most cases. 202 */ 203 private final Map<Long, ReplayTxn> activeTxns; 204 205 /* 206 * The entry representing the last replayed txn commit. Supports the 207 * replica's notion of consistency. 208 */ 209 private volatile TxnInfo lastReplayedTxn = null; 210 211 /* 212 * The last replayed entry of any kind. Supports PointConsistencyPolicy. 213 */ 214 private volatile VLSN lastReplayedVLSN = null; 215 216 /* 217 * The sync policy to be used in the absence of an ACK request. The replica 218 * in this case has some latitude about how it syncs the commit. 219 */ 220 private final SyncPolicy noAckSyncPolicy = SyncPolicy.NO_SYNC; 221 222 /** 223 * The RepParams.REPLAY_LOGGING_THRESHOLD configured logging threshold. 224 */ 225 private final long replayLoggingThresholdNs; 226 227 /** 228 * State that is reinitialized by the reinit() method each time a replay 229 * loop is started with a new feeder. 230 */ 231 232 /** 233 * All writes (predominantly acks) are queued here, so they do not block 234 * the replay thread. 235 */ 236 private final BlockingQueue<Long> outputQueue; 237 238 /** 239 * Holds the state associated with group commits. 240 */ 241 private final GroupCommit groupCommit; 242 243 /* Maintains the statistics associated with stream replay. */ 244 private final StatGroup statistics; 245 private final LongStat nCommits; 246 private final LongStat nCommitAcks; 247 private final LongStat nCommitSyncs; 248 private final LongStat nCommitNoSyncs; 249 private final LongStat nCommitWriteNoSyncs; 250 private final LongStat nAborts; 251 private final LongStat nNameLNs; 252 private final LongStat nLNs; 253 private final LongStat nElapsedTxnTime; 254 private final LongStat nMessageQueueOverflows; 255 private final LongMinStat minCommitProcessingNanos; 256 private final LongMaxStat maxCommitProcessingNanos; 257 private final LongStat totalCommitProcessingNanos; 258 private final LongStat totalCommitLagMs; 259 private final LongStat latestCommitLagMs; 260 261 private final Logger logger; Replay(RepImpl repImpl, @SuppressWarnings(R) NameIdPair nameIdPair)262 public Replay(RepImpl repImpl, 263 @SuppressWarnings("unused") NameIdPair nameIdPair) { 264 265 /* 266 * This should have already been caught in 267 * ReplicatedEnvironment.setupRepConfig, but it is checked here anyway 268 * as an added sanity check. [#17643] 269 */ 270 if (repImpl.isReadOnly()) { 271 throw EnvironmentFailureException.unexpectedState 272 ("Replay created with readonly ReplicatedEnvironment"); 273 } 274 275 this.repImpl = repImpl; 276 final DbConfigManager configManager = repImpl.getConfigManager(); 277 278 ackTimeoutLogThresholdNs = MILLISECONDS.toNanos(configManager. 279 getDuration(RepParams.REPLICA_ACK_TIMEOUT)); 280 281 /** 282 * The factor of 2 below is somewhat arbitrary. It should be > 1 X so 283 * that the ReplicaOutputThread can completely process the buffered 284 * messages in the face of a network drop and 2X to allow for 285 * additional headroom and minimize the chances that the replay might 286 * be blocked due to the limited queue length. 287 */ 288 final int outputQueueSize = 2 * 289 configManager.getInt(RepParams.REPLICA_MESSAGE_QUEUE_SIZE); 290 outputQueue = new ArrayBlockingQueue<Long>(outputQueueSize); 291 292 /* 293 * The Replay module manages all write transactions and mimics a 294 * writing application thread. When the node comes up, it populates 295 * the activeTxn collection with ReplayTxns that were resurrected 296 * at recovery time. 297 */ 298 activeTxns = Collections.synchronizedMap( 299 new HashMap<Long, ReplayTxn>()); 300 301 /* 302 * Configure the data entry used for deletion to avoid fetching the 303 * old data during deletion replay. 304 */ 305 delDataEntry.setPartial(0, 0, true); 306 307 logger = LoggerUtils.getLogger(getClass()); 308 statistics = new StatGroup(ReplayStatDefinition.GROUP_NAME, 309 ReplayStatDefinition.GROUP_DESC); 310 311 groupCommit = new GroupCommit(configManager); 312 313 nCommits = new LongStat(statistics, N_COMMITS); 314 nCommitAcks = new LongStat(statistics, N_COMMIT_ACKS); 315 nCommitSyncs = new LongStat(statistics, N_COMMIT_SYNCS); 316 nCommitNoSyncs = new LongStat(statistics, N_COMMIT_NO_SYNCS); 317 nCommitWriteNoSyncs = 318 new LongStat(statistics, N_COMMIT_WRITE_NO_SYNCS); 319 nAborts = new LongStat(statistics, N_ABORTS); 320 nNameLNs = new LongStat(statistics, N_NAME_LNS); 321 nLNs = new LongStat(statistics, N_LNS); 322 nElapsedTxnTime = new LongStat(statistics, N_ELAPSED_TXN_TIME); 323 nMessageQueueOverflows = 324 new LongStat(statistics, N_MESSAGE_QUEUE_OVERFLOWS); 325 minCommitProcessingNanos = 326 new LongMinZeroStat(statistics, MIN_COMMIT_PROCESSING_NANOS); 327 maxCommitProcessingNanos = 328 new LongMaxZeroStat(statistics, MAX_COMMIT_PROCESSING_NANOS); 329 totalCommitProcessingNanos = 330 new LongStat(statistics, TOTAL_COMMIT_PROCESSING_NANOS); 331 totalCommitLagMs = new LongStat(statistics, TOTAL_COMMIT_LAG_MS); 332 latestCommitLagMs = new LongStat(statistics, LATEST_COMMIT_LAG_MS); 333 334 replayLoggingThresholdNs = MILLISECONDS.toNanos(configManager. 335 getDuration(RepParams.REPLAY_LOGGING_THRESHOLD)); 336 } 337 getOutputQueue()338 public BlockingQueue<Long> getOutputQueue() { 339 return outputQueue; 340 } 341 342 /** 343 * Reinitialize for replay from a new feeder 344 */ reset()345 public void reset() { 346 outputQueue.clear(); 347 } 348 getMessageQueueOverflows()349 LongStat getMessageQueueOverflows() { 350 return nMessageQueueOverflows; 351 } 352 353 /** 354 * Actions that must be taken before the recovery checkpoint, whether 355 * the environment is read/write or read/only. 356 */ preRecoveryCheckpointInit(RecoveryInfo recoveryInfo)357 public void preRecoveryCheckpointInit(RecoveryInfo recoveryInfo) { 358 for (Txn txn : recoveryInfo.replayTxns.values()) { 359 360 /* 361 * ReplayTxns need to know about their owning activeTxn map, 362 * so they can remove themselves at close. We are casting upwards, 363 * because the non-HA code is prohibited from referencing 364 * Replication classes, and the RecoveryInfo.replayTxns collection 365 * doesn't know that it's got ReplayTxns. 366 */ 367 ((ReplayTxn) txn).registerWithActiveTxns(activeTxns); 368 } 369 lastReplayedVLSN = repImpl.getVLSNIndex().getRange().getLast(); 370 } 371 getLastReplayedTxn()372 public TxnInfo getLastReplayedTxn() { 373 return lastReplayedTxn; 374 } 375 getLastReplayedVLSN()376 public VLSN getLastReplayedVLSN() { 377 return lastReplayedVLSN; 378 } 379 380 /** 381 * When mastership changes, all inflight replay transactions are aborted. 382 * Replay transactions need only be aborted by the node that has become 383 * the new master (who was previously a Replica). The replay transactions 384 * on the other replicas who have not changed roles are 385 * resolved by the abort record issued by said new master. 386 */ abortOldTxns()387 public void abortOldTxns() 388 throws DatabaseException { 389 390 final int masterNodeId = repImpl.getNodeId(); 391 for (ReplayTxn replayTxn : copyActiveTxns().values()) { 392 replayTxn.abort(ReplicationContext.MASTER, masterNodeId); 393 } 394 assert activeTxns.size() == 0 : "Unexpected txns in activeTxns = " + 395 activeTxns; 396 } 397 updateCommitStats(final boolean needsAck, final SyncPolicy syncPolicy, final long startTimeNanos, final long masterCommitTimeMs, final long replicaCommitTimeMs)398 private void updateCommitStats(final boolean needsAck, 399 final SyncPolicy syncPolicy, 400 final long startTimeNanos, 401 final long masterCommitTimeMs, 402 final long replicaCommitTimeMs) { 403 404 final long now = System.nanoTime(); 405 final long commitNanos = now - startTimeNanos; 406 407 if (commitNanos > ackTimeoutLogThresholdNs && 408 logger.isLoggable(Level.INFO)) { 409 LoggerUtils.info 410 (logger, repImpl, 411 "Replay commit time: " + (commitNanos / 1000000) + 412 " ms exceeded log threshold: " + 413 (ackTimeoutLogThresholdNs / 1000000)); 414 } 415 416 nCommits.increment(); 417 418 if (needsAck) { 419 nCommitAcks.increment(); 420 } 421 422 if (syncPolicy == SyncPolicy.SYNC) { 423 nCommitSyncs.increment(); 424 } else if (syncPolicy == SyncPolicy.NO_SYNC) { 425 nCommitNoSyncs.increment(); 426 } else if (syncPolicy == SyncPolicy.WRITE_NO_SYNC) { 427 nCommitWriteNoSyncs.increment(); 428 } else { 429 throw EnvironmentFailureException.unexpectedState 430 ("Unknown sync policy: " + syncPolicy); 431 } 432 433 totalCommitProcessingNanos.add(commitNanos); 434 minCommitProcessingNanos.setMin(commitNanos); 435 maxCommitProcessingNanos.setMax(commitNanos); 436 437 /* 438 * Tally the lag between master and replica commits, even if clock skew 439 * makes the lag appear negative. The documentation already warns that 440 * the value will be affected by clock skew, so users can adjust for 441 * that, but only if we don't throw the information way. 442 */ 443 final long replicaLagMs = replicaCommitTimeMs - masterCommitTimeMs; 444 totalCommitLagMs.add(replicaLagMs); 445 latestCommitLagMs.set(replicaLagMs); 446 } 447 448 /** 449 * Apply the operation represented by this log entry on this replica node. 450 */ replayEntry(long startNs, Protocol.Entry entry)451 public void replayEntry(long startNs, 452 Protocol.Entry entry) 453 throws DatabaseException, 454 IOException, 455 InterruptedException, 456 MasterSyncException { 457 458 final InputWireRecord wireRecord = entry.getWireRecord(); 459 final LogEntry logEntry = wireRecord.getLogEntry(); 460 461 /* 462 * Sanity check that the replication stream is in sequence. We want to 463 * forestall any possible corruption from replaying invalid entries. 464 */ 465 if (!wireRecord.getVLSN().follows(lastReplayedVLSN)) { 466 throw new EnvironmentFailureException 467 (repImpl, 468 EnvironmentFailureReason.UNEXPECTED_STATE, 469 "Rep stream not sequential. Current VLSN: " + 470 lastReplayedVLSN + 471 " next log entry VLSN: " + wireRecord.getVLSN()); 472 } 473 474 if (logger.isLoggable(Level.FINEST)) { 475 LoggerUtils.finest(logger, repImpl, "Replaying " + wireRecord); 476 } 477 478 final ReplayTxn repTxn = getReplayTxn(logEntry.getTransactionId(), true); 479 updateReplicaSequences(logEntry); 480 final byte entryType = wireRecord.getEntryType(); 481 482 lastReplayedVLSN = wireRecord.getVLSN(); 483 484 try { 485 final long txnId = repTxn.getId(); 486 487 if (LOG_TXN_COMMIT.equalsType(entryType)) { 488 Protocol.Commit commitEntry = (Protocol.Commit) entry; 489 490 final boolean needsAck = commitEntry.getNeedsAck(); 491 final SyncPolicy txnSyncPolicy = 492 commitEntry.getReplicaSyncPolicy(); 493 final SyncPolicy implSyncPolicy = 494 needsAck ? 495 groupCommit.getImplSyncPolicy(txnSyncPolicy) : 496 noAckSyncPolicy; 497 498 logReplay(repTxn, needsAck, implSyncPolicy); 499 500 final TxnCommit masterCommit = 501 (TxnCommit) logEntry.getMainItem(); 502 503 if (needsAck) { 504 505 /* 506 * Only wait if the replica is not lagging and the 507 * durability requires it. 508 */ 509 repImpl.getRepNode().getVLSNFreezeLatch().awaitThaw(); 510 repImpl.getRepNode().getMasterStatus().assertSync(); 511 } 512 513 repTxn.commit(implSyncPolicy, 514 new ReplicationContext(lastReplayedVLSN), 515 masterCommit.getMasterNodeId()); 516 517 final long masterCommitTimeMs = 518 masterCommit.getTime().getTime(); 519 lastReplayedTxn = new TxnInfo(lastReplayedVLSN, 520 masterCommitTimeMs); 521 522 updateCommitStats(needsAck, implSyncPolicy, startNs, 523 masterCommitTimeMs, repTxn.getEndTime()); 524 525 /* Respond to the feeder. */ 526 if (needsAck) { 527 /* 528 * Need an ack, either buffer it, for sync group commit, or 529 * queue it. 530 */ 531 if (!groupCommit.bufferAck(startNs, repTxn, 532 txnSyncPolicy)) { 533 queueAck(txnId); 534 } 535 } 536 537 /* 538 * The group refresh and recalculation can be expensive, since 539 * it may require a database read. Do it after the ack. 540 */ 541 if (repTxn.getRepGroupDbChange() && canRefreshGroup(repTxn)) { 542 repImpl.getRepNode().refreshCachedGroup(); 543 repImpl.getRepNode().recalculateGlobalCBVLSN(); 544 } 545 546 nElapsedTxnTime.add(repTxn.elapsedTime()); 547 548 } else if (LOG_TXN_ABORT.equalsType(entryType)) { 549 550 nAborts.increment(); 551 final TxnAbort masterAbort = (TxnAbort) logEntry.getMainItem(); 552 final ReplicationContext abortContext = 553 new ReplicationContext(wireRecord.getVLSN()); 554 if (logger.isLoggable(Level.FINEST)) { 555 LoggerUtils.finest(logger, repImpl, 556 "abort called for " + txnId + 557 " masterId=" + 558 masterAbort.getMasterNodeId() + 559 " repContext=" + abortContext); 560 } 561 repTxn.abort(abortContext, masterAbort.getMasterNodeId()); 562 lastReplayedTxn = new TxnInfo(lastReplayedVLSN, 563 masterAbort.getTime().getTime()); 564 if (repTxn.getRepGroupDbChange() && canRefreshGroup(repTxn)) { 565 566 /* 567 * Refresh is the safe thing to do on an abort, since a 568 * refresh may have been held back from an earlier commit 569 * due to this active transaction. 570 */ 571 repImpl.getRepNode().refreshCachedGroup(); 572 } 573 nElapsedTxnTime.add(repTxn.elapsedTime()); 574 575 } else if (LOG_NAMELN_TRANSACTIONAL.equalsType(entryType)) { 576 577 repImpl.getRepNode().getReplica().clearDbTreeCache(); 578 nNameLNs.increment(); 579 applyNameLN(repTxn, wireRecord); 580 581 } else { 582 nLNs.increment(); 583 /* A data operation. */ 584 assert wireRecord.getLogEntry() instanceof LNLogEntry; 585 applyLN(repTxn, wireRecord); 586 } 587 588 /* Remember the last VLSN applied by this txn. */ 589 repTxn.setLastAppliedVLSN(lastReplayedVLSN); 590 591 } catch (DatabaseException e) { 592 e.addErrorMessage("Problem seen replaying entry " + wireRecord); 593 throw e; 594 } finally { 595 final long elapsedNs = System.nanoTime() - startNs; 596 if (elapsedNs > replayLoggingThresholdNs) { 597 LoggerUtils.info(logger, repImpl, 598 "Replay time for entry type:" + 599 LogEntryType.findType(entryType) + " " + 600 NANOSECONDS.toMillis(elapsedNs) + "ms " + 601 "exceeded threshold:" + 602 NANOSECONDS. 603 toMillis(replayLoggingThresholdNs) + 604 "ms"); 605 } 606 } 607 } 608 609 /** 610 * Queue the request ack for an async ack write to the network. 611 */ queueAck(final long txnId)612 void queueAck(final long txnId) throws IOException { 613 try { 614 outputQueue.put(txnId); 615 } catch (InterruptedException ie) { 616 /* 617 * Have the higher levels treat it like an IOE and 618 * exit the thread. 619 */ 620 throw new IOException("Ack I/O interrupted", ie); 621 } 622 } 623 624 /** 625 * Logs information associated with the replay of the txn commit 626 */ logReplay(ReplayTxn repTxn, boolean needsAck, SyncPolicy syncPolicy)627 private void logReplay(ReplayTxn repTxn, 628 boolean needsAck, 629 SyncPolicy syncPolicy) { 630 631 if (!logger.isLoggable(Level.FINE)) { 632 return; 633 } 634 635 if (needsAck) { 636 LoggerUtils.fine(logger, repImpl, 637 "Replay: got commit for txn=" + repTxn.getId() + 638 ", ack needed, replica sync policy=" + 639 syncPolicy + 640 " vlsn=" + lastReplayedVLSN); 641 } else { 642 LoggerUtils.fine(logger, repImpl, 643 "Replay: got commit for txn=" + repTxn.getId() + 644 " ack not needed" + 645 " vlsn=" + lastReplayedVLSN); 646 } 647 } 648 649 /** 650 * Returns true if there are no other activeTxns that have also modified 651 * the membership database and are still open, since they could potentially 652 * hold write locks that would block the read locks acquired during the 653 * refresh operation. 654 * 655 * @param txn the current txn being committed or aborted 656 * 657 * @return true if there are no open transactions that hold locks on the 658 * membership database. 659 */ canRefreshGroup(ReplayTxn txn)660 private boolean canRefreshGroup(ReplayTxn txn) { 661 662 /* 663 * Use synchronized rather than copyActiveTxns, since this is called 664 * during replay and there is no nested locking to worry about. 665 */ 666 synchronized (activeTxns) { 667 for (ReplayTxn atxn : activeTxns.values()) { 668 if (atxn == txn) { 669 continue; 670 } 671 if (atxn.getRepGroupDbChange()) { 672 return false; 673 } 674 } 675 } 676 return true; 677 } 678 679 /** 680 * Update this replica's node, txn and database sequences with any ids in 681 * this log entry. We can call update, even if the replay id doesn't 682 * represent a new lowest-id point, or if the apply is not successful, 683 * because the apply checks that the replay id is < the sequence on the 684 * replica. We just want to ensure that if this node becomes the master, 685 * its sequences are in sync with what came before in the replication 686 * stream, and ids are not incorrectly reused. 687 */ updateReplicaSequences(LogEntry logEntry)688 private void updateReplicaSequences(LogEntry logEntry) { 689 690 /* For now, we assume all replay entries have a txn id. */ 691 repImpl.getTxnManager().updateFromReplay(logEntry.getTransactionId()); 692 693 /* If it's a database operation, update the database id. */ 694 if (logEntry instanceof NameLNLogEntry) { 695 NameLNLogEntry nameLogEntry = (NameLNLogEntry) logEntry; 696 nameLogEntry.postFetchInit(false /*isDupDb*/); 697 NameLN nameLN = (NameLN) nameLogEntry.getLN(); 698 repImpl.getDbTree().updateFromReplay(nameLN.getId()); 699 } 700 } 701 702 /** 703 * Obtain a ReplayTxn to represent the incoming operation. 704 */ getReplayTxn(long txnId, boolean registerTxnImmediately)705 public ReplayTxn getReplayTxn(long txnId, boolean registerTxnImmediately) 706 throws DatabaseException { 707 708 ReplayTxn useTxn = null; 709 synchronized (activeTxns) { 710 useTxn = activeTxns.get(txnId); 711 if (useTxn == null) { 712 713 /* 714 * Durability will be explicitly specified when 715 * ReplayTxn.commit is called, so TransactionConfig.DEFAULT is 716 * fine. 717 */ 718 if (registerTxnImmediately) { 719 useTxn = new ReplayTxn(repImpl, TransactionConfig.DEFAULT, 720 txnId, activeTxns, logger); 721 } else { 722 useTxn = new ReplayTxn(repImpl, TransactionConfig.DEFAULT, 723 txnId, activeTxns, logger) { 724 @Override 725 protected 726 boolean registerImmediately() { 727 return false; 728 } 729 }; 730 } 731 } 732 } 733 return useTxn; 734 } 735 736 /** 737 * Replays the NameLN. 738 * 739 * Note that the operations: remove, rename and truncate need to establish 740 * write locks on the database. Any open handles are closed by this 741 * operation by virtue of the ReplayTxn's importunate property. The 742 * application will receive a LockPreemptedException if it subsequently 743 * accesses the database handle. 744 */ applyNameLN(ReplayTxn repTxn, InputWireRecord wireRecord)745 private void applyNameLN(ReplayTxn repTxn, 746 InputWireRecord wireRecord) 747 throws DatabaseException { 748 749 NameLNLogEntry nameLNEntry = (NameLNLogEntry) wireRecord.getLogEntry(); 750 final NameLN nameLN = (NameLN) nameLNEntry.getLN(); 751 752 String databaseName = StringUtils.fromUTF8(nameLNEntry.getKey()); 753 754 final DbOpReplicationContext repContext = 755 new DbOpReplicationContext(wireRecord.getVLSN(), nameLNEntry); 756 757 DbOperationType opType = repContext.getDbOperationType(); 758 DatabaseImpl dbImpl = null; 759 try { 760 switch (opType) { 761 case CREATE: 762 { 763 DatabaseConfig dbConfig = 764 repContext.getCreateConfig().getReplicaConfig(repImpl); 765 766 dbImpl = repImpl.getDbTree().createReplicaDb 767 (repTxn, databaseName, dbConfig, nameLN, repContext); 768 769 /* 770 * We rely on the RepGroupDB.DB_ID value, so make sure 771 * it's what we expect for this internal replicated 772 * database. 773 */ 774 if ((dbImpl.getId().getId() == RepGroupDB.DB_ID) && 775 !DbType.REP_GROUP.getInternalName().equals 776 (databaseName)) { 777 throw EnvironmentFailureException.unexpectedState 778 ("Database: " + 779 DbType.REP_GROUP.getInternalName() + 780 " is associated with id: " + 781 dbImpl.getId().getId() + 782 " and not the reserved database id: " + 783 RepGroupDB.DB_ID); 784 } 785 786 TriggerManager.runOpenTriggers(repTxn, dbImpl, true); 787 break; 788 } 789 790 case REMOVE: { 791 dbImpl = repImpl.getDbTree().getDb(nameLN.getId()); 792 try { 793 repImpl.getDbTree().removeReplicaDb 794 (repTxn, databaseName, nameLN.getId(), repContext); 795 TriggerManager.runRemoveTriggers(repTxn, dbImpl); 796 } catch (DatabaseNotFoundException e) { 797 throw EnvironmentFailureException.unexpectedState 798 ("Database: " + dbImpl.getName() + 799 " Id: " + nameLN.getId() + 800 " not found on the Replica."); 801 } 802 break; 803 } 804 805 case TRUNCATE: { 806 dbImpl = repImpl.getDbTree().getDb 807 (repContext.getTruncateOldDbId()); 808 try { 809 TruncateDbResult result = 810 repImpl.getDbTree().truncateReplicaDb 811 (repTxn, databaseName, false, nameLN, repContext); 812 TriggerManager.runTruncateTriggers(repTxn, result.newDb); 813 } catch (DatabaseNotFoundException e) { 814 throw EnvironmentFailureException.unexpectedState 815 ("Database: " + dbImpl.getName() + 816 " Id: " + nameLN.getId() + 817 " not found on the Replica."); 818 } 819 820 break; 821 } 822 823 case RENAME: { 824 dbImpl = repImpl.getDbTree().getDb(nameLN.getId()); 825 try { 826 dbImpl = 827 repImpl.getDbTree().renameReplicaDb 828 (repTxn, dbImpl.getName(), databaseName, nameLN, 829 repContext); 830 TriggerManager.runRenameTriggers(repTxn, dbImpl, 831 databaseName); 832 } catch (DatabaseNotFoundException e) { 833 throw EnvironmentFailureException.unexpectedState 834 ("Database rename from: " + dbImpl.getName() + 835 " to " + databaseName + 836 " failed, name not found on the Replica."); 837 } 838 break; 839 } 840 841 case UPDATE_CONFIG: { 842 /* Get the replicated database configurations. */ 843 DatabaseConfig dbConfig = 844 repContext.getCreateConfig().getReplicaConfig(repImpl); 845 846 /* Update the NameLN and write it to the log. */ 847 dbImpl = repImpl.getDbTree().getDb(nameLN.getId()); 848 final String dbName = dbImpl.getName(); 849 repImpl.getDbTree().updateNameLN 850 (repTxn, dbName, repContext); 851 852 /* Set the new configurations to DatabaseImpl. */ 853 dbImpl.setConfigProperties 854 (repTxn, dbName, dbConfig, repImpl); 855 856 repImpl.getDbTree().modifyDbRoot(dbImpl); 857 858 break; 859 } 860 861 default: 862 throw EnvironmentFailureException.unexpectedState 863 ("Illegal database op type of " + opType.toString() + 864 " from " + wireRecord + " database=" + databaseName); 865 } 866 } finally { 867 if (dbImpl != null) { 868 repImpl.getDbTree().releaseDb(dbImpl); 869 } 870 } 871 } 872 applyLN( final ReplayTxn repTxn, final InputWireRecord wireRecord)873 private void applyLN( 874 final ReplayTxn repTxn, 875 final InputWireRecord wireRecord) 876 throws DatabaseException { 877 878 final LNLogEntry<?> lnEntry = (LNLogEntry<?>) wireRecord.getLogEntry(); 879 final DatabaseId dbId = lnEntry.getDbId(); 880 881 /* 882 * If this is a change to the rep group db, remember at commit time, 883 * and refresh this node's group metadata. 884 */ 885 if (dbId.getId() == RepGroupDB.DB_ID) { 886 repTxn.noteRepGroupDbChange(); 887 } 888 889 /* 890 * Note that we don't have to worry about serializable isolation when 891 * applying a replicated txn; serializable isolation in only an issue 892 * for txns that take read locks, and a replicated txn consists only of 893 * write operations. 894 */ 895 final DatabaseImpl dbImpl = 896 repImpl.getRepNode().getReplica().getDbCache().get(dbId, repTxn); 897 898 lnEntry.postFetchInit(dbImpl); 899 900 final ReplicationContext repContext = 901 new ReplicationContext(wireRecord.getVLSN()); 902 903 final Cursor cursor = DbInternal.makeCursor( 904 dbImpl, repTxn, null /*cursorConfig*/); 905 906 try { 907 OperationStatus status; 908 final LN ln = lnEntry.getLN(); 909 910 if (ln.isDeleted()) { 911 912 /* 913 * Perform an exact search by key. Use read-uncommitted and 914 * partial data entry to avoid reading old data. 915 */ 916 replayKeyEntry.setData(lnEntry.getKey()); 917 918 status = DbInternal.searchForReplay( 919 cursor, replayKeyEntry, delDataEntry, 920 LockMode.READ_UNCOMMITTED, SearchMode.SET); 921 922 if (status == OperationStatus.SUCCESS) { 923 status = DbInternal.deleteInternal(cursor, repContext); 924 } 925 } else { 926 replayKeyEntry.setData(lnEntry.getKey()); 927 replayDataEntry.setData(ln.getData()); 928 929 DbConfigManager configManager = repImpl.getConfigManager(); 930 boolean blindInsertions = configManager.getBoolean( 931 EnvironmentParams.BIN_DELTA_BLIND_OPS); 932 933 /* 934 * Let RL be the logrec being replayed here. Let R and T be 935 * the record and the txn associated with RL. 936 * 937 * We say that RL is replayed "blindly" if the search for 938 * R's key in the tree lands on a BIN-delta, this delta does 939 * not contain R's key, and we don't mutate the delta to a 940 * full BIN to check if R is indeed in the tree or not; 941 * instead we just insert R in the delta. 942 * 943 * RL can be applied blindly only if RL is a "pure" insertion, 944 * i.e. RL is an insertion and R did not exist prior to T. 945 * 946 * A non-pure insertion (where R existed before T, it was 947 * deleted by T, and then reinserted by T) cannot be applied 948 * blindly, because if it were, it would generate a logrec 949 * with abortLSN == NULL, and if T were aborted, undoing the 950 * logrec with the NULL abortLSN would cause the loss of the 951 * pre-T version of R. So, to replay a non-pure insertion, 952 * we must check if a slot for R exists in the tree already, 953 * and if so, generate a new logrec with an abortLSN pointing 954 * to the pre-T version of R. 955 * 956 * Updates and deletes cannot be replayed blindly either, 957 * because we wouldn't be able to generate logrecs with the 958 * correct abortLsn, nor count the previous version of R as 959 * obsolete. 960 * 961 * The condition lnEntry.getAbortLsn() == DbLsn.NULL_LSN || 962 * lnEntry.getAbortKnownDeleted() guarantee that LN is a pure 963 * insertion. 964 */ 965 PutMode mode; 966 if (blindInsertions && 967 lnEntry.getLogType().equals( 968 LogEntryType.LOG_INS_LN_TRANSACTIONAL) && 969 (lnEntry.getAbortLsn() == DbLsn.NULL_LSN || 970 lnEntry.getAbortKnownDeleted())) { 971 mode = PutMode.BLIND_INSERTION; 972 } else { 973 mode = PutMode.OVERWRITE; 974 } 975 976 status = DbInternal.putForReplay(cursor, 977 replayKeyEntry, 978 replayDataEntry, 979 ln, 980 mode, 981 repContext); 982 } 983 984 if (status != OperationStatus.SUCCESS) { 985 throw new EnvironmentFailureException 986 (repImpl, 987 EnvironmentFailureReason.LOG_INCOMPLETE, 988 "Replicated operation could not be applied. Status= " + 989 status + ' ' + wireRecord); 990 } 991 } finally { 992 cursor.close(); 993 } 994 } 995 996 /** 997 * Go through all active txns and rollback up to but not including the log 998 * entry represented by the matchpoint VLSN. 999 * 1000 * Effectively truncate these rolled back log entries by making them 1001 * invisible. Flush the log first, to make sure these log entries are out 1002 * of the log buffers and are on disk, so we can reliably find them through 1003 * the FileManager. 1004 * 1005 * Rollback steps are described in 1006 * https://sleepycat.oracle.com/trac/wiki/Logging#Recoverysteps. In 1007 * summary, 1008 * 1009 * 1. Log and fsync a new RollbackStart record 1010 * 2. Do the rollback in memory. There is no need to explicitly 1011 * log INs made dirty by the rollback operation. 1012 * 3. Do invisibility masking by overwriting LNs. 1013 * 4. Fsync all overwritten log files at this point. 1014 * 5. Write a RollbackEnd record, for ease of debugging 1015 * 1016 * Note that application read txns can continue to run during syncup. 1017 * Reader txns cannot access records that are being rolled back, because 1018 * they are in txns that are not committed, i.e, they are write locked. 1019 * The rollback interval never includes committed txns, and we do a hard 1020 * recovery if it would include them. 1021 */ rollback(VLSN matchpointVLSN, long matchpointLsn)1022 public void rollback(VLSN matchpointVLSN, long matchpointLsn) { 1023 1024 String rollbackStatus = RBSTATUS_START; 1025 1026 final Map<Long, ReplayTxn> localActiveTxns = copyActiveTxns(); 1027 try { 1028 if (localActiveTxns.size() == 0) { 1029 /* no live read/write txns, nothing to do. */ 1030 rollbackStatus = RBSTATUS_NO_ACTIVE; 1031 return; 1032 } 1033 1034 VLSNRange range = repImpl.getVLSNIndex().getRange(); 1035 if (range.getLast().equals(matchpointVLSN)) { 1036 /* nothing to roll back. */ 1037 rollbackStatus = RBSTATUS_RANGE_EQUALS; 1038 return; 1039 } 1040 1041 repImpl.setSyncupProgress(SyncupProgress.DO_ROLLBACK); 1042 1043 /* 1044 * Stop the log file backup service, since the files will be in an 1045 * inconsistent state while the rollback is in progress. 1046 */ 1047 repImpl.getRepNode().shutdownNetworkBackup(); 1048 1049 /* 1050 * Set repImpl's isRollingBack to true, and invalidate all the in 1051 * progress DbBackup. 1052 */ 1053 repImpl.setBackupProhibited(true); 1054 repImpl.invalidateBackups(DbLsn.getFileNumber(matchpointLsn)); 1055 1056 /* 1057 * 1. Log RollbackStart. The fsync guarantees that this marker will 1058 * be present in the log for recovery. It also ensures that all log 1059 * entries will be flushed to disk and the TxnChain will not have 1060 * to worry about entries that are in log buffers when constructing 1061 * the rollback information. 1062 */ 1063 LogManager logManager = repImpl.getLogManager(); 1064 LogEntry rollbackStart = SingleItemEntry.create( 1065 LogEntryType.LOG_ROLLBACK_START, 1066 new RollbackStart( 1067 matchpointVLSN, matchpointLsn, localActiveTxns.keySet())); 1068 long rollbackStartLsn = 1069 logManager.logForceFlush(rollbackStart, 1070 true, // fsyncRequired, 1071 ReplicationContext.NO_REPLICATE); 1072 rollbackStatus = RBSTATUS_LOG_RBSTART; 1073 1074 /* 1075 * 2. Do rollback in memory. Undo any operations that were logged 1076 * after the matchpointLsn, and save the LSNs for those log 1077 * entries.. There should be something to undo, because we checked 1078 * earlier that there were log entries after the matchpoint. 1079 */ 1080 List<Long> rollbackLsns = new ArrayList<Long>(); 1081 for (ReplayTxn replayTxn : localActiveTxns.values()) { 1082 Collection<Long> txnRollbackLsns = 1083 replayTxn.rollback(matchpointLsn); 1084 1085 /* 1086 * Txns that were entirely rolled back should have been removed 1087 * from the activeTxns map. 1088 */ 1089 assert checkRemoved(replayTxn) : 1090 "Should have removed " + replayTxn; 1091 1092 rollbackLsns.addAll(txnRollbackLsns); 1093 } 1094 rollbackStatus = RBSTATUS_MEM_ROLLBACK; 1095 assert rollbackLsns.size() != 0 : dumpActiveTxns(matchpointLsn); 1096 1097 /* 1098 * 3 & 4 - Mark the rolled back log entries as invisible. But 1099 * before doing so, invoke any registered rewrite listeners, so the 1100 * application knows that existing log files will be modified. 1101 * 1102 * After all are done, fsync the set of files. By waiting, some may 1103 * have made it out on their own. 1104 */ 1105 LogFileRewriteListener listener = repImpl.getLogRewriteListener(); 1106 if (listener != null) { 1107 listener.rewriteLogFiles(getFileNames(rollbackLsns)); 1108 } 1109 RollbackTracker.makeInvisible(repImpl, rollbackLsns); 1110 rollbackStatus = RBSTATUS_INVISIBLE; 1111 1112 /* 1113 * 5. Log RollbackEnd. Flush it so that we can use it to optimize 1114 * recoveries later on. If the RollbackEnd exists, we can skip the 1115 * step of re-making LNs invisible. 1116 */ 1117 logManager.logForceFlush( 1118 SingleItemEntry.create(LogEntryType.LOG_ROLLBACK_END, 1119 new RollbackEnd(matchpointLsn, 1120 rollbackStartLsn)), 1121 true, // fsyncRequired 1122 ReplicationContext.NO_REPLICATE); 1123 1124 /* 1125 * Restart the backup service only if all the steps of the 1126 * rollback were successful. 1127 */ 1128 repImpl.getRepNode().restartNetworkBackup(); 1129 repImpl.setBackupProhibited(false); 1130 rollbackStatus = RBSTATUS_FINISH; 1131 } finally { 1132 1133 /* Reset the lastReplayedVLSN so it's correct when we resume. */ 1134 lastReplayedVLSN = matchpointVLSN; 1135 LoggerUtils.info(logger, repImpl, 1136 "Rollback to matchpoint " + matchpointVLSN + 1137 " at " + DbLsn.getNoFormatString(matchpointLsn) + 1138 " status=" + rollbackStatus); 1139 } 1140 } 1141 1142 /* For debugging support */ dumpActiveTxns(long matchpointLsn)1143 private String dumpActiveTxns(long matchpointLsn) { 1144 StringBuilder sb = new StringBuilder(); 1145 sb.append("matchpointLsn="); 1146 sb.append(DbLsn.getNoFormatString(matchpointLsn)); 1147 for (ReplayTxn replayTxn : copyActiveTxns().values()) { 1148 sb.append("txn id=").append(replayTxn.getId()); 1149 sb.append(" locks=").append(replayTxn.getWriteLockIds()); 1150 sb.append("lastLogged="); 1151 sb.append(DbLsn.getNoFormatString(replayTxn.getLastLsn())); 1152 sb.append("\n"); 1153 } 1154 1155 return sb.toString(); 1156 } 1157 getFileNames(List<Long> lsns)1158 private Set<File> getFileNames(List<Long> lsns) { 1159 Set<Long> fileNums = new HashSet<Long>(); 1160 Set<File> files = new HashSet<File>(); 1161 1162 for (long lsn : lsns) { 1163 fileNums.add(DbLsn.getFileNumber(lsn)); 1164 } 1165 for (long fileNum : fileNums) { 1166 files.add(new File(FileManager.getFileName(fileNum))); 1167 } 1168 return files; 1169 } 1170 checkRemoved(ReplayTxn txn)1171 private boolean checkRemoved(ReplayTxn txn) { 1172 if (txn.isClosed()) { 1173 if (activeTxns.containsKey(txn.getId())) { 1174 return false; 1175 } 1176 } 1177 1178 return true; 1179 } 1180 1181 /** 1182 * Make a copy of activeTxns to avoid holding its mutex while iterating. 1183 * Can be used whenever the cost of the HashMap copy is not significant. 1184 */ copyActiveTxns()1185 private Map<Long, ReplayTxn> copyActiveTxns() { 1186 synchronized (activeTxns) { 1187 return new HashMap<Long, ReplayTxn>(activeTxns); 1188 } 1189 } 1190 1191 /** 1192 * Release all transactions, database handles, etc held by the replay 1193 * unit. The Replicator is closing down and Replay will not be invoked 1194 * again. 1195 */ close()1196 public void close() { 1197 1198 for (ReplayTxn replayTxn : copyActiveTxns().values()) { 1199 try { 1200 if (logger.isLoggable(Level.FINE)) { 1201 LoggerUtils.fine(logger, repImpl, 1202 "Unregistering open replay txn: " + 1203 replayTxn.getId()); 1204 } 1205 replayTxn.cleanup(); 1206 } catch (DatabaseException e) { 1207 LoggerUtils.fine(logger, repImpl, 1208 "Replay txn: " + replayTxn.getId() + 1209 " unregistration failed: " + e.getMessage()); 1210 } 1211 } 1212 assert activeTxns.size() == 0; 1213 } 1214 1215 /** 1216 * Returns a copy of the statistics associated with Replay 1217 */ getStats(StatsConfig config)1218 public StatGroup getStats(StatsConfig config) { 1219 StatGroup ret = statistics.cloneGroup(config.getClear()); 1220 1221 return ret; 1222 } 1223 resetStats()1224 public void resetStats() { 1225 statistics.clear(); 1226 } 1227 1228 /* For unit tests */ getActiveTxns()1229 public Map<Long, ReplayTxn> getActiveTxns() { 1230 return activeTxns; 1231 } 1232 dumpState()1233 public String dumpState() { 1234 StringBuilder sb = new StringBuilder(); 1235 sb.append("lastReplayedTxn=").append(lastReplayedTxn); 1236 sb.append(" lastReplayedVLSN=").append(lastReplayedVLSN); 1237 sb.append(" numActiveReplayTxns=").append(activeTxns.size()); 1238 sb.append("\n"); 1239 return sb.toString(); 1240 } 1241 1242 /** 1243 * Write out any pending acknowledgments. See GroupCommit.flushPendingAcks 1244 * for details. This method is invoked after each log entry is read from 1245 * the replication stream. 1246 * 1247 * @param nowNs the time at the reading of the log entry 1248 */ flushPendingAcks(long nowNs)1249 void flushPendingAcks(long nowNs) 1250 throws IOException { 1251 1252 groupCommit.flushPendingAcks(nowNs); 1253 } 1254 1255 /** 1256 * See GroupCommit.getPollIntervalNs(long) 1257 */ getPollIntervalNs(long defaultNs)1258 long getPollIntervalNs(long defaultNs) { 1259 return groupCommit.getPollIntervalNs(defaultNs); 1260 } 1261 1262 /** 1263 * Implements group commit. It's really a substructure of Replay and exists 1264 * mainly for modularity reasons. 1265 * <p> 1266 * Since replay is single threaded, the group commit mechanism works 1267 * differently in the replica than in the master. In the replica, SYNC 1268 * transactions are converted into NO_SYNC transactions and executed 1269 * immediately, but their acknowledgments are delayed until after either 1270 * the REPLICA_GROUP_COMMIT_INTERVAL (the max amount the first transaction 1271 * in the group is delayed) has expired, or the size of the group (as 1272 * specified by REPLICA_MAX_GROUP_COMMIT) has been exceeded. 1273 */ 1274 private class GroupCommit { 1275 1276 /* Size determines max fsync commits that can be grouped. */ 1277 private final long pendingCommitAcks[]; 1278 1279 /* Number of entries currently in pendingCommitAcks */ 1280 private int nPendingAcks; 1281 1282 /* 1283 * If this time limit is reached, the group will be forced to commit. 1284 * Invariant: nPendingAcks > 0 ==> limitGroupCommitNs > 0 1285 */ 1286 private long limitGroupCommitNs = 0; 1287 1288 /* The time interval that an open group is held back. */ 1289 private final long groupCommitIntervalNs; 1290 1291 private final LongStat nGroupCommitTimeouts; 1292 private final LongStat nGroupCommitMaxExceeded; 1293 private final LongStat nGroupCommits; 1294 private final LongStat nGroupCommitTxns; 1295 GroupCommit(DbConfigManager configManager)1296 private GroupCommit(DbConfigManager configManager) { 1297 pendingCommitAcks = new long[configManager. 1298 getInt(RepParams.REPLICA_MAX_GROUP_COMMIT)]; 1299 1300 nPendingAcks = 0; 1301 1302 final long groupCommitIntervalMs = configManager. 1303 getDuration(RepParams.REPLICA_GROUP_COMMIT_INTERVAL); 1304 1305 groupCommitIntervalNs = 1306 NANOSECONDS.convert(groupCommitIntervalMs, MILLISECONDS); 1307 nGroupCommitTimeouts = 1308 new LongStat(statistics, N_GROUP_COMMIT_TIMEOUTS); 1309 1310 nGroupCommitMaxExceeded = 1311 new LongStat(statistics, N_GROUP_COMMIT_MAX_EXCEEDED); 1312 1313 nGroupCommitTxns = 1314 new LongStat(statistics, N_GROUP_COMMIT_TXNS); 1315 1316 nGroupCommits = 1317 new LongStat(statistics, N_GROUP_COMMITS); 1318 } 1319 1320 /** 1321 * Returns true if group commits are enabled at the replica. 1322 */ isEnabled()1323 private boolean isEnabled() { 1324 return pendingCommitAcks.length > 0; 1325 } 1326 1327 /** 1328 * The interval used to poll for incoming log entries. The time is 1329 * lowered from the defaultNs time, if there are pending 1330 * acknowledgments. 1331 * 1332 * @param defaultNs the default poll interval 1333 * 1334 * @return the actual poll interval 1335 */ getPollIntervalNs(long defaultNs)1336 private long getPollIntervalNs(long defaultNs) { 1337 if (nPendingAcks == 0) { 1338 return defaultNs; 1339 } 1340 final long now = System.nanoTime(); 1341 1342 final long interval = limitGroupCommitNs - now; 1343 return Math.min(interval, defaultNs); 1344 } 1345 1346 /** 1347 * Returns the sync policy to be implemented at the replica. If 1348 * group commit is active, and SYNC is requested it will return 1349 * NO_SYNC instead to delay the fsync. 1350 * 1351 * @param txnSyncPolicy the sync policy as stated in the txn 1352 * 1353 * @return the sync policy to be implemented by the replica 1354 */ getImplSyncPolicy(SyncPolicy txnSyncPolicy)1355 private SyncPolicy getImplSyncPolicy(SyncPolicy txnSyncPolicy) { 1356 return ((txnSyncPolicy == SyncPolicy.SYNC) && isEnabled()) ? 1357 SyncPolicy.NO_SYNC : txnSyncPolicy; 1358 } 1359 1360 /** 1361 * Buffers the acknowledgment if the commit calls for a sync, or if 1362 * there are pending acknowledgments to ensure that acks are sent 1363 * in order. 1364 * 1365 * @param nowNs the current time 1366 * @param ackTxn the txn associated with the ack 1367 * @param txnSyncPolicy the sync policy as request by the committing 1368 * txn 1369 * 1370 * @return true if the ack has been buffered 1371 */ bufferAck(long nowNs, ReplayTxn ackTxn, SyncPolicy txnSyncPolicy)1372 private final boolean bufferAck(long nowNs, 1373 ReplayTxn ackTxn, 1374 SyncPolicy txnSyncPolicy) 1375 throws IOException { 1376 1377 if (!isEnabled() || 1378 !((txnSyncPolicy == SyncPolicy.SYNC) || (nPendingAcks > 0))) { 1379 return false; 1380 } 1381 1382 pendingCommitAcks[nPendingAcks++] = ackTxn.getId(); 1383 1384 if (nPendingAcks == 1) { 1385 /* First txn in group, start the clock. */ 1386 limitGroupCommitNs = nowNs + groupCommitIntervalNs; 1387 } else { 1388 flushPendingAcks(nowNs); 1389 } 1390 return true; 1391 } 1392 1393 /** 1394 * Flush if there are pending acks and either the buffer limit or the 1395 * group interval has been reached. 1396 * 1397 * @param nowNs the current time (passed in to minimize system calls) 1398 */ flushPendingAcks(long nowNs)1399 private final void flushPendingAcks(long nowNs) 1400 throws IOException { 1401 1402 if ((nPendingAcks == 0) || 1403 ((nPendingAcks != pendingCommitAcks.length) && 1404 (NanoTimeUtil.compare(nowNs, limitGroupCommitNs) < 0))) { 1405 1406 return; 1407 } 1408 1409 /* Update statistics. */ 1410 nGroupCommits.increment(); 1411 nGroupCommitTxns.add(nPendingAcks); 1412 if (NanoTimeUtil.compare(nowNs, limitGroupCommitNs) >= 0) { 1413 nGroupCommitTimeouts.increment(); 1414 } else if (nPendingAcks >= pendingCommitAcks.length) { 1415 nGroupCommitMaxExceeded.increment(); 1416 } 1417 1418 /* flush log buffer and fsync to disk */ 1419 repImpl.getLogManager().flush(); 1420 1421 /* commits are on disk, send out acknowledgments on the network. */ 1422 for (int i=0; i < nPendingAcks; i++) { 1423 queueAck(pendingCommitAcks[i]); 1424 pendingCommitAcks[i] = 0; 1425 } 1426 1427 nPendingAcks = 0; 1428 limitGroupCommitNs = 0; 1429 } 1430 } 1431 1432 /** 1433 * Simple helper class to package a Txn vlsn and its associated commit 1434 * time. 1435 */ 1436 public static class TxnInfo { 1437 final VLSN txnVLSN; 1438 final long masterTxnEndTime; 1439 TxnInfo(VLSN txnVLSN, long masterTxnEndTime)1440 private TxnInfo(VLSN txnVLSN, long masterTxnEndTime) { 1441 this.txnVLSN = txnVLSN; 1442 this.masterTxnEndTime = masterTxnEndTime; 1443 } 1444 getTxnVLSN()1445 public VLSN getTxnVLSN() { 1446 return txnVLSN; 1447 } 1448 getMasterTxnEndTime()1449 public long getMasterTxnEndTime() { 1450 return masterTxnEndTime; 1451 } 1452 1453 @Override toString()1454 public String toString() { 1455 return " VLSN: " + txnVLSN + 1456 " masterTxnEndTime=" + new Date(masterTxnEndTime); 1457 } 1458 } 1459 } 1460