1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4
5 #include "TransactionQueue.h"
6
7 #include <libdevcore/Log.h>
8 #include <libethcore/Exceptions.h>
9 #include "Transaction.h"
10 using namespace std;
11 using namespace dev;
12 using namespace dev::eth;
13
14 namespace
15 {
16 constexpr size_t c_maxVerificationQueueSize = 8192;
17 constexpr size_t c_maxDroppedTransactionCount = 1024;
18 } // namespace
19
TransactionQueue(unsigned _limit,unsigned _futureLimit)20 TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit)
21 : m_dropped{c_maxDroppedTransactionCount},
22 m_current{PriorityCompare{*this}},
23 m_limit{_limit},
24 m_futureLimit{_futureLimit}
25 {
26 unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
27 for (unsigned i = 0; i < verifierThreads; ++i)
__anonaa47edd90202()28 m_verifiers.emplace_back([=](){
29 setThreadName("txcheck" + toString(i));
30 this->verifierBody();
31 });
32 }
33
~TransactionQueue()34 TransactionQueue::~TransactionQueue()
35 {
36 DEV_GUARDED(x_queue)
37 m_aborting = true;
38 m_queueReady.notify_all();
39 for (auto& i: m_verifiers)
40 i.join();
41 }
42
import(bytesConstRef _transactionRLP,IfDropped _ik)43 ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
44 {
45 try
46 {
47 Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything);
48 return import(t, _ik);
49 }
50 catch (Exception const&)
51 {
52 return ImportResult::Malformed;
53 }
54 }
55
check_WITH_LOCK(h256 const & _h,IfDropped _ik)56 ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
57 {
58 if (m_known.count(_h))
59 return ImportResult::AlreadyKnown;
60
61 if (m_dropped.touch(_h) && _ik == IfDropped::Ignore)
62 return ImportResult::AlreadyInChain;
63
64 return ImportResult::Success;
65 }
66
import(Transaction const & _transaction,IfDropped _ik)67 ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
68 {
69 if (_transaction.hasZeroSignature())
70 return ImportResult::ZeroSignature;
71 // Check if we already know this transaction.
72 h256 h = _transaction.sha3(WithSignature);
73
74 ImportResult ret;
75 {
76 UpgradableGuard l(m_lock);
77 auto ir = check_WITH_LOCK(h, _ik);
78 if (ir != ImportResult::Success)
79 return ir;
80
81 {
82 _transaction.safeSender(); // Perform EC recovery outside of the write lock
83 UpgradeGuard ul(l);
84 ret = manageImport_WITH_LOCK(h, _transaction);
85 }
86 }
87 return ret;
88 }
89
topTransactions(unsigned _limit,h256Hash const & _avoid) const90 Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
91 {
92 ReadGuard l(m_lock);
93 Transactions ret;
94 for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
95 if (!_avoid.count(t->transaction.sha3()))
96 ret.push_back(t->transaction);
97 return ret;
98 }
99
knownTransactions() const100 h256Hash TransactionQueue::knownTransactions() const
101 {
102 ReadGuard l(m_lock);
103 return m_known;
104 }
105
manageImport_WITH_LOCK(h256 const & _h,Transaction const & _transaction)106 ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
107 {
108 try
109 {
110 assert(_h == _transaction.sha3());
111 // Remove any prior transaction with the same nonce but a lower gas price.
112 // Bomb out if there's a prior transaction with higher gas price.
113 auto cs = m_currentByAddressAndNonce.find(_transaction.from());
114 if (cs != m_currentByAddressAndNonce.end())
115 {
116 auto t = cs->second.find(_transaction.nonce());
117 if (t != cs->second.end())
118 {
119 if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
120 return ImportResult::OverbidGasPrice;
121 else
122 {
123 h256 dropped = (*t->second).transaction.sha3();
124 remove_WITH_LOCK(dropped);
125 m_onReplaced(dropped);
126 }
127 }
128 }
129 auto fs = m_future.find(_transaction.from());
130 if (fs != m_future.end())
131 {
132 auto t = fs->second.find(_transaction.nonce());
133 if (t != fs->second.end())
134 {
135 if (_transaction.gasPrice() < t->second.transaction.gasPrice())
136 return ImportResult::OverbidGasPrice;
137 else
138 {
139 fs->second.erase(t);
140 --m_futureSize;
141 if (fs->second.empty())
142 m_future.erase(fs);
143 }
144 }
145 }
146 // If valid, append to transactions.
147 insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
148 LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
149
150 while (m_current.size() > m_limit)
151 {
152 LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
153 remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
154 }
155
156 m_onReady();
157 }
158 catch (Exception const& _e)
159 {
160 LOG(m_loggerDetail) << "Ignoring invalid transaction: " << diagnostic_information(_e);
161 return ImportResult::Malformed;
162 }
163 catch (std::exception const& _e)
164 {
165 LOG(m_loggerDetail) << "Ignoring invalid transaction: " << _e.what();
166 return ImportResult::Malformed;
167 }
168
169 return ImportResult::Success;
170 }
171
maxNonce(Address const & _a) const172 u256 TransactionQueue::maxNonce(Address const& _a) const
173 {
174 ReadGuard l(m_lock);
175 return maxNonce_WITH_LOCK(_a);
176 }
177
maxNonce_WITH_LOCK(Address const & _a) const178 u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
179 {
180 u256 ret = 0;
181 auto cs = m_currentByAddressAndNonce.find(_a);
182 if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
183 ret = cs->second.rbegin()->first + 1;
184 auto fs = m_future.find(_a);
185 if (fs != m_future.end() && !fs->second.empty())
186 ret = std::max(ret, fs->second.rbegin()->first + 1);
187 return ret;
188 }
189
insertCurrent_WITH_LOCK(std::pair<h256,Transaction> const & _p)190 void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
191 {
192 if (m_currentByHash.count(_p.first))
193 {
194 cwarn << "Transaction hash" << _p.first << "already in current?!";
195 return;
196 }
197
198 Transaction const& t = _p.second;
199 // Insert into current
200 auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
201 PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
202 inserted.first->second = handle;
203 m_currentByHash[_p.first] = handle;
204
205 // Move following transactions from future to current
206 makeCurrent_WITH_LOCK(t);
207 m_known.insert(_p.first);
208 }
209
remove_WITH_LOCK(h256 const & _txHash)210 bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
211 {
212 auto t = m_currentByHash.find(_txHash);
213 if (t == m_currentByHash.end())
214 return false;
215
216 Address from = (*t->second).transaction.from();
217 auto it = m_currentByAddressAndNonce.find(from);
218 assert (it != m_currentByAddressAndNonce.end());
219 it->second.erase((*t->second).transaction.nonce());
220 m_current.erase(t->second);
221 m_currentByHash.erase(t);
222 if (it->second.empty())
223 m_currentByAddressAndNonce.erase(it);
224 m_known.erase(_txHash);
225 return true;
226 }
227
waiting(Address const & _a) const228 unsigned TransactionQueue::waiting(Address const& _a) const
229 {
230 ReadGuard l(m_lock);
231 unsigned ret = 0;
232 auto cs = m_currentByAddressAndNonce.find(_a);
233 if (cs != m_currentByAddressAndNonce.end())
234 ret = cs->second.size();
235 auto fs = m_future.find(_a);
236 if (fs != m_future.end())
237 ret += fs->second.size();
238 return ret;
239 }
240
setFuture(h256 const & _txHash)241 void TransactionQueue::setFuture(h256 const& _txHash)
242 {
243 WriteGuard l(m_lock);
244 auto it = m_currentByHash.find(_txHash);
245 if (it == m_currentByHash.end())
246 return;
247
248 VerifiedTransaction const& st = *(it->second);
249
250 Address from = st.transaction.from();
251 auto& queue = m_currentByAddressAndNonce[from];
252 auto& target = m_future[from];
253 auto cutoff = queue.lower_bound(st.transaction.nonce());
254 for (auto m = cutoff; m != queue.end(); ++m)
255 {
256 VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
257 m_currentByHash.erase(t.transaction.sha3());
258 target.emplace(t.transaction.nonce(), move(t));
259 m_current.erase(m->second);
260 ++m_futureSize;
261 }
262 queue.erase(cutoff, queue.end());
263 if (queue.empty())
264 m_currentByAddressAndNonce.erase(from);
265 }
266
makeCurrent_WITH_LOCK(Transaction const & _t)267 void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
268 {
269 bool newCurrent = false;
270 auto fs = m_future.find(_t.from());
271 if (fs != m_future.end())
272 {
273 u256 nonce = _t.nonce() + 1;
274 auto fb = fs->second.find(nonce);
275 if (fb != fs->second.end())
276 {
277 auto ft = fb;
278 while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
279 {
280 auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
281 PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
282 inserted.first->second = handle;
283 m_currentByHash[(*handle).transaction.sha3()] = handle;
284 --m_futureSize;
285 ++ft;
286 ++nonce;
287 newCurrent = true;
288 }
289 fs->second.erase(fb, ft);
290 if (fs->second.empty())
291 m_future.erase(_t.from());
292 }
293 }
294
295 while (m_futureSize > m_futureLimit)
296 {
297 // TODO: priority queue for future transactions
298 // For now just drop random chain end
299 --m_futureSize;
300 LOG(m_loggerDetail) << "Dropping out of bounds future transaction "
301 << m_future.begin()->second.rbegin()->second.transaction.sha3();
302 m_future.begin()->second.erase(--m_future.begin()->second.end());
303 if (m_future.begin()->second.empty())
304 m_future.erase(m_future.begin());
305 }
306
307 if (newCurrent)
308 m_onReady();
309 }
310
drop(h256 const & _txHash)311 void TransactionQueue::drop(h256 const& _txHash)
312 {
313 UpgradableGuard l(m_lock);
314
315 if (!m_known.count(_txHash))
316 return;
317
318 UpgradeGuard ul(l);
319 m_dropped.insert(_txHash, true /* placeholder value */);
320 remove_WITH_LOCK(_txHash);
321 }
322
dropGood(Transaction const & _t)323 void TransactionQueue::dropGood(Transaction const& _t)
324 {
325 WriteGuard l(m_lock);
326 makeCurrent_WITH_LOCK(_t);
327 if (!m_known.count(_t.sha3()))
328 return;
329 remove_WITH_LOCK(_t.sha3());
330 }
331
clear()332 void TransactionQueue::clear()
333 {
334 WriteGuard l(m_lock);
335 m_known.clear();
336 m_current.clear();
337 m_dropped.clear();
338 m_currentByAddressAndNonce.clear();
339 m_currentByHash.clear();
340 m_future.clear();
341 m_futureSize = 0;
342 }
343
enqueue(RLP const & _data,h512 const & _nodeId)344 void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
345 {
346 bool queued = false;
347 {
348 Guard l(x_queue);
349 unsigned itemCount = _data.itemCount();
350 for (unsigned i = 0; i < itemCount; ++i)
351 {
352 if (m_unverified.size() >= c_maxVerificationQueueSize)
353 {
354 LOG(m_logger) << "Transaction verification queue is full. Dropping "
355 << itemCount - i << " transactions";
356 break;
357 }
358 m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
359 queued = true;
360 }
361 }
362 if (queued)
363 m_queueReady.notify_all();
364 }
365
verifierBody()366 void TransactionQueue::verifierBody()
367 {
368 while (!m_aborting)
369 {
370 UnverifiedTransaction work;
371
372 {
373 unique_lock<Mutex> l(x_queue);
374 m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
375 if (m_aborting)
376 return;
377 work = move(m_unverified.front());
378 m_unverified.pop_front();
379 }
380
381 try
382 {
383 Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
384 ImportResult ir = import(t);
385 m_onImport(ir, t.sha3(), work.nodeId);
386 }
387 catch (...)
388 {
389 // should not happen as exceptions are handled in import.
390 cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
391 }
392 }
393 }
394