1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.txn;
9 
10 import java.util.ArrayList;
11 import java.util.Collections;
12 import java.util.List;
13 import java.util.Set;
14 import java.util.logging.Logger;
15 
16 import com.sleepycat.je.CommitToken;
17 import com.sleepycat.je.DatabaseException;
18 import com.sleepycat.je.Durability.ReplicaAckPolicy;
19 import com.sleepycat.je.EnvironmentFailureException;
20 import com.sleepycat.je.LockConflictException;
21 import com.sleepycat.je.LockNotAvailableException;
22 import com.sleepycat.je.ThreadInterruptedException;
23 import com.sleepycat.je.TransactionConfig;
24 import com.sleepycat.je.dbi.DatabaseImpl;
25 import com.sleepycat.je.dbi.EnvironmentImpl;
26 import com.sleepycat.je.log.LogItem;
27 import com.sleepycat.je.log.ReplicationContext;
28 import com.sleepycat.je.rep.InsufficientAcksException;
29 import com.sleepycat.je.rep.ReplicaWriteException;
30 import com.sleepycat.je.rep.ReplicatedEnvironment;
31 import com.sleepycat.je.rep.UnknownMasterException;
32 import com.sleepycat.je.rep.impl.RepImpl;
33 import com.sleepycat.je.rep.impl.node.NameIdPair;
34 import com.sleepycat.je.rep.impl.node.Replay;
35 import com.sleepycat.je.rep.impl.node.Replica;
36 import com.sleepycat.je.txn.LockResult;
37 import com.sleepycat.je.txn.LockType;
38 import com.sleepycat.je.txn.Txn;
39 import com.sleepycat.je.txn.TxnManager;
40 import com.sleepycat.je.txn.WriteLockInfo;
41 import com.sleepycat.je.utilint.DbLsn;
42 import com.sleepycat.je.utilint.LoggerUtils;
43 import com.sleepycat.je.utilint.TestHook;
44 import com.sleepycat.je.utilint.TestHookExecute;
45 import com.sleepycat.je.utilint.VLSN;
46 
47 /**
48  * A MasterTxn represents:
49  *  - a user initiated Txn executed on the Master node, when local-write and
50  *    read-only are not configured, or
51  *  - an auto-commit Txn on the Master node for a replicated DB.
52  *
53  * This class uses the hooks defined by Txn to support the durability
54  * requirements of a replicated transaction on the Master.
55  */
56 public class MasterTxn extends Txn {
57 
    /*
     * Holds the commit VLSN after a successful commit. It is assigned from
     * the commit log item's header in postLogCommitHook() and is NULL_VLSN
     * until then.
     */
    private VLSN commitVLSN = VLSN.NULL_VLSN;

    /* Name/id pair whose id is returned by getReplicatorNodeId(). */
    private final NameIdPair nameIdPair;

    /*
     * The number of acks required by this txn commit. Computed in
     * preLogCommitHook(); holds -1 until then, and can be forced to zero
     * through resetRequiredAckCount().
     */
    private int requiredAckCount = -1;

    /*
     * Used to measure replicated transaction commit performance. All deltas
     * are measured relative to the start time, to minimize storage overhead.
     */

    /* The time the transaction was started. */
    private final long startMs = System.currentTimeMillis();

    /* The start relative delta time when the commit pre hook exited. */
    private int preLogCommitEndDeltaMs = 0;

    /*
     * The start relative delta time when the commit message was written to
     * the rep stream. Set by stampRepWriteTime().
     */
    private int repWriteStartDeltaMs = 0;

    /**
     * Flag to keep track of whether this transaction has taken the read lock
     * that protects access to the blocking latch used by Master Transfer.
     * Toggled only through lockOnce() and unlockOnce().
     */
    private boolean locked;

    /**
     * Flag to prevent any change to the txn's contents. Used in
     * master->replica transition. Intentionally volatile, so it can be
     * interleaved with use of the MasterTxn mutex. Set by freeze(), cleared
     * by unfreeze().
     */
    private volatile boolean freeze;

    /*
     * For unit testing. Invoked from convertToReplayTxnAndClose() when
     * assertions are enabled.
     */
    private TestHook<Integer> convertHook;
97 
98     /* The default factory used to create MasterTxns */
99     private static final MasterTxnFactory DEFAULT_FACTORY =
100         new MasterTxnFactory() {
101 
102             @Override
103             public MasterTxn create(EnvironmentImpl envImpl,
104                                     TransactionConfig config,
105                                     NameIdPair nameIdPair) {
106                 return new MasterTxn(envImpl, config, nameIdPair);
107             }
108     };
109 
110     /* The current Txn Factory. */
111     private static MasterTxnFactory factory = DEFAULT_FACTORY;
112 
MasterTxn(EnvironmentImpl envImpl, TransactionConfig config, NameIdPair nameIdPair)113     public MasterTxn(EnvironmentImpl envImpl,
114                      TransactionConfig config,
115                      NameIdPair nameIdPair)
116         throws DatabaseException {
117 
118         super(envImpl, config, ReplicationContext.MASTER);
119         this.nameIdPair = nameIdPair;
120         assert !config.getLocalWrite();
121     }
122 
    /**
     * A MasterTxn is never a local-write transaction.
     *
     * @return always false
     */
    @Override
    public boolean isLocalWrite() {
        return false;
    }
127 
128     /**
129      * Returns the transaction commit token used to identify the transaction.
130      *
131      * @see com.sleepycat.je.txn.Txn#getCommitToken()
132      */
133     @Override
getCommitToken()134     public CommitToken getCommitToken() {
135         if (commitVLSN.isNull()) {
136             return null;
137         }
138         RepImpl repImpl = (RepImpl) envImpl;
139         return new CommitToken(repImpl.getUUID(), commitVLSN.getSequence());
140     }
141 
    /**
     * Returns the commit VLSN recorded by postLogCommitHook().
     *
     * @return the commit VLSN; VLSN.NULL_VLSN if none has been recorded
     */
    public VLSN getCommitVLSN() {
        return commitVLSN;
    }
145 
146     /**
147      * MasterTxns use txn ids from a reserved negative space. So override
148      * the default generation of ids.
149      */
150     @Override
generateId(TxnManager txnManager, long ignore )151     protected long generateId(TxnManager txnManager,
152                               long ignore /* mandatedId */) {
153         assert(ignore == 0);
154         return txnManager.getNextReplicatedTxnId();
155     }
156 
157     /**
158      * Causes the transaction to wait until we have sufficient replicas to
159      * acknowledge the commit.
160      */
161     @Override
162     @SuppressWarnings("unused")
txnBeginHook(TransactionConfig config)163     protected void txnBeginHook(TransactionConfig config)
164         throws DatabaseException {
165 
166         RepImpl repImpl = (RepImpl) envImpl;
167         try {
168             repImpl.txnBeginHook(this);
169         } catch (InterruptedException e) {
170             throw new ThreadInterruptedException(envImpl, e);
171         }
172     }
173 
174     @Override
preLogCommitHook()175     protected void preLogCommitHook()
176         throws DatabaseException {
177 
178         RepImpl repImpl = (RepImpl) envImpl;
179         ReplicaAckPolicy ackPolicy = getCommitDurability().getReplicaAck();
180         requiredAckCount =
181             repImpl.getRepNode().getDurabilityQuorum().
182             getCurrentRequiredAckCount(ackPolicy);
183 
184         /*
185          * TODO: An optimization we'd like to do is to identify transactions
186          * that only modify non-replicated databases, so they can avoid waiting
187          * for Replica commit acks and avoid checks like the one that requires
188          * that the node be a master before proceeding with the transaction.
189          */
190         repImpl.preLogCommitHook(this);
191         preLogCommitEndDeltaMs = (int) (System.currentTimeMillis() - startMs);
192     }
193 
194     @Override
postLogCommitHook(LogItem commitItem)195     protected void postLogCommitHook(LogItem commitItem)
196         throws DatabaseException {
197 
198         commitVLSN = commitItem.getHeader().getVLSN();
199         try {
200             RepImpl repImpl = (RepImpl) envImpl;
201             repImpl.postLogCommitHook(this);
202         } catch (InterruptedException e) {
203             throw new ThreadInterruptedException(envImpl, e);
204         }
205     }
206 
207     @Override
preLogAbortHook()208     protected void preLogAbortHook()
209         throws DatabaseException {
210 
211         RepImpl repImpl = (RepImpl) envImpl;
212         repImpl.preLogAbortHook(this);
213     }
214 
215     @Override
postLogCommitAbortHook()216     protected void postLogCommitAbortHook() {
217 
218         RepImpl repImpl = (RepImpl) envImpl;
219         repImpl.postLogCommitAbortHook(this);
220     }
221 
222     @Override
postLogAbortHook()223     protected void postLogAbortHook() {
224         RepImpl repImpl = (RepImpl)envImpl;
225         repImpl.postLogAbortHook(this);
226     }
227 
228     /**
229      * Prevent this MasterTxn from taking locks if the node becomes a
230      * replica. The application has a reference to this Txn, and may
231      * attempt to use it well after the node has transitioned from master
232      * to replica.
233      */
234     @Override
lockInternal(long lsn, LockType lockType, boolean noWait, boolean jumpAheadOfWaiters, DatabaseImpl database)235     public LockResult lockInternal(long lsn,
236                                    LockType lockType,
237                                    boolean noWait,
238                                    boolean jumpAheadOfWaiters,
239                                    DatabaseImpl database)
240         throws LockNotAvailableException, LockConflictException,
241                DatabaseException {
242         ReplicatedEnvironment.State nodeState = ((RepImpl)envImpl).getState();
243         if (nodeState.isMaster()) {
244             return super.lockInternal
245                 (lsn, lockType, noWait, jumpAheadOfWaiters, database);
246         }
247 
248         throwNotMaster(nodeState);
249         return null; /* not reached */
250     }
251 
throwNotMaster(ReplicatedEnvironment.State nodeState)252     private void throwNotMaster(ReplicatedEnvironment.State nodeState) {
253         if (nodeState.isReplica()) {
254             throw new ReplicaWriteException
255                 (this, ((RepImpl)envImpl).getStateChangeEvent());
256         }
257         throw new UnknownMasterException
258             ("Transaction " + getId() +
259              " cannot execute write operations because this node is" +
260              " no longer a master");
261     }
262 
263     /**
264      * If logging occurs before locking, we must screen out write locks here.
265      */
266     @Override
preLogWithoutLock(DatabaseImpl database)267     public synchronized void preLogWithoutLock(DatabaseImpl database) {
268         ReplicatedEnvironment.State nodeState = ((RepImpl)envImpl).getState();
269         if (nodeState.isMaster()) {
270             super.preLogWithoutLock(database);
271             return;
272         }
273 
274         throwNotMaster(nodeState);
275     }
276 
277     /**
278      * Determines whether we should lock the block-latch lock.
279      * <p>
280      * We acquire the lock during pre-log hook, and release it during post-log
281      * hook.  Specifically, there are the following cases:
282      * <ol>
283      * <li>
284      * For a normal commit, we acquire it in {@code preLogCommitHook()} and
285      * release it in {@code postLogCommitHook()}
286      * <li>
287      * For a normal abort (invoked by the application on the {@code
288      * Txn.abort()} API), we acquire the lock in {@code preLogAbortHook()} and
289      * release it in {@code postLogAbortHook()}.
290      * <li>
291      * When a commit fails in such a way as to call {@code
292      * Txn.throwPreCommitException()}, we go through the abort path as well.
293      * In this case:
294      * <ul>
295      * <li>we will of course already have called {@code preLogCommitHook()};
296      * <li>the abort path calls {@code preLogAbortHook()} and {@code
297      * postLogAbortHook()} as always;
298      * <li>finally we call {@code postLogCommitAbortHook()}
299      * </ul>
300      * Fortunately we can avoid the complexity of dealing with a second
301      * (recursive) lock acquisition here, because by the time either post-hook
302      * is called we've done any writing of VLSNs.  Thus, when we want to
303      * take the lock, we take it if it hasn't already been taken, and do
304      * nothing if it has; when releasing, we release it if we have it, and do
305      * nothing if we don't.
306      * </ol>
307      * <p>
308      * See additional javadoc at {@code RepImpl.blockLatchLock}
309      */
lockOnce()310     public boolean lockOnce() {
311         if (locked) {
312             return false;
313         }
314         locked = true;
315         return true;
316     }
317 
318     /**
319      * Determines whether we should unlock the block-latch lock.
320      *
321      * @see #lockOnce
322      */
unlockOnce()323     public boolean unlockOnce() {
324         if (locked) {
325             locked = false;
326             return true;
327         }
328         return false;
329     }
330 
    /**
     * Returns the number of acks required by this txn's commit.
     *
     * @return the count computed by preLogCommitHook(); -1 if it has not
     * been computed yet, or 0 after resetRequiredAckCount()
     */
    public int getRequiredAckCount() {
        return requiredAckCount;
    }
334 
    /** Sets the number of acks required by this txn's commit to zero. */
    public void resetRequiredAckCount() {
        requiredAckCount = 0;
    }
338 
    /**
     * A masterTxn always writes its own id into the commit or abort.
     *
     * @return the id held by the NameIdPair supplied at construction
     */
    @Override
    protected int getReplicatorNodeId() {
        return nameIdPair.getId();
    }
344 
    /**
     * Returns the time the transaction was started.
     *
     * @return the System.currentTimeMillis() value captured when this
     * object was created
     */
    public long getStartMs() {
        return startMs;
    }
348 
stampRepWriteTime()349     public void stampRepWriteTime() {
350         this.repWriteStartDeltaMs =
351             (int)(System.currentTimeMillis() - startMs);
352     }
353 
354     /**
355      * Returns the amount of time it took to copy the commit record from the
356      * log buffer to the rep stream. It's measured as the time interval
357      * starting with the time the preCommit hook completed, to the time the
358      * message write to the replication stream was initiated.
359      */
messageTransferMs()360     public long messageTransferMs() {
361         return repWriteStartDeltaMs > 0 ?
362 
363                 (repWriteStartDeltaMs - preLogCommitEndDeltaMs) :
364 
365                 /*
366                  * The message was invoked before the post commit hook fired.
367                  */
368                 0;
369     }
370 
371     @Override
372     protected boolean
propagatePostCommitException(DatabaseException postCommitException)373         propagatePostCommitException(DatabaseException postCommitException) {
374         return (postCommitException instanceof InsufficientAcksException) ?
375                 true :
376                 super.propagatePostCommitException(postCommitException);
377     }
378 
    /*
     * The Txn factory interface. An implementation builds the MasterTxn
     * returned by the static MasterTxn.create() method; unit tests can
     * install their own through setFactory().
     */
    public interface MasterTxnFactory  {
        MasterTxn create(EnvironmentImpl envImpl,
                         TransactionConfig config,
                         NameIdPair nameIdPair);
    }
385 
386     /* The method used to create user Master Txns via the factory. */
create(EnvironmentImpl envImpl, TransactionConfig config, NameIdPair nameIdPair)387     public static MasterTxn create(EnvironmentImpl envImpl,
388                                    TransactionConfig config,
389                                    NameIdPair nameIdPair) {
390         return factory.create(envImpl, config, nameIdPair);
391     }
392 
393     /**
394      * Method used for unit testing.
395      *
396      * Sets the factory to the one supplied. If the argument is null it
397      * restores the factory to the original default value.
398      */
setFactory(MasterTxnFactory factory)399     public static void setFactory(MasterTxnFactory factory) {
400         MasterTxn.factory = (factory == null) ? DEFAULT_FACTORY : factory;
401     }
402 
    /**
     * Convert a MasterTxn that has any write locks into a ReplayTxn, and close
     * the MasterTxn after it is disemboweled. A MasterTxn that only has read
     * locks is unchanged and is still usable by the application. To be clear,
     * the application can never use a MasterTxn to obtain a lock if the node
     * is in Replica mode, but may indeed be able to use a read-lock-only
     * MasterTxn if the node cycles back into Master status.
     *
     * For converted MasterTxns, all write locks are transferred to a replay
     * transaction, read locks are released, and the txn is closed. Used when a
     * node is transitioning from master to replica mode without recovery,
     * which may happen for an explicit master transfer request, or merely for
     * a network partition/election of a new master.
     *
     * The newly created replay transaction will need to be in the appropriate
     * state, holding all write locks, so that the node in replica form can
     * execute the proper syncups.  Note that the resulting replay txn will
     * only be aborted, and will never be committed, because the txn originated
     * on this node, which is transitioning from master -> replica.
     *
     * We only transfer write locks. We need not transfer read locks, because
     * replays only operate on writes, and are never required to obtain read
     * locks. Read locks are released though, because (a) this txn is now only
     * abortable, and (b) although the Replay can preempt any read locks held
     * by the MasterTxn, such preemption will add delay.
     *
     * This txn must have been frozen (see freeze()) before the call; it is
     * unfrozen again before the method exits, on every path that enters the
     * try block.
     *
     * @param logger the logger that receives the progress messages
     * @param replay the source of the ReplayTxn that takes over the write
     * locks
     * @return a ReplayTxn, if there were write locks in this transaction and
     * there was a need to create a ReplayTxn; null if this txn was already
     * closed or held no write locks
     */
    public ReplayTxn convertToReplayTxnAndClose(Logger logger,
                                                Replay replay) {

        /* Sanity check: this txn must be frozen before it is converted. */
        if (!freeze) {
            throw EnvironmentFailureException.unexpectedState
                (envImpl,
                 "Txn " + getId() +
                 " should be frozen when converting to replay txn");
        }

        /*
         * This is an important and relatively rare operation, and worth
         * logging.
         */
        LoggerUtils.info(logger, envImpl,
                         "Transforming txn " + getId() +
                         " from MasterTxn to ReplayTxn");

        /* Value passed to the test hook; bumped at each hook point below. */
        int hookCount = 0;
        ReplayTxn replayTxn = null;

        /* Cleared on the early-return paths that leave this txn alive. */
        boolean needToClose = true;
        try {
            synchronized (this) {

                if (isClosed()) {
                    LoggerUtils.info(logger, envImpl,
                                     "Txn " + getId() +
                                     " is closed, no tranform needed");
                    needToClose = false;
                    return null;
                }

                /*
                 * Get the list of write locks, and process them in lsn order,
                 * so we properly maintain the lastLoggedLsn and firstLoggedLSN
                 * fields of the newly created ReplayTxn.
                 */
                final Set<Long> lockedLSNs = getWriteLockIds();

                /*
                 * This transaction may not actually have any write locks. In
                 * that case, we permit it to live on.
                 */
                if (lockedLSNs.size() == 0) {
                    LoggerUtils.info(logger, envImpl, "Txn " + getId() +
                                     " had no write locks, didn't create" +
                                     " ReplayTxn");
                    needToClose = false;
                    return null;
                }

                /*
                 * We have write locks. Make sure that this txn can now
                 * only be aborted. Otherwise, there could be this window
                 * in this method:
                 *  t1: locks stolen, no locks left in this txn
                 *  t2: txn unfrozen, commits and aborts possible
                 *    -- at this point, another thread could sneak in and
                 *    -- try to commit. The txn would commit successfully,
                 *    -- because a commit w/no write locks is a no-op.
                 *    -- but that would convey the false impression that the
                 *    -- txn's write operations had committed.
                 *  t3: txn is closed
                 */
                setOnlyAbortable(new UnknownMasterException
                                 (envImpl.getName() +
                                  " is no longer a master"));
                replayTxn = replay.getReplayTxn(getId(), false);

                /*
                 * Order the lsns, so that the locks are taken in the proper
                 * order, and the txn's firstLoggedLsn and lastLoggedLsn fields
                 * are properly set.
                 */
                List<Long> sortedLsns = new ArrayList<Long>(lockedLSNs);
                Collections.sort(sortedLsns);
                LoggerUtils.info(logger, envImpl,
                                 "Txn " + getId()  + " has " +
                                 lockedLSNs.size() + " locks to transform");
                /*
                 * Transfer each lock. Note that ultimately, since mastership
                 * is changing, and replicated commits will only be executed
                 * when a txn has originated on that node, the target ReplayTxn
                 * can never be committed, and will only be aborted.
                 */
                for (Long lsn: sortedLsns) {

                    LoggerUtils.info(logger, envImpl,
                                     "Txn " + getId() +
                                     " is transferring lock " + lsn);

                    /*
                     * Use a special method to steal the lock. Another approach
                     * might have been to have the replayTxn merely attempt a
                     * lock(); as an importunate txn, the replayTxn would
                     * preempt the MasterTxn's lock. However, that path doesn't
                     * work because lock() requires having a databaseImpl in
                     * hand, and that's not available here.
                     */
                    replayTxn.stealLockFromMasterTxn(lsn);

                    /*
                     * Copy all the lock's info into the Replay and remove it
                     * from the master. Normally, undo clears write locks, but
                     * this MasterTxn will not be executing undo.
                     */
                    WriteLockInfo replayWLI = replayTxn.getWriteLockInfo(lsn);
                    WriteLockInfo masterWLI = getWriteLockInfo(lsn);
                    replayWLI.copyAllInfo(masterWLI);
                    removeLock(lsn);
                }

                /*
                 * Txns have collections of undoDatabases and deletedDatabases.
                 * Undo databases are normally incrementally added to the txn
                 * as locks are obtained. Unlike normal locking or recovery
                 * locking, in this case we don't have a reference to the
                 * databaseImpl that goes with this lock, so we copy the undo
                 * collection in one fell swoop.
                 */
                replayTxn.copyDatabasesForConversion(this);

                /*
                 * This txn is no longer responsible for databaseImpl
                 * cleanup, as that issue now lies with the ReplayTxn, so
                 * remove the collection.
                 */
                deletedDatabases = null;

                /*
                 * All locks have been removed from this transaction. Clear
                 * the firstLoggedLsn and lastLoggedLsn so there's no danger
                 * of attempting to undo anything; this txn is no longer
                 * responsible for any records.
                 */
                lastLoggedLsn = DbLsn.NULL_LSN;
                firstLoggedLsn = DbLsn.NULL_LSN;

                /* If this txn also had read locks, clear them */
                clearReadLocks();
            }
        } finally {

            /*
             * The test hook calls live inside assert statements, so they (and
             * the hookCount increments) only run when assertions are enabled.
             */
            assert TestHookExecute.doHookIfSet(convertHook, hookCount++);

            unfreeze();

            assert TestHookExecute.doHookIfSet(convertHook, hookCount++);

            /*
             * We need to abort the txn, but we can't call abort() because that
             * method checks whether we are the master! Instead, call the
             * internal method, close(), in order to end this transaction and
             * unregister it from the transactionManager. Must be called
             * outside the synchronization block.
             */
            if (needToClose) {
                LoggerUtils.info(logger, envImpl, "About to close txn " +
                                 getId() + " state=" + getState());
                close(false /*isCommit */);
                LoggerUtils.info(logger, envImpl, "Closed txn " +  getId() +
                                 " state=" + getState());
            }
            assert TestHookExecute.doHookIfSet(convertHook, hookCount++);
        }

        return replayTxn;
    }
602 
    /**
     * Marks this txn as frozen. While the flag is set, checkIfFrozen()
     * verifies that the node is still the master, and
     * convertToReplayTxnAndClose() may be called.
     */
    public void freeze() {
        freeze = true;
    }
606 
    /**
     * Clears the frozen flag. Called from the finally clause of
     * convertToReplayTxnAndClose().
     */
    private void unfreeze() {
        freeze = false;
    }
610 
611     /**
612      * Used to hold the transaction stable while it is being cloned as a
613      * ReplayTxn, during master->replica transitions. Essentially, there
614      * are two parties that now have a reference to this transaction -- the
615      * originating application thread, and the RepNode thread that is
616      * trying to set up internal state so it can begin to act as a replica.
617      *
618      * The transaction will throw UnknownMasterException or
619      * ReplicaWriteException if the transaction is frozen, so that the
620      * application knows that the transaction is no longer viable, but it
621      * doesn't attempt to do most of the follow-on cleanup and release of locks
622      * that failed aborts and commits normally attempt. One aspect of
623      * transaction cleanup can't be skipped though. It is necessary to do the
624      * post log hooks to free up the block txn latch lock so that the
625      * transaction can be closed by the RepNode thread. For example:
626      * - application thread starts transaction
627      * - application takes the block txn latch lock and attempts commit or
628      * abort, but is stopped because the txn is frozen by master transfer.
629      * - the application must release the block txn latch lock.
630      * @see Replica#replicaTransitionCleanup
631      */
632     @Override
checkIfFrozen(boolean isCommit)633     protected void checkIfFrozen(boolean isCommit) {
634         if (freeze) {
635             try {
636                 ((RepImpl) envImpl).checkIfMaster(this);
637             } catch (DatabaseException e) {
638                 if (isCommit) {
639                     postLogCommitAbortHook();
640                 } else {
641                     postLogAbortHook();
642                 }
643                 throw e;
644             }
645         }
646     }
647 
    /*
     * For unit testing. Installs the hook that
     * convertToReplayTxnAndClose() invokes from its finally clause; the hook
     * is called up to three times there, with the values 0, 1 and 2, and only
     * when assertions are enabled.
     */
    public void setConvertHook(TestHook<Integer> hook) {
        convertHook = hook;
    }
652 
    /**
     * Identifies this transaction as a MasterTxn.
     *
     * @return always true
     */
    @Override
    public boolean isMasterTxn() {
        return true;
    }
657 }
658