1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2014-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4 
5 #include "TransactionQueue.h"
6 
7 #include <libdevcore/Log.h>
8 #include <libethcore/Exceptions.h>
9 #include "Transaction.h"
10 using namespace std;
11 using namespace dev;
12 using namespace dev::eth;
13 
namespace
{
// Cap on the number of entries held in m_unverified; enqueue() stops accepting
// items once the queue has reached this size.
constexpr size_t c_maxVerificationQueueSize{8192};
// Value handed to m_dropped's constructor (presumably its capacity -- confirm
// against the type of m_dropped).
constexpr size_t c_maxDroppedTransactionCount{1024};
}  // namespace
19 
TransactionQueue(unsigned _limit,unsigned _futureLimit)20 TransactionQueue::TransactionQueue(unsigned _limit, unsigned _futureLimit)
21   : m_dropped{c_maxDroppedTransactionCount},
22     m_current{PriorityCompare{*this}},
23     m_limit{_limit},
24     m_futureLimit{_futureLimit}
25 {
26     unsigned verifierThreads = std::max(thread::hardware_concurrency(), 3U) - 2U;
27     for (unsigned i = 0; i < verifierThreads; ++i)
__anonaa47edd90202()28         m_verifiers.emplace_back([=](){
29             setThreadName("txcheck" + toString(i));
30             this->verifierBody();
31         });
32 }
33 
~TransactionQueue()34 TransactionQueue::~TransactionQueue()
35 {
36     DEV_GUARDED(x_queue)
37         m_aborting = true;
38     m_queueReady.notify_all();
39     for (auto& i: m_verifiers)
40         i.join();
41 }
42 
import(bytesConstRef _transactionRLP,IfDropped _ik)43 ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik)
44 {
45     try
46     {
47         Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything);
48         return import(t, _ik);
49     }
50     catch (Exception const&)
51     {
52         return ImportResult::Malformed;
53     }
54 }
55 
check_WITH_LOCK(h256 const & _h,IfDropped _ik)56 ImportResult TransactionQueue::check_WITH_LOCK(h256 const& _h, IfDropped _ik)
57 {
58     if (m_known.count(_h))
59         return ImportResult::AlreadyKnown;
60 
61     if (m_dropped.touch(_h) && _ik == IfDropped::Ignore)
62         return ImportResult::AlreadyInChain;
63 
64     return ImportResult::Success;
65 }
66 
import(Transaction const & _transaction,IfDropped _ik)67 ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
68 {
69     if (_transaction.hasZeroSignature())
70         return ImportResult::ZeroSignature;
71     // Check if we already know this transaction.
72     h256 h = _transaction.sha3(WithSignature);
73 
74     ImportResult ret;
75     {
76         UpgradableGuard l(m_lock);
77         auto ir = check_WITH_LOCK(h, _ik);
78         if (ir != ImportResult::Success)
79             return ir;
80 
81         {
82             _transaction.safeSender();  // Perform EC recovery outside of the write lock
83             UpgradeGuard ul(l);
84             ret = manageImport_WITH_LOCK(h, _transaction);
85         }
86     }
87     return ret;
88 }
89 
topTransactions(unsigned _limit,h256Hash const & _avoid) const90 Transactions TransactionQueue::topTransactions(unsigned _limit, h256Hash const& _avoid) const
91 {
92     ReadGuard l(m_lock);
93     Transactions ret;
94     for (auto t = m_current.begin(); ret.size() < _limit && t != m_current.end(); ++t)
95         if (!_avoid.count(t->transaction.sha3()))
96             ret.push_back(t->transaction);
97     return ret;
98 }
99 
knownTransactions() const100 h256Hash TransactionQueue::knownTransactions() const
101 {
102     ReadGuard l(m_lock);
103     return m_known;
104 }
105 
manageImport_WITH_LOCK(h256 const & _h,Transaction const & _transaction)106 ImportResult TransactionQueue::manageImport_WITH_LOCK(h256 const& _h, Transaction const& _transaction)
107 {
108     try
109     {
110         assert(_h == _transaction.sha3());
111         // Remove any prior transaction with the same nonce but a lower gas price.
112         // Bomb out if there's a prior transaction with higher gas price.
113         auto cs = m_currentByAddressAndNonce.find(_transaction.from());
114         if (cs != m_currentByAddressAndNonce.end())
115         {
116             auto t = cs->second.find(_transaction.nonce());
117             if (t != cs->second.end())
118             {
119                 if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
120                     return ImportResult::OverbidGasPrice;
121                 else
122                 {
123                     h256 dropped = (*t->second).transaction.sha3();
124                     remove_WITH_LOCK(dropped);
125                     m_onReplaced(dropped);
126                 }
127             }
128         }
129         auto fs = m_future.find(_transaction.from());
130         if (fs != m_future.end())
131         {
132             auto t = fs->second.find(_transaction.nonce());
133             if (t != fs->second.end())
134             {
135                 if (_transaction.gasPrice() < t->second.transaction.gasPrice())
136                     return ImportResult::OverbidGasPrice;
137                 else
138                 {
139                     fs->second.erase(t);
140                     --m_futureSize;
141                     if (fs->second.empty())
142                         m_future.erase(fs);
143                 }
144             }
145         }
146         // If valid, append to transactions.
147         insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
148         LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
149 
150         while (m_current.size() > m_limit)
151         {
152             LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
153             remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
154         }
155 
156         m_onReady();
157     }
158     catch (Exception const& _e)
159     {
160         LOG(m_loggerDetail) << "Ignoring invalid transaction: " << diagnostic_information(_e);
161         return ImportResult::Malformed;
162     }
163     catch (std::exception const& _e)
164     {
165         LOG(m_loggerDetail) << "Ignoring invalid transaction: " << _e.what();
166         return ImportResult::Malformed;
167     }
168 
169     return ImportResult::Success;
170 }
171 
maxNonce(Address const & _a) const172 u256 TransactionQueue::maxNonce(Address const& _a) const
173 {
174     ReadGuard l(m_lock);
175     return maxNonce_WITH_LOCK(_a);
176 }
177 
maxNonce_WITH_LOCK(Address const & _a) const178 u256 TransactionQueue::maxNonce_WITH_LOCK(Address const& _a) const
179 {
180     u256 ret = 0;
181     auto cs = m_currentByAddressAndNonce.find(_a);
182     if (cs != m_currentByAddressAndNonce.end() && !cs->second.empty())
183         ret = cs->second.rbegin()->first + 1;
184     auto fs = m_future.find(_a);
185     if (fs != m_future.end() && !fs->second.empty())
186         ret = std::max(ret, fs->second.rbegin()->first + 1);
187     return ret;
188 }
189 
insertCurrent_WITH_LOCK(std::pair<h256,Transaction> const & _p)190 void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
191 {
192     if (m_currentByHash.count(_p.first))
193     {
194         cwarn << "Transaction hash" << _p.first << "already in current?!";
195         return;
196     }
197 
198     Transaction const& t = _p.second;
199     // Insert into current
200     auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
201     PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
202     inserted.first->second = handle;
203     m_currentByHash[_p.first] = handle;
204 
205     // Move following transactions from future to current
206     makeCurrent_WITH_LOCK(t);
207     m_known.insert(_p.first);
208 }
209 
remove_WITH_LOCK(h256 const & _txHash)210 bool TransactionQueue::remove_WITH_LOCK(h256 const& _txHash)
211 {
212     auto t = m_currentByHash.find(_txHash);
213     if (t == m_currentByHash.end())
214         return false;
215 
216     Address from = (*t->second).transaction.from();
217     auto it = m_currentByAddressAndNonce.find(from);
218     assert (it != m_currentByAddressAndNonce.end());
219     it->second.erase((*t->second).transaction.nonce());
220     m_current.erase(t->second);
221     m_currentByHash.erase(t);
222     if (it->second.empty())
223         m_currentByAddressAndNonce.erase(it);
224     m_known.erase(_txHash);
225     return true;
226 }
227 
waiting(Address const & _a) const228 unsigned TransactionQueue::waiting(Address const& _a) const
229 {
230     ReadGuard l(m_lock);
231     unsigned ret = 0;
232     auto cs = m_currentByAddressAndNonce.find(_a);
233     if (cs != m_currentByAddressAndNonce.end())
234         ret = cs->second.size();
235     auto fs = m_future.find(_a);
236     if (fs != m_future.end())
237         ret += fs->second.size();
238     return ret;
239 }
240 
setFuture(h256 const & _txHash)241 void TransactionQueue::setFuture(h256 const& _txHash)
242 {
243     WriteGuard l(m_lock);
244     auto it = m_currentByHash.find(_txHash);
245     if (it == m_currentByHash.end())
246         return;
247 
248     VerifiedTransaction const& st = *(it->second);
249 
250     Address from = st.transaction.from();
251     auto& queue = m_currentByAddressAndNonce[from];
252     auto& target = m_future[from];
253     auto cutoff = queue.lower_bound(st.transaction.nonce());
254     for (auto m = cutoff; m != queue.end(); ++m)
255     {
256         VerifiedTransaction& t = const_cast<VerifiedTransaction&>(*(m->second)); // set has only const iterators. Since we are moving out of container that's fine
257         m_currentByHash.erase(t.transaction.sha3());
258         target.emplace(t.transaction.nonce(), move(t));
259         m_current.erase(m->second);
260         ++m_futureSize;
261     }
262     queue.erase(cutoff, queue.end());
263     if (queue.empty())
264         m_currentByAddressAndNonce.erase(from);
265 }
266 
makeCurrent_WITH_LOCK(Transaction const & _t)267 void TransactionQueue::makeCurrent_WITH_LOCK(Transaction const& _t)
268 {
269     bool newCurrent = false;
270     auto fs = m_future.find(_t.from());
271     if (fs != m_future.end())
272     {
273         u256 nonce = _t.nonce() + 1;
274         auto fb = fs->second.find(nonce);
275         if (fb != fs->second.end())
276         {
277             auto ft = fb;
278             while (ft != fs->second.end() && ft->second.transaction.nonce() == nonce)
279             {
280                 auto inserted = m_currentByAddressAndNonce[_t.from()].insert(std::make_pair(ft->second.transaction.nonce(), PriorityQueue::iterator()));
281                 PriorityQueue::iterator handle = m_current.emplace(move(ft->second));
282                 inserted.first->second = handle;
283                 m_currentByHash[(*handle).transaction.sha3()] = handle;
284                 --m_futureSize;
285                 ++ft;
286                 ++nonce;
287                 newCurrent = true;
288             }
289             fs->second.erase(fb, ft);
290             if (fs->second.empty())
291                 m_future.erase(_t.from());
292         }
293     }
294 
295     while (m_futureSize > m_futureLimit)
296     {
297         // TODO: priority queue for future transactions
298         // For now just drop random chain end
299         --m_futureSize;
300         LOG(m_loggerDetail) << "Dropping out of bounds future transaction "
301                             << m_future.begin()->second.rbegin()->second.transaction.sha3();
302         m_future.begin()->second.erase(--m_future.begin()->second.end());
303         if (m_future.begin()->second.empty())
304             m_future.erase(m_future.begin());
305     }
306 
307     if (newCurrent)
308         m_onReady();
309 }
310 
drop(h256 const & _txHash)311 void TransactionQueue::drop(h256 const& _txHash)
312 {
313     UpgradableGuard l(m_lock);
314 
315     if (!m_known.count(_txHash))
316         return;
317 
318     UpgradeGuard ul(l);
319     m_dropped.insert(_txHash, true /* placeholder value */);
320     remove_WITH_LOCK(_txHash);
321 }
322 
dropGood(Transaction const & _t)323 void TransactionQueue::dropGood(Transaction const& _t)
324 {
325     WriteGuard l(m_lock);
326     makeCurrent_WITH_LOCK(_t);
327     if (!m_known.count(_t.sha3()))
328         return;
329     remove_WITH_LOCK(_t.sha3());
330 }
331 
clear()332 void TransactionQueue::clear()
333 {
334     WriteGuard l(m_lock);
335     m_known.clear();
336     m_current.clear();
337     m_dropped.clear();
338     m_currentByAddressAndNonce.clear();
339     m_currentByHash.clear();
340     m_future.clear();
341     m_futureSize = 0;
342 }
343 
enqueue(RLP const & _data,h512 const & _nodeId)344 void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId)
345 {
346     bool queued = false;
347     {
348         Guard l(x_queue);
349         unsigned itemCount = _data.itemCount();
350         for (unsigned i = 0; i < itemCount; ++i)
351         {
352             if (m_unverified.size() >= c_maxVerificationQueueSize)
353             {
354                 LOG(m_logger) << "Transaction verification queue is full. Dropping "
355                               << itemCount - i << " transactions";
356                 break;
357             }
358             m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId));
359             queued = true;
360         }
361     }
362     if (queued)
363         m_queueReady.notify_all();
364 }
365 
verifierBody()366 void TransactionQueue::verifierBody()
367 {
368     while (!m_aborting)
369     {
370         UnverifiedTransaction work;
371 
372         {
373             unique_lock<Mutex> l(x_queue);
374             m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
375             if (m_aborting)
376                 return;
377             work = move(m_unverified.front());
378             m_unverified.pop_front();
379         }
380 
381         try
382         {
383             Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
384             ImportResult ir = import(t);
385             m_onImport(ir, t.sha3(), work.nodeId);
386         }
387         catch (...)
388         {
389             // should not happen as exceptions are handled in import.
390             cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
391         }
392     }
393 }
394