1 /*- 2 * See the file LICENSE for redistribution information. 3 * 4 * Copyright (c) 2002, 2014 Oracle and/or its affiliates. All rights reserved. 5 * 6 */ 7 8 package com.sleepycat.je.rep.stream; 9 10 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.ACK_WAIT_MS; 11 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TOTAL_TXN_MS; 12 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TXNS_ACKED; 13 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TXNS_NOT_ACKED; 14 15 import java.util.Map; 16 import java.util.concurrent.ConcurrentHashMap; 17 import java.util.concurrent.CountDownLatch; 18 import java.util.concurrent.TimeUnit; 19 20 import com.sleepycat.je.StatsConfig; 21 import com.sleepycat.je.rep.InsufficientAcksException; 22 import com.sleepycat.je.rep.impl.RepImpl; 23 import com.sleepycat.je.rep.impl.RepNodeImpl; 24 import com.sleepycat.je.rep.impl.node.DurabilityQuorum; 25 import com.sleepycat.je.rep.impl.node.RepNode; 26 import com.sleepycat.je.rep.txn.MasterTxn; 27 import com.sleepycat.je.txn.Txn; 28 import com.sleepycat.je.utilint.AtomicLongStat; 29 import com.sleepycat.je.utilint.StatGroup; 30 import com.sleepycat.je.utilint.VLSN; 31 32 /** 33 * FeederTxns manages transactions that need acknowledgments. 34 */ 35 public class FeederTxns { 36 37 /* 38 * Tracks transactions that have not yet been acknowledged for the entire 39 * replication node. 40 */ 41 private final Map<Long, TxnInfo> txnMap; 42 43 private final RepImpl repImpl; 44 private final StatGroup statistics; 45 private final AtomicLongStat txnsAcked; 46 private final AtomicLongStat txnsNotAcked; 47 private final AtomicLongStat ackWaitMs; 48 private final AtomicLongStat totalTxnMs; 49 FeederTxns(RepImpl repImpl)50 public FeederTxns(RepImpl repImpl) { 51 52 txnMap = new ConcurrentHashMap<Long, TxnInfo>(); 53 this.repImpl = repImpl; 54 statistics = new StatGroup(FeederTxnStatDefinition.GROUP_NAME, 55 FeederTxnStatDefinition.GROUP_DESC); 56 txnsAcked = new AtomicLongStat(statistics, TXNS_ACKED); 57 txnsNotAcked = new AtomicLongStat(statistics, TXNS_NOT_ACKED); 58 ackWaitMs = new AtomicLongStat(statistics, ACK_WAIT_MS); 59 totalTxnMs = new AtomicLongStat(statistics, TOTAL_TXN_MS); 60 } 61 62 /** 63 * Create a new TxnInfo so that transaction commit can wait on the latch it 64 * sets up. 65 * 66 * @param txn identifies the transaction. 67 */ setupForAcks(MasterTxn txn)68 public void setupForAcks(MasterTxn txn) { 69 if (txn.getRequiredAckCount() == 0) { 70 /* No acks called for, no setup needed. */ 71 return; 72 } 73 TxnInfo txnInfo = new TxnInfo(txn); 74 TxnInfo prevInfo = txnMap.put(txn.getId(), txnInfo); 75 assert(prevInfo == null); 76 } 77 78 /** 79 * Returns the transaction if it's waiting for acknowledgments. Returns 80 * null otherwise. 81 */ getAckTxn(long txnId)82 public MasterTxn getAckTxn(long txnId) { 83 TxnInfo txnInfo = txnMap.get(txnId); 84 return (txnInfo == null) ? null : txnInfo.txn; 85 } 86 87 /* 88 * Clears any ack requirements associated with the transaction. It's 89 * typically invoked on a transaction abort. 90 */ clearTransactionAcks(Txn txn)91 public void clearTransactionAcks(Txn txn) { 92 txnMap.remove(txn.getId()); 93 } 94 95 /** 96 * Notes that an acknowledgment was received from a replica. 97 * 98 * @param replica the replica node 99 * @param txnId the locally committed transaction that was acknowledged. 100 * 101 * @return the transaction VLSN, if txnId needs an ack, null otherwise 102 */ noteReplicaAck(final RepNodeImpl replica, final long txnId)103 public VLSN noteReplicaAck(final RepNodeImpl replica, final long txnId) { 104 final DurabilityQuorum durabilityQuorum = 105 repImpl.getRepNode().getDurabilityQuorum(); 106 if (!durabilityQuorum.replicaAcksQualify(replica)) { 107 return null; 108 } 109 final TxnInfo txnInfo = txnMap.get(txnId); 110 if (txnInfo == null) { 111 return null; 112 } 113 txnInfo.countDown(); 114 return txnInfo.getCommitVLSN(); 115 } 116 117 /** 118 * Waits for the required number of replica acks to come through. 119 * 120 * @param txn identifies the transaction to wait for. 121 * 122 * @param timeoutMs the amount of time to wait for the acknowledgments 123 * before giving up. 124 * 125 * @throws InsufficientAcksException if the ack requirements were not met 126 */ awaitReplicaAcks(MasterTxn txn, int timeoutMs)127 public void awaitReplicaAcks(MasterTxn txn, int timeoutMs) 128 throws InterruptedException { 129 130 TxnInfo txnInfo = txnMap.get(txn.getId()); 131 if (txnInfo == null) { 132 return; 133 } 134 txnInfo.await(timeoutMs); 135 txnMap.remove(txn.getId()); 136 final RepNode repNode = repImpl.getRepNode(); 137 if (repNode != null) { 138 repNode.getDurabilityQuorum().ensureSufficientAcks( 139 txnInfo, timeoutMs); 140 } 141 } 142 143 /** 144 * Used to track the latch and the transaction information associated with 145 * a transaction needing an acknowledgment. 146 */ 147 public class TxnInfo { 148 /* The latch used to track transaction acknowledgments. */ 149 final private CountDownLatch latch; 150 final MasterTxn txn; 151 TxnInfo(MasterTxn txn)152 private TxnInfo(MasterTxn txn) { 153 assert(txn != null); 154 final int numRequiredAcks = txn.getRequiredAckCount(); 155 this.latch = (numRequiredAcks == 0) ? 156 null : 157 new CountDownLatch(numRequiredAcks); 158 this.txn = txn; 159 } 160 161 /** 162 * Returns the VLSN associated with the committed txn, or null if the 163 * txn has not yet been committed. 164 */ getCommitVLSN()165 public VLSN getCommitVLSN() { 166 return txn.getCommitVLSN(); 167 } 168 await(int timeoutMs)169 private final boolean await(int timeoutMs) 170 throws InterruptedException { 171 172 final long ackAwaitStartMs = System.currentTimeMillis(); 173 boolean isZero = (latch == null) || 174 latch.await(timeoutMs, TimeUnit.MILLISECONDS); 175 if (isZero) { 176 txnsAcked.increment(); 177 final long now = System.currentTimeMillis(); 178 ackWaitMs.add(now - ackAwaitStartMs); 179 totalTxnMs.add(now - txn.getStartMs()); 180 } else { 181 txnsNotAcked.increment(); 182 } 183 return isZero; 184 } 185 countDown()186 public final void countDown() { 187 if (latch == null) { 188 return; 189 } 190 191 latch.countDown(); 192 } 193 getPendingAcks()194 public final int getPendingAcks() { 195 if (latch == null) { 196 return 0; 197 } 198 199 return (int) latch.getCount(); 200 } 201 getTxn()202 public final MasterTxn getTxn() { 203 return txn; 204 } 205 } 206 getStats()207 public StatGroup getStats() { 208 StatGroup ret = statistics.cloneGroup(false); 209 210 return ret; 211 } 212 resetStats()213 public void resetStats() { 214 statistics.clear(); 215 } 216 getStats(StatsConfig config)217 public StatGroup getStats(StatsConfig config) { 218 219 StatGroup cloneStats = statistics.cloneGroup(config.getClear()); 220 221 return cloneStats; 222 } 223 } 224