1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #pragma once
6 
7 #include "Transaction.h"
8 #include <libdevcore/Common.h>
9 #include <libdevcore/Guards.h>
10 #include <libdevcore/Log.h>
11 #include <libdevcore/LruCache.h>
12 #include <libethcore/Common.h>
13 #include <condition_variable>
14 #include <deque>
15 #include <functional>
16 #include <thread>
17 
18 namespace dev
19 {
20 namespace eth
21 {
22 
23 /**
24  * @brief A queue of Transactions, each stored as RLP.
25  * Maintains a transaction queue sorted by nonce diff and gas price.
26  * @threadsafe
27  */
28 class TransactionQueue
29 {
30 public:
31     struct Limits { size_t current; size_t future; };
32 
33     /// @brief TransactionQueue
34     /// @param _limit Maximum number of pending transactions in the queue.
35     /// @param _futureLimit Maximum number of future nonce transactions.
36     TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024);
TransactionQueue(Limits const & _l)37     TransactionQueue(Limits const& _l): TransactionQueue(_l.current, _l.future) {}
38     ~TransactionQueue();
39     /// Add transaction to the queue to be verified and imported.
40     /// @param _data RLP encoded transaction data.
41     /// @param _nodeId Optional network identified of a node transaction comes from.
42     void enqueue(RLP const& _data, h512 const& _nodeId);
43 
44     /// Verify and add transaction to the queue synchronously.
45     /// @param _tx RLP encoded transaction data.
46     /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
47     /// @returns Import result code.
48     ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); }
49 
50     /// Verify and add transaction to the queue synchronously.
51     /// @param _tx Trasnaction data.
52     /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped.
53     /// @returns Import result code.
54     ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore);
55 
56     /// Remove transaction from the queue
57     /// @param _txHash Trasnaction hash
58     void drop(h256 const& _txHash);
59 
60     /// Get number of pending transactions for account.
61     /// @returns Pending transaction count.
62     unsigned waiting(Address const& _a) const;
63 
64     /// Get top transactions from the queue. Returned transactions are not removed from the queue automatically.
65     /// @param _limit Max number of transactions to return.
66     /// @param _avoid Transactions to avoid returning.
67     /// @returns up to _limit transactions ordered by nonce and gas price.
68     Transactions topTransactions(unsigned _limit, h256Hash const& _avoid = h256Hash()) const;
69 
70     /// Get a hash set of transactions in the queue
71     /// @returns A hash set of all transactions in the queue
72     h256Hash knownTransactions() const;
73 
74     /// Get max nonce for an account
75     /// @returns Max transaction nonce for account in the queue
76     u256 maxNonce(Address const& _a) const;
77 
78     /// Mark transaction as future. It wont be retured in topTransactions list until a transaction with a preceeding nonce is imported or marked with dropGood
79     /// @param _t Transaction hash
80     void setFuture(h256 const& _t);
81 
82     /// Drop a trasnaction from the list if exists and move following future trasnactions to current (if any)
83     /// @param _t Transaction hash
84     void dropGood(Transaction const& _t);
85 
86     struct Status
87     {
88         size_t current;
89         size_t future;
90         size_t unverified;
91         size_t dropped;
92     };
93     /// @returns the status of the transaction queue.
status()94     Status status() const { Status ret; DEV_GUARDED(x_queue) { ret.unverified = m_unverified.size(); } ReadGuard l(m_lock); ret.dropped = m_dropped.size(); ret.current = m_currentByHash.size(); ret.future = m_future.size(); return ret; }
95 
96     /// @returns the transacrtion limits on current/future.
limits()97     Limits limits() const { return Limits{m_limit, m_futureLimit}; }
98 
99     /// Clear the queue
100     void clear();
101 
102     /// Register a handler that will be called once there is a new transaction imported
onReady(T const & _t)103     template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); }
104 
105     /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
onImport(T const & _t)106     template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); }
107 
108     /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported
onReplaced(T const & _t)109     template <class T> Handler<h256 const&> onReplaced(T const& _t) { return m_onReplaced.add(_t); }
110 
111 private:
112 
113     /// Verified and imported transaction
114     struct VerifiedTransaction
115     {
VerifiedTransactionVerifiedTransaction116         VerifiedTransaction(Transaction const& _t): transaction(_t) {}
VerifiedTransactionVerifiedTransaction117         VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {}
118 
119         VerifiedTransaction(VerifiedTransaction const&) = delete;
120         VerifiedTransaction& operator=(VerifiedTransaction const&) = delete;
121 
122         Transaction transaction;  ///< Transaction data
123     };
124 
125     /// Transaction pending verification
126     struct UnverifiedTransaction
127     {
UnverifiedTransactionUnverifiedTransaction128         UnverifiedTransaction() {}
UnverifiedTransactionUnverifiedTransaction129         UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(_t.toBytes()), nodeId(_nodeId) {}
UnverifiedTransactionUnverifiedTransaction130         UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)), nodeId(std::move(_t.nodeId)) {}
131         UnverifiedTransaction& operator=(UnverifiedTransaction&& _other)
132         {
133             assert(&_other != this);
134 
135             transaction = std::move(_other.transaction);
136             nodeId = std::move(_other.nodeId);
137             return *this;
138         }
139 
140         UnverifiedTransaction(UnverifiedTransaction const&) = delete;
141         UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete;
142 
143         bytes transaction;  ///< RLP encoded transaction data
144         h512 nodeId;        ///< Network Id of the peer transaction comes from
145     };
146 
147     struct PriorityCompare
148     {
149         TransactionQueue& queue;
150         /// Compare transaction by nonce height and gas price.
operatorPriorityCompare151         bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const
152         {
153             u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first;
154             u256 const& height2 = _second.transaction.nonce() - queue.m_currentByAddressAndNonce[_second.transaction.sender()].begin()->first;
155             return height1 < height2 || (height1 == height2 && _first.transaction.gasPrice() > _second.transaction.gasPrice());
156         }
157     };
158 
159     // Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order.
160     using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>;
161 
162     ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore);
163     ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik);
164     ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction);
165 
166     void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p);
167     void makeCurrent_WITH_LOCK(Transaction const& _t);
168     bool remove_WITH_LOCK(h256 const& _txHash);
169     u256 maxNonce_WITH_LOCK(Address const& _a) const;
170     void verifierBody();
171 
172     mutable SharedMutex m_lock;  ///< General lock.
173     h256Hash m_known;            ///< Headers of transactions in both sets.
174 
175     std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks;	///< Called once.
176 
177     ///< Transactions that have previously been dropped. We technically only need to store the tx
178     ///< hash, but we also store bool as a placeholder value so that we can use an LRU cache to cap
179     ///< the number of transaction hashes stored.
180     LruCache<h256, bool> m_dropped;
181 
182     PriorityQueue m_current;
183     std::unordered_map<h256, PriorityQueue::iterator> m_currentByHash;			///< Transaction hash to set ref
184     std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce
185     std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future;	/// Future transactions
186 
187     Signal<> m_onReady;															///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast.
188     Signal<ImportResult, h256 const&, h512 const&> m_onImport;					///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast.
189     Signal<h256 const&> m_onReplaced;											///< Called whan transction is dropped during a call to import() to make room for another transaction.
190     unsigned m_limit;															///< Max number of pending transactions
191     unsigned m_futureLimit;														///< Max number of future transactions
192     unsigned m_futureSize = 0;													///< Current number of future transactions
193 
194     std::condition_variable m_queueReady;										///< Signaled when m_unverified has a new entry.
195     std::vector<std::thread> m_verifiers;
196     std::deque<UnverifiedTransaction> m_unverified;  ///< Pending verification queue
197     mutable Mutex x_queue;                           ///< Verification queue mutex
198     std::atomic<bool> m_aborting = {false};          ///< Exit condition for verifier.
199 
200     Logger m_logger{createLogger(VerbosityInfo, "tq")};
201     Logger m_loggerDetail{createLogger(VerbosityDebug, "tq")};
202 };
203 
204 }
205 }
206 
207