1 // Aleth: Ethereum C++ client, tools and libraries. 2 // Copyright 2014-2019 Aleth Authors. 3 // Licensed under the GNU General Public License, Version 3. 4 5 #pragma once 6 7 #include "Transaction.h" 8 #include <libdevcore/Common.h> 9 #include <libdevcore/Guards.h> 10 #include <libdevcore/Log.h> 11 #include <libdevcore/LruCache.h> 12 #include <libethcore/Common.h> 13 #include <condition_variable> 14 #include <deque> 15 #include <functional> 16 #include <thread> 17 18 namespace dev 19 { 20 namespace eth 21 { 22 23 /** 24 * @brief A queue of Transactions, each stored as RLP. 25 * Maintains a transaction queue sorted by nonce diff and gas price. 26 * @threadsafe 27 */ 28 class TransactionQueue 29 { 30 public: 31 struct Limits { size_t current; size_t future; }; 32 33 /// @brief TransactionQueue 34 /// @param _limit Maximum number of pending transactions in the queue. 35 /// @param _futureLimit Maximum number of future nonce transactions. 36 TransactionQueue(unsigned _limit = 1024, unsigned _futureLimit = 1024); TransactionQueue(Limits const & _l)37 TransactionQueue(Limits const& _l): TransactionQueue(_l.current, _l.future) {} 38 ~TransactionQueue(); 39 /// Add transaction to the queue to be verified and imported. 40 /// @param _data RLP encoded transaction data. 41 /// @param _nodeId Optional network identified of a node transaction comes from. 42 void enqueue(RLP const& _data, h512 const& _nodeId); 43 44 /// Verify and add transaction to the queue synchronously. 45 /// @param _tx RLP encoded transaction data. 46 /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped. 47 /// @returns Import result code. 48 ImportResult import(bytes const& _tx, IfDropped _ik = IfDropped::Ignore) { return import(&_tx, _ik); } 49 50 /// Verify and add transaction to the queue synchronously. 51 /// @param _tx Trasnaction data. 52 /// @param _ik Set to Retry to force re-addinga transaction that was previously dropped. 53 /// @returns Import result code. 54 ImportResult import(Transaction const& _tx, IfDropped _ik = IfDropped::Ignore); 55 56 /// Remove transaction from the queue 57 /// @param _txHash Trasnaction hash 58 void drop(h256 const& _txHash); 59 60 /// Get number of pending transactions for account. 61 /// @returns Pending transaction count. 62 unsigned waiting(Address const& _a) const; 63 64 /// Get top transactions from the queue. Returned transactions are not removed from the queue automatically. 65 /// @param _limit Max number of transactions to return. 66 /// @param _avoid Transactions to avoid returning. 67 /// @returns up to _limit transactions ordered by nonce and gas price. 68 Transactions topTransactions(unsigned _limit, h256Hash const& _avoid = h256Hash()) const; 69 70 /// Get a hash set of transactions in the queue 71 /// @returns A hash set of all transactions in the queue 72 h256Hash knownTransactions() const; 73 74 /// Get max nonce for an account 75 /// @returns Max transaction nonce for account in the queue 76 u256 maxNonce(Address const& _a) const; 77 78 /// Mark transaction as future. It wont be retured in topTransactions list until a transaction with a preceeding nonce is imported or marked with dropGood 79 /// @param _t Transaction hash 80 void setFuture(h256 const& _t); 81 82 /// Drop a trasnaction from the list if exists and move following future trasnactions to current (if any) 83 /// @param _t Transaction hash 84 void dropGood(Transaction const& _t); 85 86 struct Status 87 { 88 size_t current; 89 size_t future; 90 size_t unverified; 91 size_t dropped; 92 }; 93 /// @returns the status of the transaction queue. status()94 Status status() const { Status ret; DEV_GUARDED(x_queue) { ret.unverified = m_unverified.size(); } ReadGuard l(m_lock); ret.dropped = m_dropped.size(); ret.current = m_currentByHash.size(); ret.future = m_future.size(); return ret; } 95 96 /// @returns the transacrtion limits on current/future. limits()97 Limits limits() const { return Limits{m_limit, m_futureLimit}; } 98 99 /// Clear the queue 100 void clear(); 101 102 /// Register a handler that will be called once there is a new transaction imported onReady(T const & _t)103 template <class T> Handler<> onReady(T const& _t) { return m_onReady.add(_t); } 104 105 /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported onImport(T const & _t)106 template <class T> Handler<ImportResult, h256 const&, h512 const&> onImport(T const& _t) { return m_onImport.add(_t); } 107 108 /// Register a handler that will be called once asynchronous verification is comeplte an transaction has been imported onReplaced(T const & _t)109 template <class T> Handler<h256 const&> onReplaced(T const& _t) { return m_onReplaced.add(_t); } 110 111 private: 112 113 /// Verified and imported transaction 114 struct VerifiedTransaction 115 { VerifiedTransactionVerifiedTransaction116 VerifiedTransaction(Transaction const& _t): transaction(_t) {} VerifiedTransactionVerifiedTransaction117 VerifiedTransaction(VerifiedTransaction&& _t): transaction(std::move(_t.transaction)) {} 118 119 VerifiedTransaction(VerifiedTransaction const&) = delete; 120 VerifiedTransaction& operator=(VerifiedTransaction const&) = delete; 121 122 Transaction transaction; ///< Transaction data 123 }; 124 125 /// Transaction pending verification 126 struct UnverifiedTransaction 127 { UnverifiedTransactionUnverifiedTransaction128 UnverifiedTransaction() {} UnverifiedTransactionUnverifiedTransaction129 UnverifiedTransaction(bytesConstRef const& _t, h512 const& _nodeId): transaction(_t.toBytes()), nodeId(_nodeId) {} UnverifiedTransactionUnverifiedTransaction130 UnverifiedTransaction(UnverifiedTransaction&& _t): transaction(std::move(_t.transaction)), nodeId(std::move(_t.nodeId)) {} 131 UnverifiedTransaction& operator=(UnverifiedTransaction&& _other) 132 { 133 assert(&_other != this); 134 135 transaction = std::move(_other.transaction); 136 nodeId = std::move(_other.nodeId); 137 return *this; 138 } 139 140 UnverifiedTransaction(UnverifiedTransaction const&) = delete; 141 UnverifiedTransaction& operator=(UnverifiedTransaction const&) = delete; 142 143 bytes transaction; ///< RLP encoded transaction data 144 h512 nodeId; ///< Network Id of the peer transaction comes from 145 }; 146 147 struct PriorityCompare 148 { 149 TransactionQueue& queue; 150 /// Compare transaction by nonce height and gas price. operatorPriorityCompare151 bool operator()(VerifiedTransaction const& _first, VerifiedTransaction const& _second) const 152 { 153 u256 const& height1 = _first.transaction.nonce() - queue.m_currentByAddressAndNonce[_first.transaction.sender()].begin()->first; 154 u256 const& height2 = _second.transaction.nonce() - queue.m_currentByAddressAndNonce[_second.transaction.sender()].begin()->first; 155 return height1 < height2 || (height1 == height2 && _first.transaction.gasPrice() > _second.transaction.gasPrice()); 156 } 157 }; 158 159 // Use a set with dynamic comparator for minmax priority queue. The comparator takes into account min account nonce. Updating it does not affect the order. 160 using PriorityQueue = std::multiset<VerifiedTransaction, PriorityCompare>; 161 162 ImportResult import(bytesConstRef _tx, IfDropped _ik = IfDropped::Ignore); 163 ImportResult check_WITH_LOCK(h256 const& _h, IfDropped _ik); 164 ImportResult manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction); 165 166 void insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p); 167 void makeCurrent_WITH_LOCK(Transaction const& _t); 168 bool remove_WITH_LOCK(h256 const& _txHash); 169 u256 maxNonce_WITH_LOCK(Address const& _a) const; 170 void verifierBody(); 171 172 mutable SharedMutex m_lock; ///< General lock. 173 h256Hash m_known; ///< Headers of transactions in both sets. 174 175 std::unordered_map<h256, std::function<void(ImportResult)>> m_callbacks; ///< Called once. 176 177 ///< Transactions that have previously been dropped. We technically only need to store the tx 178 ///< hash, but we also store bool as a placeholder value so that we can use an LRU cache to cap 179 ///< the number of transaction hashes stored. 180 LruCache<h256, bool> m_dropped; 181 182 PriorityQueue m_current; 183 std::unordered_map<h256, PriorityQueue::iterator> m_currentByHash; ///< Transaction hash to set ref 184 std::unordered_map<Address, std::map<u256, PriorityQueue::iterator>> m_currentByAddressAndNonce; ///< Transactions grouped by account and nonce 185 std::unordered_map<Address, std::map<u256, VerifiedTransaction>> m_future; /// Future transactions 186 187 Signal<> m_onReady; ///< Called when a subsequent call to import transactions will return a non-empty container. Be nice and exit fast. 188 Signal<ImportResult, h256 const&, h512 const&> m_onImport; ///< Called for each import attempt. Arguments are result, transaction id an node id. Be nice and exit fast. 189 Signal<h256 const&> m_onReplaced; ///< Called whan transction is dropped during a call to import() to make room for another transaction. 190 unsigned m_limit; ///< Max number of pending transactions 191 unsigned m_futureLimit; ///< Max number of future transactions 192 unsigned m_futureSize = 0; ///< Current number of future transactions 193 194 std::condition_variable m_queueReady; ///< Signaled when m_unverified has a new entry. 195 std::vector<std::thread> m_verifiers; 196 std::deque<UnverifiedTransaction> m_unverified; ///< Pending verification queue 197 mutable Mutex x_queue; ///< Verification queue mutex 198 std::atomic<bool> m_aborting = {false}; ///< Exit condition for verifier. 199 200 Logger m_logger{createLogger(VerbosityInfo, "tq")}; 201 Logger m_loggerDetail{createLogger(VerbosityDebug, "tq")}; 202 }; 203 204 } 205 } 206 207