1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.txn;
9 
10 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ABORTS;
11 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ACTIVE;
12 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ACTIVE_TXNS;
13 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_BEGINS;
14 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_COMMITS;
15 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XAABORTS;
16 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XACOMMITS;
17 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XAPREPARES;
18 
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.HashSet;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicLong;
28 
29 import javax.transaction.xa.Xid;
30 
31 import com.sleepycat.je.DatabaseException;
32 import com.sleepycat.je.LockStats;
33 import com.sleepycat.je.StatsConfig;
34 import com.sleepycat.je.Transaction;
35 import com.sleepycat.je.TransactionConfig;
36 import com.sleepycat.je.TransactionStats;
37 import com.sleepycat.je.dbi.EnvironmentImpl;
38 import com.sleepycat.je.dbi.MemoryBudget;
39 import com.sleepycat.je.latch.LatchFactory;
40 import com.sleepycat.je.latch.SharedLatch;
41 import com.sleepycat.je.utilint.ActiveTxnArrayStat;
42 import com.sleepycat.je.utilint.DbLsn;
43 import com.sleepycat.je.utilint.IntStat;
44 import com.sleepycat.je.utilint.LongStat;
45 import com.sleepycat.je.utilint.StatGroup;
46 
47 /**
48  * Class to manage transactions.  Basically a Set of all transactions with add
49  * and remove methods and a latch around the set.
50  */
51 public class TxnManager {
52 
53     /*
54      * All NullTxns share the same id so as not to eat from the id number
55      * space.
56      *
57      * Negative transaction ids are used by the master node of a replication
58      * group. That sequence begins at -10 to avoid conflict with the
59      * NULL_TXN_ID and leave room for other special purpose ids.
60      */
61     static final long NULL_TXN_ID = -1;
62     private static final long FIRST_NEGATIVE_ID = -10;
63     private LockManager lockManager;
64     private final EnvironmentImpl envImpl;
65     private final SharedLatch allTxnsLatch;
66     private final Map<Txn, Txn> allTxns;
67 
68     /* Maps Xids to Txns. */
69     private final Map<Xid, Txn> allXATxns;
70 
71     /* Maps Threads to Txns when there are thread implied transactions. */
72     private final Map<Thread, Transaction> thread2Txn;
73 
74     /*
75      * Positive and negative transaction ids are used in a replicated system,
76      * to let replicated transactions intermingle with local transactions.
77      */
78     private final AtomicLong lastUsedLocalTxnId;
79     private final AtomicLong lastUsedReplicatedTxnId;
80     private final AtomicInteger nActiveSerializable;
81 
82     /* Locker Stats */
83     private StatGroup stats;
84     private IntStat nActive;
85     private LongStat numBegins;
86     private LongStat numCommits;
87     private LongStat numAborts;
88     private LongStat numXAPrepares;
89     private LongStat numXACommits;
90     private LongStat numXAAborts;
91     private ActiveTxnArrayStat activeTxns;
92 
TxnManager(EnvironmentImpl envImpl)93     public TxnManager(EnvironmentImpl envImpl) {
94         lockManager = new SyncedLockManager(envImpl);
95 
96         if (envImpl.isNoLocking()) {
97             lockManager = new DummyLockManager(envImpl, lockManager);
98         }
99 
100         this.envImpl = envImpl;
101         allTxnsLatch = LatchFactory.createSharedLatch(
102             envImpl, "TxnManager.allTxns", false /*exclusiveOnly*/);
103         allTxns = new ConcurrentHashMap<Txn, Txn>();
104         allXATxns = Collections.synchronizedMap(new HashMap<Xid, Txn>());
105         thread2Txn = new ConcurrentHashMap<Thread, Transaction>();
106 
107         lastUsedLocalTxnId = new AtomicLong(0);
108         lastUsedReplicatedTxnId = new AtomicLong(FIRST_NEGATIVE_ID);
109         nActiveSerializable = new AtomicInteger(0);
110 
111         /* Do the stats definition. */
112         stats = new StatGroup("Transaction", "Transaction statistics");
113         nActive = new IntStat(stats, TXN_ACTIVE);
114         numBegins = new LongStat(stats, TXN_BEGINS);
115         numCommits = new LongStat(stats, TXN_COMMITS);
116         numAborts = new LongStat(stats, TXN_ABORTS);
117         numXAPrepares = new LongStat(stats, TXN_XAPREPARES);
118         numXACommits = new LongStat(stats, TXN_XACOMMITS);
119         numXAAborts = new LongStat(stats, TXN_XAABORTS);
120         activeTxns = new ActiveTxnArrayStat(stats, TXN_ACTIVE_TXNS);
121     }
122 
123     /**
124      * Set the txn id sequence.
125      */
setLastTxnId(long lastReplicatedTxnId, long lastLocalId)126     public void setLastTxnId(long lastReplicatedTxnId, long lastLocalId) {
127         lastUsedReplicatedTxnId.set(lastReplicatedTxnId);
128         lastUsedLocalTxnId.set(lastLocalId);
129     }
130 
131     /**
132      * Get the last used id, for checkpoint info.
133      */
getLastLocalTxnId()134     public long getLastLocalTxnId() {
135         return lastUsedLocalTxnId.get();
136     }
137 
getLastReplicatedTxnId()138     public long getLastReplicatedTxnId() {
139         return lastUsedReplicatedTxnId.get();
140     }
141 
getNextReplicatedTxnId()142     public long getNextReplicatedTxnId() {
143         return lastUsedReplicatedTxnId.decrementAndGet();
144     }
145 
146     /* @return true if this id is for a replicated txn. */
isReplicatedTxn(long txnId)147     public static boolean isReplicatedTxn(long txnId) {
148         return (txnId <= FIRST_NEGATIVE_ID);
149     }
150 
151     /**
152      * Get the next transaction id for a non-replicated transaction. Note
153      * than in the future, a replicated node could conceivable issue an
154      * application level, non-replicated transaction.
155      */
getNextTxnId()156     long getNextTxnId() {
157         return lastUsedLocalTxnId.incrementAndGet();
158     }
159 
160     /*
161      * Tracks the lowest replicated transaction id used during a replay of the
162      * replication stream, so that it's available as the starting point if this
163      * replica transitions to being the master.
164      */
updateFromReplay(long replayTxnId)165     public void updateFromReplay(long replayTxnId) {
166         assert !envImpl.isMaster();
167         assert replayTxnId < 0 :
168             "replay txn id is unexpectedly positive " + replayTxnId;
169 
170         if (replayTxnId < lastUsedReplicatedTxnId.get()) {
171             lastUsedReplicatedTxnId.set(replayTxnId);
172         }
173     }
174 
175     /**
176      * Create a new transaction.
177      * @param parent for nested transactions, not yet supported
178      * @param txnConfig specifies txn attributes
179      * @return the new txn
180      */
181     public Txn txnBegin(Transaction parent, TransactionConfig txnConfig)
182         throws DatabaseException {
183 
184         return Txn.createUserTxn(envImpl, txnConfig);
185     }
186 
187     /**
188      * Give transactions and environment access to lock manager.
189      */
190     public LockManager getLockManager() {
191         return lockManager;
192     }
193 
194     /**
195      * Called when txn is created.
196      */
197     public void registerTxn(Txn txn) {
198         allTxnsLatch.acquireShared();
199         try {
200             allTxns.put(txn, txn);
201             if (txn.isSerializableIsolation()) {
202                 nActiveSerializable.incrementAndGet();
203             }
204             numBegins.increment();
205         } finally {
206             allTxnsLatch.release();
207         }
208     }
209 
210     /**
211      * Called when txn ends.
212      */
213     void unRegisterTxn(Txn txn, boolean isCommit) {
214         allTxnsLatch.acquireShared();
215         try {
216             allTxns.remove(txn);
217 
218             /* Remove any accumulated MemoryBudget delta for the Txn. */
219             envImpl.getMemoryBudget().
220                 updateTxnMemoryUsage(0 - txn.getBudgetedMemorySize());
221             if (isCommit) {
222                 numCommits.increment();
223             } else {
224                 numAborts.increment();
225             }
226             if (txn.isSerializableIsolation()) {
227                 nActiveSerializable.decrementAndGet();
228             }
229         } finally {
230             allTxnsLatch.release();
231         }
232     }
233 
234     /**
235      * Called when txn is created.
236      */
237     public void registerXATxn(Xid xid, Txn txn, boolean isPrepare) {
238         if (!allXATxns.containsKey(xid)) {
239             allXATxns.put(xid, txn);
240             envImpl.getMemoryBudget().updateTxnMemoryUsage
241                 (MemoryBudget.HASHMAP_ENTRY_OVERHEAD);
242         }
243 
244         if (isPrepare) {
245             numXAPrepares.increment();
246         }
247     }
248 
249     /**
250      * Called when XATransaction is prepared.
251      */
252     public void notePrepare() {
253         numXAPrepares.increment();
254     }
255 
256     /**
257      * Called when txn ends.
258      *
259      * @throws IllegalStateException via XAResource
260      */
261     void unRegisterXATxn(Xid xid, boolean isCommit)
262         throws DatabaseException {
263 
264         if (allXATxns.remove(xid) == null) {
265             throw new IllegalStateException
266                 ("XA Transaction " + xid + " is not registered.");
267         }
268         envImpl.getMemoryBudget().updateTxnMemoryUsage
269             (0 - MemoryBudget.HASHMAP_ENTRY_OVERHEAD);
270         if (isCommit) {
271             numXACommits.increment();
272         } else {
273             numXAAborts.increment();
274         }
275     }
276 
277     /**
278      * Retrieve a Txn object from an Xid.
279      */
280     public Txn getTxnFromXid(Xid xid) {
281         return allXATxns.get(xid);
282     }
283 
284     /**
285      * Called when txn is assoc'd with this thread.
286      */
287     public void setTxnForThread(Transaction txn) {
288 
289         Thread curThread = Thread.currentThread();
290         if (txn == null) {
291             unsetTxnForThread();
292         } else {
293             thread2Txn.put(curThread, txn);
294         }
295     }
296 
297     /**
298      * Called when txn is assoc'd with this thread.
299      */
300     public Transaction unsetTxnForThread() {
301         Thread curThread = Thread.currentThread();
302         return thread2Txn.remove(curThread);
303     }
304 
305     /**
306      * Retrieve a Txn object for this Thread.
307      */
308     public Transaction getTxnForThread() {
309         return thread2Txn.get(Thread.currentThread());
310     }
311 
312     public Xid[] XARecover() {
313         Set<Xid> xidSet = allXATxns.keySet();
314         Xid[] ret = new Xid[xidSet.size()];
315         ret = xidSet.toArray(ret);
316 
317         return ret;
318     }
319 
320     /**
321      * Returns whether there are any active serializable transactions,
322      * excluding the transaction given (if non-null).  This is intentionally
323      * returned without latching, since latching would not make the act of
324      * reading an integer more atomic than it already is.
325      */
326     public boolean
327         areOtherSerializableTransactionsActive(Locker excludeLocker) {
328         int exclude =
329             (excludeLocker != null &&
330              excludeLocker.isSerializableIsolation()) ?
331             1 : 0;
332         return (nActiveSerializable.get() - exclude > 0);
333     }
334 
335     /**
336      * Get the earliest LSN of all the active transactions, for checkpoint.
337      * Returns NULL_LSN is no transaction is currently active.
338      */
339     public long getFirstActiveLsn() {
340 
341         /*
342          * Note that the latching hierarchy calls for synchronizing on
343          * allTxns first, then synchronizing on individual txns.
344          */
345         long firstActive = DbLsn.NULL_LSN;
346         allTxnsLatch.acquireExclusive();
347         try {
348             Iterator<Txn> iter = allTxns.keySet().iterator();
349             while (iter.hasNext()) {
350                 long txnFirstActive = iter.next().getFirstActiveLsn();
351                 if (firstActive == DbLsn.NULL_LSN) {
352                     firstActive = txnFirstActive;
353                 } else if (txnFirstActive != DbLsn.NULL_LSN) {
354                     if (DbLsn.compareTo(txnFirstActive, firstActive) < 0) {
355                         firstActive = txnFirstActive;
356                     }
357                 }
358             }
359         } finally {
360             allTxnsLatch.release();
361         }
362 
363         return firstActive;
364     }
365 
366     /*
367      * Statistics
368      */
369 
370     /**
371      * Collect transaction related stats.
372      */
373     public TransactionStats txnStat(StatsConfig config) {
374         TransactionStats txnStats = null;
375         allTxnsLatch.acquireShared();
376         try {
377             nActive.set(allTxns.size());
378             TransactionStats.Active[] activeSet =
379                 new TransactionStats.Active[nActive.get()];
380             Iterator<Txn> iter = allTxns.keySet().iterator();
381             int i = 0;
382             while (iter.hasNext() && i < activeSet.length) {
383                 Locker txn = iter.next();
384                 activeSet[i] = new TransactionStats.Active
385                     (txn.toString(), txn.getId(), 0);
386                 i++;
387             }
388             activeTxns.set(activeSet);
389             txnStats = new TransactionStats(stats.cloneGroup(false));
390             if (config.getClear()) {
391                 numCommits.clear();
392                 numAborts.clear();
393                 numXACommits.clear();
394                 numXAAborts.clear();
395             }
396         } finally {
397             allTxnsLatch.release();
398         }
399 
400         return txnStats;
401     }
402 
403     public StatGroup loadStats(StatsConfig config) {
404         return lockManager.loadStats(config);
405     }
406 
407     /**
408      * Collect lock related stats.
409      */
410     public LockStats lockStat(StatsConfig config)
411         throws DatabaseException {
412 
413         return lockManager.lockStat(config);
414     }
415 
416     /**
417      * Examine the transaction set and return those that are HA MasterTxns.
418      */
419     public Set<Txn> getMasterTxns() {
420         Set<Txn> targetSet = new HashSet<Txn>();
421         allTxnsLatch.acquireShared();
422         try {
423             Set<Txn> all = allTxns.keySet();
424             for (Txn t: all) {
425                 if (t.isMasterTxn()) {
426                     targetSet.add(t);
427                 }
428             }
429         } finally {
430             allTxnsLatch.release();
431         }
432         return targetSet;
433     }
434 }
435