1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.txn; 9 10 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ABORTS; 11 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ACTIVE; 12 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_ACTIVE_TXNS; 13 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_BEGINS; 14 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_COMMITS; 15 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XAABORTS; 16 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XACOMMITS; 17 import static com.sleepycat.je.dbi.TxnStatDefinition.TXN_XAPREPARES; 18 19 import java.util.Collections; 20 import java.util.HashMap; 21 import java.util.HashSet; 22 import java.util.Iterator; 23 import java.util.Map; 24 import java.util.Set; 25 import java.util.concurrent.ConcurrentHashMap; 26 import java.util.concurrent.atomic.AtomicInteger; 27 import java.util.concurrent.atomic.AtomicLong; 28 29 import javax.transaction.xa.Xid; 30 31 import com.sleepycat.je.DatabaseException; 32 import com.sleepycat.je.LockStats; 33 import com.sleepycat.je.StatsConfig; 34 import com.sleepycat.je.Transaction; 35 import com.sleepycat.je.TransactionConfig; 36 import com.sleepycat.je.TransactionStats; 37 import com.sleepycat.je.dbi.EnvironmentImpl; 38 import com.sleepycat.je.dbi.MemoryBudget; 39 import com.sleepycat.je.latch.LatchFactory; 40 import com.sleepycat.je.latch.SharedLatch; 41 import com.sleepycat.je.utilint.ActiveTxnArrayStat; 42 import com.sleepycat.je.utilint.DbLsn; 43 import com.sleepycat.je.utilint.IntStat; 44 import com.sleepycat.je.utilint.LongStat; 45 import com.sleepycat.je.utilint.StatGroup; 46 47 /** 48 * Class to manage transactions. Basically a Set of all transactions with add 49 * and remove methods and a latch around the set. 50 */ 51 public class TxnManager { 52 53 /* 54 * All NullTxns share the same id so as not to eat from the id number 55 * space. 56 * 57 * Negative transaction ids are used by the master node of a replication 58 * group. That sequence begins at -10 to avoid conflict with the 59 * NULL_TXN_ID and leave room for other special purpose ids. 60 */ 61 static final long NULL_TXN_ID = -1; 62 private static final long FIRST_NEGATIVE_ID = -10; 63 private LockManager lockManager; 64 private final EnvironmentImpl envImpl; 65 private final SharedLatch allTxnsLatch; 66 private final Map<Txn, Txn> allTxns; 67 68 /* Maps Xids to Txns. */ 69 private final Map<Xid, Txn> allXATxns; 70 71 /* Maps Threads to Txns when there are thread implied transactions. */ 72 private final Map<Thread, Transaction> thread2Txn; 73 74 /* 75 * Positive and negative transaction ids are used in a replicated system, 76 * to let replicated transactions intermingle with local transactions. 77 */ 78 private final AtomicLong lastUsedLocalTxnId; 79 private final AtomicLong lastUsedReplicatedTxnId; 80 private final AtomicInteger nActiveSerializable; 81 82 /* Locker Stats */ 83 private StatGroup stats; 84 private IntStat nActive; 85 private LongStat numBegins; 86 private LongStat numCommits; 87 private LongStat numAborts; 88 private LongStat numXAPrepares; 89 private LongStat numXACommits; 90 private LongStat numXAAborts; 91 private ActiveTxnArrayStat activeTxns; 92 TxnManager(EnvironmentImpl envImpl)93 public TxnManager(EnvironmentImpl envImpl) { 94 lockManager = new SyncedLockManager(envImpl); 95 96 if (envImpl.isNoLocking()) { 97 lockManager = new DummyLockManager(envImpl, lockManager); 98 } 99 100 this.envImpl = envImpl; 101 allTxnsLatch = LatchFactory.createSharedLatch( 102 envImpl, "TxnManager.allTxns", false /*exclusiveOnly*/); 103 allTxns = new ConcurrentHashMap<Txn, Txn>(); 104 allXATxns = Collections.synchronizedMap(new HashMap<Xid, Txn>()); 105 thread2Txn = new ConcurrentHashMap<Thread, Transaction>(); 106 107 lastUsedLocalTxnId = new AtomicLong(0); 108 lastUsedReplicatedTxnId = new AtomicLong(FIRST_NEGATIVE_ID); 109 nActiveSerializable = new AtomicInteger(0); 110 111 /* Do the stats definition. */ 112 stats = new StatGroup("Transaction", "Transaction statistics"); 113 nActive = new IntStat(stats, TXN_ACTIVE); 114 numBegins = new LongStat(stats, TXN_BEGINS); 115 numCommits = new LongStat(stats, TXN_COMMITS); 116 numAborts = new LongStat(stats, TXN_ABORTS); 117 numXAPrepares = new LongStat(stats, TXN_XAPREPARES); 118 numXACommits = new LongStat(stats, TXN_XACOMMITS); 119 numXAAborts = new LongStat(stats, TXN_XAABORTS); 120 activeTxns = new ActiveTxnArrayStat(stats, TXN_ACTIVE_TXNS); 121 } 122 123 /** 124 * Set the txn id sequence. 125 */ setLastTxnId(long lastReplicatedTxnId, long lastLocalId)126 public void setLastTxnId(long lastReplicatedTxnId, long lastLocalId) { 127 lastUsedReplicatedTxnId.set(lastReplicatedTxnId); 128 lastUsedLocalTxnId.set(lastLocalId); 129 } 130 131 /** 132 * Get the last used id, for checkpoint info. 133 */ getLastLocalTxnId()134 public long getLastLocalTxnId() { 135 return lastUsedLocalTxnId.get(); 136 } 137 getLastReplicatedTxnId()138 public long getLastReplicatedTxnId() { 139 return lastUsedReplicatedTxnId.get(); 140 } 141 getNextReplicatedTxnId()142 public long getNextReplicatedTxnId() { 143 return lastUsedReplicatedTxnId.decrementAndGet(); 144 } 145 146 /* @return true if this id is for a replicated txn. */ isReplicatedTxn(long txnId)147 public static boolean isReplicatedTxn(long txnId) { 148 return (txnId <= FIRST_NEGATIVE_ID); 149 } 150 151 /** 152 * Get the next transaction id for a non-replicated transaction. Note 153 * than in the future, a replicated node could conceivable issue an 154 * application level, non-replicated transaction. 155 */ getNextTxnId()156 long getNextTxnId() { 157 return lastUsedLocalTxnId.incrementAndGet(); 158 } 159 160 /* 161 * Tracks the lowest replicated transaction id used during a replay of the 162 * replication stream, so that it's available as the starting point if this 163 * replica transitions to being the master. 164 */ updateFromReplay(long replayTxnId)165 public void updateFromReplay(long replayTxnId) { 166 assert !envImpl.isMaster(); 167 assert replayTxnId < 0 : 168 "replay txn id is unexpectedly positive " + replayTxnId; 169 170 if (replayTxnId < lastUsedReplicatedTxnId.get()) { 171 lastUsedReplicatedTxnId.set(replayTxnId); 172 } 173 } 174 175 /** 176 * Create a new transaction. 177 * @param parent for nested transactions, not yet supported 178 * @param txnConfig specifies txn attributes 179 * @return the new txn 180 */ 181 public Txn txnBegin(Transaction parent, TransactionConfig txnConfig) 182 throws DatabaseException { 183 184 return Txn.createUserTxn(envImpl, txnConfig); 185 } 186 187 /** 188 * Give transactions and environment access to lock manager. 189 */ 190 public LockManager getLockManager() { 191 return lockManager; 192 } 193 194 /** 195 * Called when txn is created. 196 */ 197 public void registerTxn(Txn txn) { 198 allTxnsLatch.acquireShared(); 199 try { 200 allTxns.put(txn, txn); 201 if (txn.isSerializableIsolation()) { 202 nActiveSerializable.incrementAndGet(); 203 } 204 numBegins.increment(); 205 } finally { 206 allTxnsLatch.release(); 207 } 208 } 209 210 /** 211 * Called when txn ends. 212 */ 213 void unRegisterTxn(Txn txn, boolean isCommit) { 214 allTxnsLatch.acquireShared(); 215 try { 216 allTxns.remove(txn); 217 218 /* Remove any accumulated MemoryBudget delta for the Txn. */ 219 envImpl.getMemoryBudget(). 220 updateTxnMemoryUsage(0 - txn.getBudgetedMemorySize()); 221 if (isCommit) { 222 numCommits.increment(); 223 } else { 224 numAborts.increment(); 225 } 226 if (txn.isSerializableIsolation()) { 227 nActiveSerializable.decrementAndGet(); 228 } 229 } finally { 230 allTxnsLatch.release(); 231 } 232 } 233 234 /** 235 * Called when txn is created. 236 */ 237 public void registerXATxn(Xid xid, Txn txn, boolean isPrepare) { 238 if (!allXATxns.containsKey(xid)) { 239 allXATxns.put(xid, txn); 240 envImpl.getMemoryBudget().updateTxnMemoryUsage 241 (MemoryBudget.HASHMAP_ENTRY_OVERHEAD); 242 } 243 244 if (isPrepare) { 245 numXAPrepares.increment(); 246 } 247 } 248 249 /** 250 * Called when XATransaction is prepared. 251 */ 252 public void notePrepare() { 253 numXAPrepares.increment(); 254 } 255 256 /** 257 * Called when txn ends. 258 * 259 * @throws IllegalStateException via XAResource 260 */ 261 void unRegisterXATxn(Xid xid, boolean isCommit) 262 throws DatabaseException { 263 264 if (allXATxns.remove(xid) == null) { 265 throw new IllegalStateException 266 ("XA Transaction " + xid + " is not registered."); 267 } 268 envImpl.getMemoryBudget().updateTxnMemoryUsage 269 (0 - MemoryBudget.HASHMAP_ENTRY_OVERHEAD); 270 if (isCommit) { 271 numXACommits.increment(); 272 } else { 273 numXAAborts.increment(); 274 } 275 } 276 277 /** 278 * Retrieve a Txn object from an Xid. 279 */ 280 public Txn getTxnFromXid(Xid xid) { 281 return allXATxns.get(xid); 282 } 283 284 /** 285 * Called when txn is assoc'd with this thread. 286 */ 287 public void setTxnForThread(Transaction txn) { 288 289 Thread curThread = Thread.currentThread(); 290 if (txn == null) { 291 unsetTxnForThread(); 292 } else { 293 thread2Txn.put(curThread, txn); 294 } 295 } 296 297 /** 298 * Called when txn is assoc'd with this thread. 299 */ 300 public Transaction unsetTxnForThread() { 301 Thread curThread = Thread.currentThread(); 302 return thread2Txn.remove(curThread); 303 } 304 305 /** 306 * Retrieve a Txn object for this Thread. 307 */ 308 public Transaction getTxnForThread() { 309 return thread2Txn.get(Thread.currentThread()); 310 } 311 312 public Xid[] XARecover() { 313 Set<Xid> xidSet = allXATxns.keySet(); 314 Xid[] ret = new Xid[xidSet.size()]; 315 ret = xidSet.toArray(ret); 316 317 return ret; 318 } 319 320 /** 321 * Returns whether there are any active serializable transactions, 322 * excluding the transaction given (if non-null). This is intentionally 323 * returned without latching, since latching would not make the act of 324 * reading an integer more atomic than it already is. 325 */ 326 public boolean 327 areOtherSerializableTransactionsActive(Locker excludeLocker) { 328 int exclude = 329 (excludeLocker != null && 330 excludeLocker.isSerializableIsolation()) ? 331 1 : 0; 332 return (nActiveSerializable.get() - exclude > 0); 333 } 334 335 /** 336 * Get the earliest LSN of all the active transactions, for checkpoint. 337 * Returns NULL_LSN is no transaction is currently active. 338 */ 339 public long getFirstActiveLsn() { 340 341 /* 342 * Note that the latching hierarchy calls for synchronizing on 343 * allTxns first, then synchronizing on individual txns. 344 */ 345 long firstActive = DbLsn.NULL_LSN; 346 allTxnsLatch.acquireExclusive(); 347 try { 348 Iterator<Txn> iter = allTxns.keySet().iterator(); 349 while (iter.hasNext()) { 350 long txnFirstActive = iter.next().getFirstActiveLsn(); 351 if (firstActive == DbLsn.NULL_LSN) { 352 firstActive = txnFirstActive; 353 } else if (txnFirstActive != DbLsn.NULL_LSN) { 354 if (DbLsn.compareTo(txnFirstActive, firstActive) < 0) { 355 firstActive = txnFirstActive; 356 } 357 } 358 } 359 } finally { 360 allTxnsLatch.release(); 361 } 362 363 return firstActive; 364 } 365 366 /* 367 * Statistics 368 */ 369 370 /** 371 * Collect transaction related stats. 372 */ 373 public TransactionStats txnStat(StatsConfig config) { 374 TransactionStats txnStats = null; 375 allTxnsLatch.acquireShared(); 376 try { 377 nActive.set(allTxns.size()); 378 TransactionStats.Active[] activeSet = 379 new TransactionStats.Active[nActive.get()]; 380 Iterator<Txn> iter = allTxns.keySet().iterator(); 381 int i = 0; 382 while (iter.hasNext() && i < activeSet.length) { 383 Locker txn = iter.next(); 384 activeSet[i] = new TransactionStats.Active 385 (txn.toString(), txn.getId(), 0); 386 i++; 387 } 388 activeTxns.set(activeSet); 389 txnStats = new TransactionStats(stats.cloneGroup(false)); 390 if (config.getClear()) { 391 numCommits.clear(); 392 numAborts.clear(); 393 numXACommits.clear(); 394 numXAAborts.clear(); 395 } 396 } finally { 397 allTxnsLatch.release(); 398 } 399 400 return txnStats; 401 } 402 403 public StatGroup loadStats(StatsConfig config) { 404 return lockManager.loadStats(config); 405 } 406 407 /** 408 * Collect lock related stats. 409 */ 410 public LockStats lockStat(StatsConfig config) 411 throws DatabaseException { 412 413 return lockManager.lockStat(config); 414 } 415 416 /** 417 * Examine the transaction set and return those that are HA MasterTxns. 418 */ 419 public Set<Txn> getMasterTxns() { 420 Set<Txn> targetSet = new HashSet<Txn>(); 421 allTxnsLatch.acquireShared(); 422 try { 423 Set<Txn> all = allTxns.keySet(); 424 for (Txn t: all) { 425 if (t.isMasterTxn()) { 426 targetSet.add(t); 427 } 428 } 429 } finally { 430 allTxnsLatch.release(); 431 } 432 return targetSet; 433 } 434 } 435