1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2002, 2014 Oracle and/or its affiliates.  All rights reserved.
5  *
6  */
7 
8 package com.sleepycat.je.rep.stream;
9 
10 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.ACK_WAIT_MS;
11 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TOTAL_TXN_MS;
12 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TXNS_ACKED;
13 import static com.sleepycat.je.rep.stream.FeederTxnStatDefinition.TXNS_NOT_ACKED;
14 
15 import java.util.Map;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.TimeUnit;
19 
20 import com.sleepycat.je.StatsConfig;
21 import com.sleepycat.je.rep.InsufficientAcksException;
22 import com.sleepycat.je.rep.impl.RepImpl;
23 import com.sleepycat.je.rep.impl.RepNodeImpl;
24 import com.sleepycat.je.rep.impl.node.DurabilityQuorum;
25 import com.sleepycat.je.rep.impl.node.RepNode;
26 import com.sleepycat.je.rep.txn.MasterTxn;
27 import com.sleepycat.je.txn.Txn;
28 import com.sleepycat.je.utilint.AtomicLongStat;
29 import com.sleepycat.je.utilint.StatGroup;
30 import com.sleepycat.je.utilint.VLSN;
31 
32 /**
33  * FeederTxns manages transactions that need acknowledgments.
34  */
35 public class FeederTxns {
36 
37     /*
38      * Tracks transactions that have not yet been acknowledged for the entire
39      * replication node.
40      */
41     private final Map<Long, TxnInfo> txnMap;
42 
43     private final RepImpl repImpl;
44     private final StatGroup statistics;
45     private final AtomicLongStat txnsAcked;
46     private final AtomicLongStat txnsNotAcked;
47     private final AtomicLongStat ackWaitMs;
48     private final AtomicLongStat totalTxnMs;
49 
FeederTxns(RepImpl repImpl)50     public FeederTxns(RepImpl repImpl) {
51 
52         txnMap = new ConcurrentHashMap<Long, TxnInfo>();
53         this.repImpl = repImpl;
54         statistics = new StatGroup(FeederTxnStatDefinition.GROUP_NAME,
55                                    FeederTxnStatDefinition.GROUP_DESC);
56         txnsAcked = new AtomicLongStat(statistics, TXNS_ACKED);
57         txnsNotAcked = new AtomicLongStat(statistics, TXNS_NOT_ACKED);
58         ackWaitMs = new AtomicLongStat(statistics, ACK_WAIT_MS);
59         totalTxnMs = new AtomicLongStat(statistics, TOTAL_TXN_MS);
60     }
61 
62     /**
63      * Create a new TxnInfo so that transaction commit can wait on the latch it
64      * sets up.
65      *
66      * @param txn identifies the transaction.
67      */
setupForAcks(MasterTxn txn)68     public void setupForAcks(MasterTxn txn) {
69         if (txn.getRequiredAckCount() == 0) {
70             /* No acks called for, no setup needed. */
71             return;
72         }
73         TxnInfo txnInfo = new TxnInfo(txn);
74         TxnInfo  prevInfo = txnMap.put(txn.getId(), txnInfo);
75         assert(prevInfo == null);
76     }
77 
78     /**
79      * Returns the transaction if it's waiting for acknowledgments. Returns
80      * null otherwise.
81      */
getAckTxn(long txnId)82     public MasterTxn getAckTxn(long txnId) {
83         TxnInfo txnInfo = txnMap.get(txnId);
84         return (txnInfo == null) ? null : txnInfo.txn;
85     }
86 
87     /*
88      * Clears any ack requirements associated with the transaction. It's
89      * typically invoked on a transaction abort.
90      */
clearTransactionAcks(Txn txn)91     public void clearTransactionAcks(Txn txn) {
92         txnMap.remove(txn.getId());
93     }
94 
95     /**
96      * Notes that an acknowledgment was received from a replica.
97      *
98      * @param replica the replica node
99      * @param txnId the locally committed transaction that was acknowledged.
100      *
101      * @return the transaction VLSN, if txnId needs an ack, null otherwise
102      */
noteReplicaAck(final RepNodeImpl replica, final long txnId)103     public VLSN noteReplicaAck(final RepNodeImpl replica, final long txnId) {
104         final DurabilityQuorum durabilityQuorum =
105             repImpl.getRepNode().getDurabilityQuorum();
106         if (!durabilityQuorum.replicaAcksQualify(replica)) {
107             return null;
108         }
109         final TxnInfo txnInfo = txnMap.get(txnId);
110         if (txnInfo == null) {
111             return null;
112         }
113         txnInfo.countDown();
114         return txnInfo.getCommitVLSN();
115     }
116 
117     /**
118      * Waits for the required number of replica acks to come through.
119      *
120      * @param txn identifies the transaction to wait for.
121      *
122      * @param timeoutMs the amount of time to wait for the acknowledgments
123      * before giving up.
124      *
125      * @throws InsufficientAcksException if the ack requirements were not met
126      */
awaitReplicaAcks(MasterTxn txn, int timeoutMs)127     public void awaitReplicaAcks(MasterTxn txn, int timeoutMs)
128         throws InterruptedException {
129 
130         TxnInfo txnInfo = txnMap.get(txn.getId());
131         if (txnInfo == null) {
132             return;
133         }
134         txnInfo.await(timeoutMs);
135         txnMap.remove(txn.getId());
136         final RepNode repNode = repImpl.getRepNode();
137         if (repNode != null) {
138             repNode.getDurabilityQuorum().ensureSufficientAcks(
139                 txnInfo, timeoutMs);
140         }
141     }
142 
143     /**
144      * Used to track the latch and the transaction information associated with
145      * a transaction needing an acknowledgment.
146      */
147     public class TxnInfo {
148         /* The latch used to track transaction acknowledgments. */
149         final private CountDownLatch latch;
150         final MasterTxn txn;
151 
TxnInfo(MasterTxn txn)152         private TxnInfo(MasterTxn txn) {
153             assert(txn != null);
154             final int numRequiredAcks = txn.getRequiredAckCount();
155             this.latch = (numRequiredAcks == 0) ?
156                 null :
157                 new CountDownLatch(numRequiredAcks);
158             this.txn = txn;
159         }
160 
161         /**
162          * Returns the VLSN associated with the committed txn, or null if the
163          * txn has not yet been committed.
164          */
getCommitVLSN()165         public VLSN getCommitVLSN() {
166             return txn.getCommitVLSN();
167         }
168 
await(int timeoutMs)169         private final boolean await(int timeoutMs)
170             throws InterruptedException {
171 
172             final long ackAwaitStartMs = System.currentTimeMillis();
173             boolean isZero = (latch == null) ||
174                 latch.await(timeoutMs, TimeUnit.MILLISECONDS);
175             if (isZero) {
176                 txnsAcked.increment();
177                 final long now = System.currentTimeMillis();
178                 ackWaitMs.add(now - ackAwaitStartMs);
179                 totalTxnMs.add(now - txn.getStartMs());
180             } else {
181                 txnsNotAcked.increment();
182             }
183             return isZero;
184         }
185 
countDown()186         public final void countDown() {
187             if (latch == null) {
188                 return;
189             }
190 
191             latch.countDown();
192         }
193 
getPendingAcks()194         public final int getPendingAcks() {
195             if (latch == null) {
196                 return 0;
197             }
198 
199             return (int) latch.getCount();
200         }
201 
getTxn()202         public final MasterTxn getTxn() {
203             return txn;
204         }
205     }
206 
getStats()207     public StatGroup getStats() {
208         StatGroup ret = statistics.cloneGroup(false);
209 
210         return ret;
211     }
212 
resetStats()213     public void resetStats() {
214         statistics.clear();
215     }
216 
getStats(StatsConfig config)217     public StatGroup getStats(StatsConfig config) {
218 
219         StatGroup cloneStats = statistics.cloneGroup(config.getClear());
220 
221         return cloneStats;
222     }
223 }
224