1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.txn; 9 10 import java.util.ArrayList; 11 import java.util.Collections; 12 import java.util.List; 13 import java.util.Set; 14 import java.util.logging.Logger; 15 16 import com.sleepycat.je.CommitToken; 17 import com.sleepycat.je.DatabaseException; 18 import com.sleepycat.je.Durability.ReplicaAckPolicy; 19 import com.sleepycat.je.EnvironmentFailureException; 20 import com.sleepycat.je.LockConflictException; 21 import com.sleepycat.je.LockNotAvailableException; 22 import com.sleepycat.je.ThreadInterruptedException; 23 import com.sleepycat.je.TransactionConfig; 24 import com.sleepycat.je.dbi.DatabaseImpl; 25 import com.sleepycat.je.dbi.EnvironmentImpl; 26 import com.sleepycat.je.log.LogItem; 27 import com.sleepycat.je.log.ReplicationContext; 28 import com.sleepycat.je.rep.InsufficientAcksException; 29 import com.sleepycat.je.rep.ReplicaWriteException; 30 import com.sleepycat.je.rep.ReplicatedEnvironment; 31 import com.sleepycat.je.rep.UnknownMasterException; 32 import com.sleepycat.je.rep.impl.RepImpl; 33 import com.sleepycat.je.rep.impl.node.NameIdPair; 34 import com.sleepycat.je.rep.impl.node.Replay; 35 import com.sleepycat.je.rep.impl.node.Replica; 36 import com.sleepycat.je.txn.LockResult; 37 import com.sleepycat.je.txn.LockType; 38 import com.sleepycat.je.txn.Txn; 39 import com.sleepycat.je.txn.TxnManager; 40 import com.sleepycat.je.txn.WriteLockInfo; 41 import com.sleepycat.je.utilint.DbLsn; 42 import com.sleepycat.je.utilint.LoggerUtils; 43 import com.sleepycat.je.utilint.TestHook; 44 import com.sleepycat.je.utilint.TestHookExecute; 45 import com.sleepycat.je.utilint.VLSN; 46 47 /** 48 * A MasterTxn represents: 49 * - a user initiated Txn executed on the Master node, when local-write and 50 * read-only are not configured, or 51 * - an auto-commit Txn on the Master node for a replicated DB. 52 * 53 * This class uses the hooks defined by Txn to support the durability 54 * requirements of a replicated transaction on the Master. 55 */ 56 public class MasterTxn extends Txn { 57 58 /* Holds the commit VLSN after a successful commit. */ 59 private VLSN commitVLSN = VLSN.NULL_VLSN; 60 private final NameIdPair nameIdPair; 61 62 /* The number of acks required by this txn commit. */ 63 private int requiredAckCount = -1; 64 65 /* 66 * Used to measure replicated transaction commit performance. All deltas 67 * are measured relative to the start time, to minimize storage overhead. 68 */ 69 70 /* The time the transaction was started. */ 71 private final long startMs = System.currentTimeMillis(); 72 73 /* The start relative delta time when the commit pre hook exited. */ 74 private int preLogCommitEndDeltaMs = 0; 75 76 /* 77 * The start relative delta time when the commit message was written to 78 * the rep stream. 79 */ 80 private int repWriteStartDeltaMs = 0; 81 82 /** 83 * Flag to keep track of whether this transaction has taken the read lock 84 * that protects access to the blocking latch used by Master Transfer. 85 */ 86 private boolean locked; 87 88 /** 89 * Flag to prevent any change to the txn's contents. Used in 90 * master->replica transition. Intentionally volatile, so it can be 91 * interleaved with use of the MasterTxn mutex. 92 */ 93 private volatile boolean freeze; 94 95 /* For unit testing */ 96 private TestHook<Integer> convertHook; 97 98 /* The default factory used to create MasterTxns */ 99 private static final MasterTxnFactory DEFAULT_FACTORY = 100 new MasterTxnFactory() { 101 102 @Override 103 public MasterTxn create(EnvironmentImpl envImpl, 104 TransactionConfig config, 105 NameIdPair nameIdPair) { 106 return new MasterTxn(envImpl, config, nameIdPair); 107 } 108 }; 109 110 /* The current Txn Factory. */ 111 private static MasterTxnFactory factory = DEFAULT_FACTORY; 112 MasterTxn(EnvironmentImpl envImpl, TransactionConfig config, NameIdPair nameIdPair)113 public MasterTxn(EnvironmentImpl envImpl, 114 TransactionConfig config, 115 NameIdPair nameIdPair) 116 throws DatabaseException { 117 118 super(envImpl, config, ReplicationContext.MASTER); 119 this.nameIdPair = nameIdPair; 120 assert !config.getLocalWrite(); 121 } 122 123 @Override isLocalWrite()124 public boolean isLocalWrite() { 125 return false; 126 } 127 128 /** 129 * Returns the transaction commit token used to identify the transaction. 130 * 131 * @see com.sleepycat.je.txn.Txn#getCommitToken() 132 */ 133 @Override getCommitToken()134 public CommitToken getCommitToken() { 135 if (commitVLSN.isNull()) { 136 return null; 137 } 138 RepImpl repImpl = (RepImpl) envImpl; 139 return new CommitToken(repImpl.getUUID(), commitVLSN.getSequence()); 140 } 141 getCommitVLSN()142 public VLSN getCommitVLSN() { 143 return commitVLSN; 144 } 145 146 /** 147 * MasterTxns use txn ids from a reserved negative space. So override 148 * the default generation of ids. 149 */ 150 @Override generateId(TxnManager txnManager, long ignore )151 protected long generateId(TxnManager txnManager, 152 long ignore /* mandatedId */) { 153 assert(ignore == 0); 154 return txnManager.getNextReplicatedTxnId(); 155 } 156 157 /** 158 * Causes the transaction to wait until we have sufficient replicas to 159 * acknowledge the commit. 160 */ 161 @Override 162 @SuppressWarnings("unused") txnBeginHook(TransactionConfig config)163 protected void txnBeginHook(TransactionConfig config) 164 throws DatabaseException { 165 166 RepImpl repImpl = (RepImpl) envImpl; 167 try { 168 repImpl.txnBeginHook(this); 169 } catch (InterruptedException e) { 170 throw new ThreadInterruptedException(envImpl, e); 171 } 172 } 173 174 @Override preLogCommitHook()175 protected void preLogCommitHook() 176 throws DatabaseException { 177 178 RepImpl repImpl = (RepImpl) envImpl; 179 ReplicaAckPolicy ackPolicy = getCommitDurability().getReplicaAck(); 180 requiredAckCount = 181 repImpl.getRepNode().getDurabilityQuorum(). 182 getCurrentRequiredAckCount(ackPolicy); 183 184 /* 185 * TODO: An optimization we'd like to do is to identify transactions 186 * that only modify non-replicated databases, so they can avoid waiting 187 * for Replica commit acks and avoid checks like the one that requires 188 * that the node be a master before proceeding with the transaction. 189 */ 190 repImpl.preLogCommitHook(this); 191 preLogCommitEndDeltaMs = (int) (System.currentTimeMillis() - startMs); 192 } 193 194 @Override postLogCommitHook(LogItem commitItem)195 protected void postLogCommitHook(LogItem commitItem) 196 throws DatabaseException { 197 198 commitVLSN = commitItem.getHeader().getVLSN(); 199 try { 200 RepImpl repImpl = (RepImpl) envImpl; 201 repImpl.postLogCommitHook(this); 202 } catch (InterruptedException e) { 203 throw new ThreadInterruptedException(envImpl, e); 204 } 205 } 206 207 @Override preLogAbortHook()208 protected void preLogAbortHook() 209 throws DatabaseException { 210 211 RepImpl repImpl = (RepImpl) envImpl; 212 repImpl.preLogAbortHook(this); 213 } 214 215 @Override postLogCommitAbortHook()216 protected void postLogCommitAbortHook() { 217 218 RepImpl repImpl = (RepImpl) envImpl; 219 repImpl.postLogCommitAbortHook(this); 220 } 221 222 @Override postLogAbortHook()223 protected void postLogAbortHook() { 224 RepImpl repImpl = (RepImpl)envImpl; 225 repImpl.postLogAbortHook(this); 226 } 227 228 /** 229 * Prevent this MasterTxn from taking locks if the node becomes a 230 * replica. The application has a reference to this Txn, and may 231 * attempt to use it well after the node has transitioned from master 232 * to replica. 233 */ 234 @Override lockInternal(long lsn, LockType lockType, boolean noWait, boolean jumpAheadOfWaiters, DatabaseImpl database)235 public LockResult lockInternal(long lsn, 236 LockType lockType, 237 boolean noWait, 238 boolean jumpAheadOfWaiters, 239 DatabaseImpl database) 240 throws LockNotAvailableException, LockConflictException, 241 DatabaseException { 242 ReplicatedEnvironment.State nodeState = ((RepImpl)envImpl).getState(); 243 if (nodeState.isMaster()) { 244 return super.lockInternal 245 (lsn, lockType, noWait, jumpAheadOfWaiters, database); 246 } 247 248 throwNotMaster(nodeState); 249 return null; /* not reached */ 250 } 251 throwNotMaster(ReplicatedEnvironment.State nodeState)252 private void throwNotMaster(ReplicatedEnvironment.State nodeState) { 253 if (nodeState.isReplica()) { 254 throw new ReplicaWriteException 255 (this, ((RepImpl)envImpl).getStateChangeEvent()); 256 } 257 throw new UnknownMasterException 258 ("Transaction " + getId() + 259 " cannot execute write operations because this node is" + 260 " no longer a master"); 261 } 262 263 /** 264 * If logging occurs before locking, we must screen out write locks here. 265 */ 266 @Override preLogWithoutLock(DatabaseImpl database)267 public synchronized void preLogWithoutLock(DatabaseImpl database) { 268 ReplicatedEnvironment.State nodeState = ((RepImpl)envImpl).getState(); 269 if (nodeState.isMaster()) { 270 super.preLogWithoutLock(database); 271 return; 272 } 273 274 throwNotMaster(nodeState); 275 } 276 277 /** 278 * Determines whether we should lock the block-latch lock. 279 * <p> 280 * We acquire the lock during pre-log hook, and release it during post-log 281 * hook. Specifically, there are the following cases: 282 * <ol> 283 * <li> 284 * For a normal commit, we acquire it in {@code preLogCommitHook()} and 285 * release it in {@code postLogCommitHook()} 286 * <li> 287 * For a normal abort (invoked by the application on the {@code 288 * Txn.abort()} API), we acquire the lock in {@code preLogAbortHook()} and 289 * release it in {@code postLogAbortHook()}. 290 * <li> 291 * When a commit fails in such a way as to call {@code 292 * Txn.throwPreCommitException()}, we go through the abort path as well. 293 * In this case: 294 * <ul> 295 * <li>we will of course already have called {@code preLogCommitHook()}; 296 * <li>the abort path calls {@code preLogAbortHook()} and {@code 297 * postLogAbortHook()} as always; 298 * <li>finally we call {@code postLogCommitAbortHook()} 299 * </ul> 300 * Fortunately we can avoid the complexity of dealing with a second 301 * (recursive) lock acquisition here, because by the time either post-hook 302 * is called we've done any writing of VLSNs. Thus, when we want to 303 * take the lock, we take it if it hasn't already been taken, and do 304 * nothing if it has; when releasing, we release it if we have it, and do 305 * nothing if we don't. 306 * </ol> 307 * <p> 308 * See additional javadoc at {@code RepImpl.blockLatchLock} 309 */ lockOnce()310 public boolean lockOnce() { 311 if (locked) { 312 return false; 313 } 314 locked = true; 315 return true; 316 } 317 318 /** 319 * Determines whether we should unlock the block-latch lock. 320 * 321 * @see #lockOnce 322 */ unlockOnce()323 public boolean unlockOnce() { 324 if (locked) { 325 locked = false; 326 return true; 327 } 328 return false; 329 } 330 getRequiredAckCount()331 public int getRequiredAckCount() { 332 return requiredAckCount; 333 } 334 resetRequiredAckCount()335 public void resetRequiredAckCount() { 336 requiredAckCount = 0; 337 } 338 339 /** A masterTxn always writes its own id into the commit or abort. */ 340 @Override getReplicatorNodeId()341 protected int getReplicatorNodeId() { 342 return nameIdPair.getId(); 343 } 344 getStartMs()345 public long getStartMs() { 346 return startMs; 347 } 348 stampRepWriteTime()349 public void stampRepWriteTime() { 350 this.repWriteStartDeltaMs = 351 (int)(System.currentTimeMillis() - startMs); 352 } 353 354 /** 355 * Returns the amount of time it took to copy the commit record from the 356 * log buffer to the rep stream. It's measured as the time interval 357 * starting with the time the preCommit hook completed, to the time the 358 * message write to the replication stream was initiated. 359 */ messageTransferMs()360 public long messageTransferMs() { 361 return repWriteStartDeltaMs > 0 ? 362 363 (repWriteStartDeltaMs - preLogCommitEndDeltaMs) : 364 365 /* 366 * The message was invoked before the post commit hook fired. 367 */ 368 0; 369 } 370 371 @Override 372 protected boolean propagatePostCommitException(DatabaseException postCommitException)373 propagatePostCommitException(DatabaseException postCommitException) { 374 return (postCommitException instanceof InsufficientAcksException) ? 375 true : 376 super.propagatePostCommitException(postCommitException); 377 } 378 379 /* The Txn factory interface. */ 380 public interface MasterTxnFactory { create(EnvironmentImpl envImpl, TransactionConfig config, NameIdPair nameIdPair)381 MasterTxn create(EnvironmentImpl envImpl, 382 TransactionConfig config, 383 NameIdPair nameIdPair); 384 } 385 386 /* The method used to create user Master Txns via the factory. */ create(EnvironmentImpl envImpl, TransactionConfig config, NameIdPair nameIdPair)387 public static MasterTxn create(EnvironmentImpl envImpl, 388 TransactionConfig config, 389 NameIdPair nameIdPair) { 390 return factory.create(envImpl, config, nameIdPair); 391 } 392 393 /** 394 * Method used for unit testing. 395 * 396 * Sets the factory to the one supplied. If the argument is null it 397 * restores the factory to the original default value. 398 */ setFactory(MasterTxnFactory factory)399 public static void setFactory(MasterTxnFactory factory) { 400 MasterTxn.factory = (factory == null) ? DEFAULT_FACTORY : factory; 401 } 402 403 /** 404 * Convert a MasterTxn that has any write locks into a ReplayTxn, and close 405 * the MasterTxn after it is disemboweled. A MasterTxn that only has read 406 * locks is unchanged and is still usable by the application. To be clear, 407 * the application can never use a MasterTxn to obtain a lock if the node 408 * is in Replica mode, but may indeed be able to use a read-lock-only 409 * MasterTxn if the node cycles back into Master status. 410 * 411 * For converted MasterTxns, all write locks are transferred to a replay 412 * transaction, read locks are released, and the txn is closed. Used when a 413 * node is transitioning from master to replica mode without recovery, 414 * which may happen for an explicit master transfer request, or merely for 415 * a network partition/election of new 416 * master. 417 * 418 * The newly created replay transaction will need to be in the appropriate 419 * state, holding all write locks, so that the node in replica form can 420 * execute the proper syncups. Note that the resulting replay txn will 421 * only be aborted, and will never be committed, because the txn originated 422 * on this node, which is transitioning from master -> replica. 423 * 424 * We only transfer write locks. We need not transfer read locks, because 425 * replays only operate on writes, and are never required to obtain read 426 * locks. Read locks are released though, because (a) this txn is now only 427 * abortable, and (b) although the Replay can preempt any read locks held 428 * by the MasterTxn, such preemption will add delay. 429 * 430 * @return a ReplayTxn, if there were locks in this transaction, and 431 * there's a need to create a ReplayTxn. 432 */ convertToReplayTxnAndClose(Logger logger, Replay replay)433 public ReplayTxn convertToReplayTxnAndClose(Logger logger, 434 Replay replay) { 435 436 /* Assertion */ 437 if (!freeze) { 438 throw EnvironmentFailureException.unexpectedState 439 (envImpl, 440 "Txn " + getId() + 441 " should be frozen when converting to replay txn"); 442 } 443 444 /* 445 * This is an important and relatively rare operation, and worth 446 * logging. 447 */ 448 LoggerUtils.info(logger, envImpl, 449 "Transforming txn " + getId() + 450 " from MasterTxn to ReplayTxn"); 451 452 int hookCount = 0; 453 ReplayTxn replayTxn = null; 454 boolean needToClose = true; 455 try { 456 synchronized (this) { 457 458 if (isClosed()) { 459 LoggerUtils.info(logger, envImpl, 460 "Txn " + getId() + 461 " is closed, no tranform needed"); 462 needToClose = false; 463 return null; 464 } 465 466 /* 467 * Get the list of write locks, and process them in lsn order, 468 * so we properly maintain the lastLoggedLsn and firstLoggedLSN 469 * fields of the newly created ReplayTxn. 470 */ 471 final Set<Long> lockedLSNs = getWriteLockIds(); 472 473 /* 474 * This transaction may not actually have any write locks. In 475 * that case, we permit it to live on. 476 */ 477 if (lockedLSNs.size() == 0) { 478 LoggerUtils.info(logger, envImpl, "Txn " + getId() + 479 " had no write locks, didn't create" + 480 " ReplayTxn"); 481 needToClose = false; 482 return null; 483 } 484 485 /* 486 * We have write locks. Make sure that this txn can now 487 * only be aborted. Otherwise, there could be this window 488 * in this method: 489 * t1: locks stolen, no locks left in this txn 490 * t2: txn unfrozen, commits and aborts possible 491 * -- at this point, another thread could sneak in and 492 * -- try to commit. The txn would commmit successfully, 493 * -- because a commit w/no write locks is a no-op. 494 * -- but that would convey the false impression that the 495 * -- txn's write operations had commmitted. 496 * t3: txn is closed 497 */ 498 setOnlyAbortable(new UnknownMasterException 499 (envImpl.getName() + 500 " is no longer a master")); 501 replayTxn = replay.getReplayTxn(getId(), false); 502 503 /* 504 * Order the lsns, so that the locks are taken in the proper 505 * order, and the txn's firstLoggedLsn and lastLoggedLsn fields 506 * are properly set. 507 */ 508 List<Long> sortedLsns = new ArrayList<Long>(lockedLSNs); 509 Collections.sort(sortedLsns); 510 LoggerUtils.info(logger, envImpl, 511 "Txn " + getId() + " has " + 512 lockedLSNs.size() + " locks to transform"); 513 /* 514 * Transfer each lock. Note that ultimately, since mastership 515 * is changing, and replicated commits will only be executed 516 * when a txn has originated on that node, the target ReplayTxn 517 * can never be committed, and will only be aborted. 518 */ 519 for (Long lsn: sortedLsns) { 520 521 LoggerUtils.info(logger, envImpl, 522 "Txn " + getId() + 523 " is transferring lock " + lsn); 524 525 /* 526 * Use a special method to steal the lock. Another approach 527 * might have been to have the replayTxn merely attempt a 528 * lock(); as an importunate txn, the replayTxn would 529 * preempt the MasterTxn's lock. However, that path doesn't 530 * work because lock() requires having a databaseImpl in 531 * hand, and that's not available here. 532 */ 533 replayTxn.stealLockFromMasterTxn(lsn); 534 535 /* 536 * Copy all the lock's info into the Replay and remove it 537 * from the master. Normally, undo clears write locks, but 538 * this MasterTxn will not be executing undo. 539 */ 540 WriteLockInfo replayWLI = replayTxn.getWriteLockInfo(lsn); 541 WriteLockInfo masterWLI = getWriteLockInfo(lsn); 542 replayWLI.copyAllInfo(masterWLI); 543 removeLock(lsn); 544 } 545 546 /* 547 * Txns have collections of undoDatabases and deletedDatabases. 548 * Undo databases are normally incrementally added to the txn 549 * as locks are obtained Unlike normal locking or recovery 550 * locking, in this case we don't have a reference to the 551 * databaseImpl that goes with this lock, so we copy the undo 552 * collection in one fell swoop. 553 */ 554 replayTxn.copyDatabasesForConversion(this); 555 556 /* 557 * This txn is no longer responsible for databaseImpl 558 * cleanup, as that issue now lies with the ReplayTxn, so 559 * remove the collection. 560 */ 561 deletedDatabases = null; 562 563 /* 564 * All locks have been removed from this transaction. Clear 565 * the firstLoggedLsn and lastLoggedLsn so there's no danger 566 * of attempting to undo anything; this txn is no longer 567 * responsible for any records. 568 */ 569 lastLoggedLsn = DbLsn.NULL_LSN; 570 firstLoggedLsn = DbLsn.NULL_LSN; 571 572 /* If this txn also had read locks, clear them */ 573 clearReadLocks(); 574 } 575 } finally { 576 577 assert TestHookExecute.doHookIfSet(convertHook, hookCount++); 578 579 unfreeze(); 580 581 assert TestHookExecute.doHookIfSet(convertHook, hookCount++); 582 583 /* 584 * We need to abort the txn, but we can't call abort() because that 585 * method checks whether we are the master! Instead, call the 586 * internal method, close(), in order to end this transaction and 587 * unregister it from the transactionManager. Must be called 588 * outside the synchronization block. 589 */ 590 if (needToClose) { 591 LoggerUtils.info(logger, envImpl, "About to close txn " + 592 getId() + " state=" + getState()); 593 close(false /*isCommit */); 594 LoggerUtils.info(logger, envImpl, "Closed txn " + getId() + 595 " state=" + getState()); 596 } 597 assert TestHookExecute.doHookIfSet(convertHook, hookCount++); 598 } 599 600 return replayTxn; 601 } 602 freeze()603 public void freeze() { 604 freeze = true; 605 } 606 unfreeze()607 private void unfreeze() { 608 freeze = false; 609 } 610 611 /** 612 * Used to hold the transaction stable while it is being cloned as a 613 * ReplayTxn, during master->replica transitions. Essentially, there 614 * are two parties that now have a reference to this transaction -- the 615 * originating application thread, and the RepNode thread that is 616 * trying to set up internal state so it can begin to act as a replica. 617 * 618 * The transaction will throw UnknownMasterException or 619 * ReplicaWriteException if the transaction is frozen, so that the 620 * application knows that the transaction is no longer viable, but it 621 * doesn't attempt to do most of the follow-on cleanup and release of locks 622 * that failed aborts and commits normally attempt. One aspect of 623 * transaction cleanup can't be skipped though. It is necessary to do the 624 * post log hooks to free up the block txn latch lock so that the 625 * transaction can be closed by the RepNode thread. For example: 626 * - application thread starts transaction 627 * - application takes the block txn latch lock and attempts commit or 628 * abort, but is stopped because the txn is frozen by master transfer. 629 * - the application must release the block txn latch lock. 630 * @see Replica#replicaTransitionCleanup 631 */ 632 @Override checkIfFrozen(boolean isCommit)633 protected void checkIfFrozen(boolean isCommit) { 634 if (freeze) { 635 try { 636 ((RepImpl) envImpl).checkIfMaster(this); 637 } catch (DatabaseException e) { 638 if (isCommit) { 639 postLogCommitAbortHook(); 640 } else { 641 postLogAbortHook(); 642 } 643 throw e; 644 } 645 } 646 } 647 648 /* For unit testing */ setConvertHook(TestHook<Integer> hook)649 public void setConvertHook(TestHook<Integer> hook) { 650 convertHook = hook; 651 } 652 653 @Override isMasterTxn()654 public boolean isMasterTxn() { 655 return true; 656 } 657 } 658