// Aleth: Ethereum C++ client, tools and libraries.
// Copyright 2015-2019 Aleth Authors.
// Licensed under the GNU General Public License, Version 3.

#include "BlockChainSync.h"

#include "BlockChain.h"
#include "BlockQueue.h"
#include "EthereumCapability.h"
#include <libdevcore/Common.h>
#include <libdevcore/TrieHash.h>
#include <libethcore/Exceptions.h>
#include <libp2p/Host.h>
#include <libp2p/Session.h>
#include <chrono>

using namespace std;
using namespace dev;
using namespace dev::eth;

std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync)
{
    _out << "protocol: " << _sync.protocolVersion << endl;
    _out << "state: " << EthereumCapability::stateName(_sync.state) << " ";
    if (_sync.state == SyncState::Blocks)
        _out << _sync.currentBlockNumber << "/" << _sync.highestBlockNumber;
    return _out;
}

namespace  // Helper functions.
{

constexpr unsigned c_maxPeerUknownNewBlocks = 1024;  /// Max number of unknown new blocks a peer can give us
constexpr unsigned c_maxRequestHeaders = 1024;
constexpr unsigned c_maxRequestBodies = 1024;

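// The download buffers (m_headers / m_bodies) map a starting block number to a contiguous run
// ("chunk") of downloaded items, e.g. {100: [item100, item101, item102], 200: [item200]}.
// The helpers below query and maintain that chunked representation.

/// Check whether the item for block _number is already present in one of the chunks.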
template<typename T> bool haveItem(std::map<unsigned, T>& _container, unsigned _number)
{
    if (_container.empty())
        return false;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
        return true;
    if (lower == _container.begin())
        return false;
    --lower;
    return lower->first <= _number && (lower->first + lower->second.size()) > _number;
}

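/// Find the item for block _number within the chunks; returns nullptr if it hasn't been downloaded yet.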
template<typename T> T const* findItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
{
    if (_container.empty())
        return nullptr;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
        return &(*lower->second.begin());
    if (lower == _container.begin())
        return nullptr;
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        return &lower->second.at(_number - lower->first);
    return nullptr;
}

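/// Remove the item for block _number: if it starts a chunk the whole chunk is erased,
/// otherwise the containing chunk is truncated from _number onwards.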
template<typename T> void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
{
    if (_container.empty())
        return;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
    {
        _container.erase(lower);
        return;
    }
    if (lower == _container.begin())
        return;
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
}

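/// Remove all items for blocks numbered _number and above, across all chunks.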
template<typename T> void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
{
    if (_container.empty())
        return;
    auto lower = _container.lower_bound(_number);
    if (lower != _container.end() && lower->first == _number)
    {
        _container.erase(lower, _container.end());
        return;
    }
    if (lower == _container.begin())
    {
        _container.clear();
        return;
    }
    --lower;
    if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
        lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
    _container.erase(++lower, _container.end());
}

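/// Insert the item for block _number, extending an existing chunk when the number is adjacent to
/// it and coalescing with the following chunk if the two become contiguous. The item must not be
/// present yet (see the assert below).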
template<typename T> void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
{
    assert(!haveItem(_container, _number));
    auto lower = _container.lower_bound(_number);
    if (!_container.empty() && lower != _container.begin())
        --lower;
    if (lower != _container.end() && (lower->first + lower->second.size() == _number))
    {
        // extend existing chunk
        lower->second.emplace_back(_data);

        auto next = lower;
        ++next;
        if (next != _container.end() && (lower->first + lower->second.size() == next->first))
        {
            // merge with the next chunk
            std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
            _container.erase(next);
        }
    }
    else
    {
        // insert a new chunk
        auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
        auto next = inserted;
        ++next;
        if (next != _container.end() && next->first == _number + 1)
        {
            std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
            _container.erase(next);
        }
    }
}

}  // Anonymous namespace -- helper functions.

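// Construct the sync against the current chain head and register a block queue callback that
// resumes a paused sync (on the network thread) once queue space has been freed up.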
BlockChainSync::BlockChainSync(EthereumCapability& _host)
  : m_host(_host),
    m_chainStartBlock(_host.chain().chainStartBlockNumber()),
    m_startingBlock(_host.chain().number()),
    m_lastImportedBlock(m_startingBlock),
    m_lastImportedBlockHash(_host.chain().currentHash())
{
    m_bqBlocksDrained = host().bq().onBlocksDrained([this]() {
        if (isSyncPaused() && !host().bq().knownFull())
        {
            // Draining freed up space in the block queue. Let's resume syncing.
            // Ensure that syncing occurs on the network thread (since the block queue handler is
            // called on the client thread).
            host().capabilityHost().postWork([this]() {
                RecursiveGuard l(x_sync);
                m_state = SyncState::Blocks;
                continueSync();
            });
        }
    });
}

BlockChainSync::~BlockChainSync()
{
    RecursiveGuard l(x_sync);
    abortSync();
}

void BlockChainSync::onBlockImported(BlockHeader const& _info)
{
    // If a block has been added via mining or another import path (e.g. through RPC),
    // count it as the last imported block.
    RecursiveGuard l(x_sync);
    if (_info.number() > m_lastImportedBlock)
    {
        m_lastImportedBlock = static_cast<unsigned>(_info.number());
        m_lastImportedBlockHash = _info.hash();
        m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
    }
}

void BlockChainSync::abortSync()
{
    RecursiveGuard l(x_sync);
    resetSync();
    onPeerAborting();
}

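// Handle a peer's Status message: drop unsuitable peers, optionally issue the DAO fork challenge,
// and otherwise try to start syncing with the peer.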
void BlockChainSync::onPeerStatus(EthereumPeer const& _peer)
{
    RecursiveGuard l(x_sync);
    DEV_INVARIANT_CHECK;

    auto peerSessionInfo = m_host.capabilityHost().peerSessionInfo(_peer.id());
    if (!peerSessionInfo)
        return; // Expired

    std::string disconnectReason;
    if (peerSessionInfo->clientVersion.find("/v0.7.0/") != string::npos)
        disconnectReason = "Blacklisted client version.";
    else
        disconnectReason = _peer.validate(
            host().chain().genesisHash(), host().protocolVersion(), host().networkId());

    if (!disconnectReason.empty())
    {
        LOG(m_logger) << "Peer " << _peer.id() << " not suitable for sync: " << disconnectReason;
        m_host.capabilityHost().disconnect(_peer.id(), p2p::UselessPeer);
        return;
    }

    // Before starting to exchange the data with the node, let's verify that it's on our chain
    if (!requestDaoForkBlockHeader(_peer.id()))
    {
        // DAO challenge not needed
        syncPeer(_peer.id(), false);
    }
}

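// If the chain has a DAO hard fork block and we're already past it, ask the peer for that header
// so we can verify it's on the same fork. Returns true if the challenge was sent.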
bool BlockChainSync::requestDaoForkBlockHeader(NodeID const& _peerID)
{
    // DAO challenge
    u256 const daoHardfork = host().chain().sealEngine()->chainParams().daoHardforkBlock;
    if (daoHardfork == 0 || daoHardfork == c_infiniteBlockNumber || host().chain().number() < daoHardfork)
        return false;

    m_daoChallengedPeers.insert(_peerID);
    m_host.peer(_peerID).requestBlockHeaders(static_cast<unsigned>(daoHardfork), 1, 0, false);
    return true;
}

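// Start or continue syncing with the given peer. Does nothing if the peer isn't ready, already
// has an outstanding request, or the sync is paused. If forced, or if the peer advertises a
// higher total difficulty than we're currently syncing towards, request the tip of its chain to
// (re)start the sync; otherwise, while in the Blocks state, request further blocks from it.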
void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
{
    auto& peer = m_host.peer(_peerID);
    if (!peer.statusReceived())
    {
        LOG(m_loggerDetail) << "Can't sync with peer " << _peerID << " - Status not received yet.";
        return;
    }

    if (peer.isConversing())
    {
        LOG(m_loggerDetail) << "Can't sync with peer " << _peerID << " - outstanding asks.";
        return;
    }

    if (isSyncPaused())
    {
        LOG(m_loggerDetail) << "Can't sync with peer " << _peerID
                            << " - sync state is paused. Block queue status: "
                            << host().bq().status();
        return;
    }

    u256 td = host().chain().details().totalDifficulty;
    if (host().bq().isActive())
        td += host().bq().difficulty();
    u256 const syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
    u256 const peerTotalDifficulty = peer.totalDifficulty();

    if (_force || peerTotalDifficulty > syncingDifficulty)
    {
        if (peerTotalDifficulty > syncingDifficulty)
            LOG(m_logger) << "Discovered new highest difficulty (" << peerTotalDifficulty
                          << ") from peer " << peer.id();

        // start sync
        m_syncingTotalDifficulty = peerTotalDifficulty;
        if (m_state == SyncState::Idle || m_state == SyncState::NotSynced)
        {
            LOG(m_loggerInfo) << "Starting full sync";
            LOG(m_logger) << "Syncing with peer " << peer.id();
            m_state = SyncState::Blocks;
        }

        // Request tip of peer's chain
        peer.requestBlockHeaders(peer.latestHash(), 1 /* count */, 0 /* skip */, false /* reverse */);
        peer.setWaitingForTransactions(true);
        return;
    }

    if (m_state == SyncState::Blocks)
    {
        requestBlocks(_peerID);
        return;
    }
}

void BlockChainSync::continueSync()
{
    host().capabilityHost().foreachPeer(m_host.name(), [this](NodeID const& _peerID) {
        syncPeer(_peerID, false);
        return true;
    });
}

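// Decide what to request from the peer next: block bodies for headers that have already been
// downloaded and validated, further headers continuing the downloaded chain, or (when no common
// header is known yet) a single header walking backwards to find the common ancestor.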
void BlockChainSync::requestBlocks(NodeID const& _peerID)
{
    clearPeerDownload(_peerID);
    if (host().bq().knownFull())
    {
        LOG(m_loggerDetail) << "Waiting for block queue before downloading blocks from " << _peerID
                            << ". Block queue status: " << host().bq().status();
        pauseSync();
        return;
    }
    // check to see if we need to download any block bodies first
    auto header = m_headers.begin();
    h256s neededBodies;
    vector<unsigned> neededNumbers;
    unsigned index = 0;
    if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
    {
        while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
        {
            unsigned block = header->first + index;
            if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
            {
                neededBodies.push_back(header->second[index].hash);
                neededNumbers.push_back(block);
                m_downloadingBodies.insert(block);
            }

            ++index;
            if (index >= header->second.size())
                break; // Download bodies only for validated header chain
        }
    }
    if (neededBodies.size() > 0)
    {
        m_bodySyncPeers[_peerID] = neededNumbers;
        m_host.peer(_peerID).requestBlockBodies(neededBodies);
    }
    else
    {
        // check if we need to download headers
        unsigned start = 0;
        if (!m_haveCommonHeader)
        {
            // download backwards until a common block is found, one header at a time
            start = m_lastImportedBlock;
            if (!m_headers.empty())
                start = std::min(start, m_headers.begin()->first - 1);
            m_lastImportedBlock = start;
            m_lastImportedBlockHash = host().chain().numberHash(start);

            if (start <= m_chainStartBlock + 1)
                m_haveCommonHeader = true; // reached chain start
        }
        if (m_haveCommonHeader)
        {
            start = m_lastImportedBlock + 1;
            auto next = m_headers.begin();
            unsigned count = 0;
            if (!m_headers.empty() && start >= m_headers.begin()->first)
            {
                start = m_headers.begin()->first + m_headers.begin()->second.size();
                ++next;
            }

            while (count == 0 && next != m_headers.end())
            {
                count = std::min(c_maxRequestHeaders, next->first - start);
                while (count > 0 && m_downloadingHeaders.count(start) != 0)
                {
                    start++;
                    count--;
                }
                std::vector<unsigned> headers;
                for (unsigned block = start; block < start + count; block++)
                    if (m_downloadingHeaders.count(block) == 0)
                    {
                        headers.push_back(block);
                        m_downloadingHeaders.insert(block);
                    }
                count = headers.size();
                if (count > 0)
                {
                    m_headerSyncPeers[_peerID] = headers;
                    assert(!haveItem(m_headers, start));
                    m_host.peer(_peerID).requestBlockHeaders(start, count, 0, false);
                }
                else if (start >= next->first)
                {
                    start = next->first + next->second.size();
                    ++next;
                }
            }
        }
        else
            m_host.peer(_peerID).requestBlockHeaders(start, 1 /* count */, 0 /* skip */, false);
    }
}

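// Forget any outstanding header/body requests assigned to this peer (and any pending DAO
// challenge), so the corresponding blocks can be requested from other peers.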
void BlockChainSync::clearPeerDownload(NodeID const& _peerID)
{
    auto syncPeer = m_headerSyncPeers.find(_peerID);
    if (syncPeer != m_headerSyncPeers.end())
    {
        for (unsigned block : syncPeer->second)
            m_downloadingHeaders.erase(block);
        m_headerSyncPeers.erase(syncPeer);
    }
    syncPeer = m_bodySyncPeers.find(_peerID);
    if (syncPeer != m_bodySyncPeers.end())
    {
        for (unsigned block : syncPeer->second)
            m_downloadingBodies.erase(block);
        m_bodySyncPeers.erase(syncPeer);
    }
    m_daoChallengedPeers.erase(_peerID);
}

void BlockChainSync::clearPeerDownload()
{
    for (auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
    {
        if (!m_host.capabilityHost().peerSessionInfo(s->first))
        {
            for (unsigned block : s->second)
                m_downloadingHeaders.erase(block);
            m_headerSyncPeers.erase(s++);
        }
        else
            ++s;
    }
    for (auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
    {
        if (!m_host.capabilityHost().peerSessionInfo(s->first))
        {
            for (unsigned block : s->second)
                m_downloadingBodies.erase(block);
            m_bodySyncPeers.erase(s++);
        }
        else
            ++s;
    }
    for (auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
    {
        if (!m_host.capabilityHost().peerSessionInfo(*s))
            m_daoChallengedPeers.erase(s++);
        else
            ++s;
    }
}

void BlockChainSync::logNewBlock(h256 const& _h)
{
    m_knownNewHashes.erase(_h);
}

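// Handle a BlockHeaders response. If the peer has an outstanding DAO challenge, verify it first.
// Otherwise validate each received header against the chain downloaded so far, store it, mark
// trivially empty bodies as already downloaded, then try to assemble and import complete blocks.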
void BlockChainSync::onPeerBlockHeaders(NodeID const& _peerID, RLP const& _r)
{
    RecursiveGuard l(x_sync);
    DEV_INVARIANT_CHECK;
    size_t itemCount = _r.itemCount();
    LOG(m_logger) << "BlocksHeaders (" << dec << itemCount << " entries) "
                  << (itemCount ? "" : ": NoMoreHeaders") << " from " << _peerID;

    if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
    {
        if (verifyDaoChallengeResponse(_r))
            syncPeer(_peerID, false);
        else
            m_host.disablePeer(_peerID, "Peer from another fork.");

        m_daoChallengedPeers.erase(_peerID);
        return;
    }

    clearPeerDownload(_peerID);
    if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
    {
        LOG(m_logger) << "Ignoring unexpected blocks from " << _peerID;
        return;
    }
    if (m_state == SyncState::Waiting)
    {
        LOG(m_loggerDetail) << "Ignored blocks from " << _peerID << " while waiting";
        return;
    }
    if (itemCount == 0)
    {
        LOG(m_loggerDetail) << "Peer " << _peerID << " does not have the blocks requested";
        m_host.capabilityHost().updateRating(_peerID, -1);
    }
    for (unsigned i = 0; i < itemCount; i++)
    {
        BlockHeader info(_r[i].data(), HeaderData);
        unsigned blockNumber = static_cast<unsigned>(info.number());
        if (blockNumber < m_chainStartBlock)
        {
            LOG(m_logger) << "Skipping too old header " << blockNumber << " from " << _peerID;
            continue;
        }
        if (haveItem(m_headers, blockNumber))
        {
            LOG(m_logger) << "Skipping header " << blockNumber << " (already downloaded) from "
                          << _peerID;
            continue;
        }
        if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
        {
            LOG(m_logger) << "Skipping header " << blockNumber << " (already imported) from "
                          << _peerID;
            continue;
        }
        if (blockNumber > m_highestBlock)
            m_highestBlock = blockNumber;

        auto status = host().bq().blockStatus(info.hash());
        if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(info.hash()))
        {
            m_haveCommonHeader = true;
            m_lastImportedBlock = (unsigned)info.number();
            m_lastImportedBlockHash = info.hash();

            if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
                m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
            {
                // The start of the header chain in m_headers doesn't attach to our known chain;
                // we've probably downloaded another fork.
                LOG(m_loggerWarning)
                    << "Unknown parent of the downloaded headers, restarting sync with " << _peerID;
                restartSync();
                return;
            }
        }
        else
        {
            Header hdr { _r[i].data().toBytes(), info.hash(), info.parentHash() };

            // validate chain
            HeaderId headerId { info.transactionsRoot(), info.sha3Uncles() };
            if (m_haveCommonHeader)
            {
                Header const* prevBlock = findItem(m_headers, blockNumber - 1);
                if ((prevBlock && prevBlock->hash != info.parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.parentHash() != m_lastImportedBlockHash))
                {
                    // Mismatching parent hash: the downloaded header chain doesn't link up,
                    // so don't add this header and restart the sync instead.
                    LOG(m_loggerWarning)
                        << "Unknown block header " << blockNumber << " " << info.hash()
                        << " (Restart syncing with " << _peerID << ")";
                    m_host.capabilityHost().updateRating(_peerID, -1);
                    restartSync();
                    return;
                }

                Header const* nextBlock = findItem(m_headers, blockNumber + 1);
                if (nextBlock && nextBlock->parent != info.hash())
                {
                    LOG(m_loggerDetail) << "Unknown block header " << blockNumber + 1 << " "
                                        << nextBlock->hash << " from " << _peerID;
                    // clear following headers
                    unsigned n = blockNumber + 1;
                    auto headers = m_headers.at(n);
                    for (auto const& h : headers)
                    {
                        // Drop the bookkeeping for each header we're about to remove.
                        BlockHeader deletingInfo(h.data, HeaderData);
                        HeaderId const deletingId{deletingInfo.transactionsRoot(), deletingInfo.sha3Uncles()};
                        m_headerIdToNumber.erase(deletingId);
                        m_downloadingBodies.erase(n);
                        m_downloadingHeaders.erase(n);
                        ++n;
                    }
                    removeAllStartingWith(m_headers, blockNumber + 1);
                    removeAllStartingWith(m_bodies, blockNumber + 1);
                }
            }

            mergeInto(m_headers, blockNumber, std::move(hdr));
            if (headerId.transactionsRoot == EmptyTrie && headerId.uncles == EmptyListSHA3)
            {
                // empty body, just mark as downloaded
                RLPStream r(2);
                r.appendRaw(RLPEmptyList);
                r.appendRaw(RLPEmptyList);
                bytes body;
                r.swapOut(body);
                mergeInto(m_bodies, blockNumber, std::move(body));
            }
            else
                m_headerIdToNumber[headerId] = blockNumber;
        }
    }
    collectBlocks();
    continueSync();
}

bool BlockChainSync::verifyDaoChallengeResponse(RLP const& _r)
{
    if (_r.itemCount() != 1)
        return false;

    BlockHeader info(_r[0].data(), HeaderData);
    return info.number() == host().chain().sealEngine()->chainParams().daoHardforkBlock &&
        info.extraData() == fromHex("0x64616f2d686172642d666f726b");  // ASCII for "dao-hard-fork"
}

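// Handle a BlockBodies response: match each body to a downloaded header via its
// (transactionsRoot, unclesHash) id, store it, and try to assemble complete blocks.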
void BlockChainSync::onPeerBlockBodies(NodeID const& _peerID, RLP const& _r)
{
    RecursiveGuard l(x_sync);
    DEV_INVARIANT_CHECK;
    size_t itemCount = _r.itemCount();
    LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
                  << (itemCount ? "" : ": NoMoreBodies") << " from " << _peerID;
    clearPeerDownload(_peerID);
    if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
    {
        LOG(m_logger) << "Ignoring unexpected blocks from " << _peerID;
        return;
    }
    if (m_state == SyncState::Waiting)
    {
        LOG(m_loggerDetail) << "Ignored blocks from " << _peerID << " while waiting";
        return;
    }
    if (itemCount == 0)
    {
        LOG(m_loggerDetail) << "Peer " << _peerID << " does not have the blocks requested";
        m_host.capabilityHost().updateRating(_peerID, -1);
    }
    for (unsigned i = 0; i < itemCount; i++)
    {
        RLP body(_r[i]);

        auto txList = body[0];
        h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
        h256 uncles = sha3(body[1].data());
        HeaderId id { transactionRoot, uncles };
        auto iter = m_headerIdToNumber.find(id);
        if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
        {
            LOG(m_loggerDetail) << "Ignored unknown block body from " << _peerID;
            continue;
        }
        unsigned blockNumber = iter->second;
        if (haveItem(m_bodies, blockNumber))
        {
            LOG(m_logger) << "Skipping already downloaded block body " << blockNumber << " from "
                          << _peerID;
            continue;
        }
        m_headerIdToNumber.erase(id);
        mergeInto(m_bodies, blockNumber, body.data().toBytes());
    }
    collectBlocks();
    continueSync();
}

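// Assemble complete blocks from the matching header/body chunks at the front of the download
// buffers, feed them into the block queue, and trim the consumed prefix. Restarts or resets the
// sync on import errors and completes it once nothing is left to collect.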
void BlockChainSync::collectBlocks()
{
    if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
        return;

    // merge headers and bodies
    auto& headers = *m_headers.begin();
    auto& bodies = *m_bodies.begin();
    if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
        return;

    unsigned success = 0;
    unsigned future = 0;
    unsigned got = 0;
    unsigned unknown = 0;
    size_t i = 0;
    for (; i < headers.second.size() && i < bodies.second.size(); i++)
    {
        RLPStream blockStream(3);
        blockStream.appendRaw(headers.second[i].data);
        RLP body(bodies.second[i]);
        blockStream.appendRaw(body[0].data());
        blockStream.appendRaw(body[1].data());
        bytes block;
        blockStream.swapOut(block);
        switch (host().bq().import(&block))
        {
        case ImportResult::Success:
            success++;
            if (headers.first + i > m_lastImportedBlock)
            {
                m_lastImportedBlock = headers.first + (unsigned)i;
                m_lastImportedBlockHash = headers.second[i].hash;
            }
            break;
        case ImportResult::Malformed:
            LOG(m_logger) << "Malformed block #" << headers.first + i << ". Restarting sync.";
            restartSync();
            return;
        case ImportResult::BadChain:
            LOG(m_logger) << "Block from the bad chain, block #" << headers.first + i
                          << ". Restarting sync.";
            restartSync();
            return;

        case ImportResult::FutureTimeKnown:
            future++;
            break;
        case ImportResult::AlreadyInChain:
            break;
        case ImportResult::AlreadyKnown:
        case ImportResult::FutureTimeUnknown:
        case ImportResult::UnknownParent:
            if (headers.first + i > m_lastImportedBlock)
            {
                logImported(success, future, got, unknown);
                LOG(m_logger) << "Block #" << headers.first + i
                              << " is already known or has an unknown parent (possibly with a "
                                 "future timestamp). Resetting sync.";
                resetSync();
                m_haveCommonHeader = false; // fork detected, search for common header again
            }
            return;

        default:;
        }
    }

    logImported(success, future, got, unknown);

    if (host().bq().unknownFull())
    {
        LOG(m_loggerWarning) << "Too many unknown blocks, restarting sync";
        restartSync();
        return;
    }

    auto newHeaders = std::move(headers.second);
    newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
    unsigned newHeaderHead = headers.first + i;
    auto newBodies = std::move(bodies.second);
    newBodies.erase(newBodies.begin(), newBodies.begin() + i);
    unsigned newBodiesHead = bodies.first + i;
    m_headers.erase(m_headers.begin());
    m_bodies.erase(m_bodies.begin());
    if (!newHeaders.empty())
        m_headers[newHeaderHead] = newHeaders;
    if (!newBodies.empty())
        m_bodies[newBodiesHead] = newBodies;

    if (m_headers.empty())
    {
        assert(m_bodies.empty());
        completeSync();
    }
    DEV_INVARIANT_CHECK_HERE;
}

void BlockChainSync::logImported(
    unsigned _success, unsigned _future, unsigned _got, unsigned _unknown)
{
    LOG(m_logger) << dec << _success << " imported OK, " << _unknown << " with unknown parents, "
                  << _future << " with future timestamps, " << _got << " already known received.";
}

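// Handle a NewBlock announcement. Blocks further ahead than our next expected block trigger a
// forced sync with the peer; otherwise the block is imported directly and the download state is
// updated according to the import result.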
void BlockChainSync::onPeerNewBlock(NodeID const& _peerID, RLP const& _r)
{
    RecursiveGuard l(x_sync);
    DEV_INVARIANT_CHECK;

    if (_r.itemCount() != 2)
    {
        m_host.disablePeer(_peerID, "NewBlock without 2 data fields.");
        return;
    }
    BlockHeader info(_r[0][0].data(), HeaderData);
    auto h = info.hash();
    auto& peer = m_host.peer(_peerID);
    peer.markBlockAsKnown(h);
    unsigned blockNumber = static_cast<unsigned>(info.number());
    if (blockNumber > (m_lastImportedBlock + 1))
    {
        LOG(m_loggerDetail) << "Received unknown new block (" << blockNumber << ") from "
                            << _peerID;
        // Update the hash of the highest block known to the peer;
        // syncPeer will then request the highest block header to properly restart syncing.
        peer.setLatestHash(h);
        syncPeer(_peerID, true);
        return;
    }
    switch (host().bq().import(_r[0].data()))
    {
    case ImportResult::Success:
        m_host.capabilityHost().updateRating(_peerID, 100);
        logNewBlock(h);
        if (blockNumber > m_lastImportedBlock)
        {
            m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
            m_lastImportedBlockHash = h;
        }
        m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
        m_downloadingBodies.erase(blockNumber);
        m_downloadingHeaders.erase(blockNumber);
        removeItem(m_headers, blockNumber);
        removeItem(m_bodies, blockNumber);
        if (m_headers.empty())
        {
            if (!m_bodies.empty())
            {
                LOG(m_loggerDetail) << "Block headers map is empty, but block bodies map is not. "
                                       "Force-clearing (peer: "
                                    << _peerID << ")";
                m_bodies.clear();
            }
            completeSync();
        }
        break;
    case ImportResult::FutureTimeKnown:
        // TODO: Rating dependent on how far in the future it is.
        break;

    case ImportResult::Malformed:
    case ImportResult::BadChain:
        logNewBlock(h);
        m_host.disablePeer(_peerID, "Malformed block received.");
        return;

    case ImportResult::AlreadyInChain:
    case ImportResult::AlreadyKnown:
        break;

    case ImportResult::FutureTimeUnknown:
    case ImportResult::UnknownParent:
    {
        peer.incrementUnknownNewBlocks();
        if (peer.unknownNewBlocks() > c_maxPeerUknownNewBlocks)
        {
            m_host.disablePeer(_peerID, "Too many unknown new blocks");
            restartSync();
        }
        logNewBlock(h);
        u256 totalDifficulty = _r[1].toInt<u256>();
        if (totalDifficulty > peer.totalDifficulty())
        {
            LOG(m_loggerDetail) << "Received block (" << blockNumber
                                << ") with no known parent. Peer " << _peerID
                                << " needs syncing...";
            syncPeer(_peerID, true);
        }
        break;
    }
    default:;
    }
}

SyncStatus BlockChainSync::status() const
{
    RecursiveGuard l(x_sync);
    SyncStatus res;
    res.state = m_state;
    res.protocolVersion = 62;
    res.startBlockNumber = m_startingBlock;
    res.currentBlockNumber = host().chain().number();
    res.highestBlockNumber = m_highestBlock;
    return res;
}

void BlockChainSync::resetSync()
{
    m_downloadingHeaders.clear();
    m_downloadingBodies.clear();
    m_headers.clear();
    m_bodies.clear();
    m_headerSyncPeers.clear();
    m_bodySyncPeers.clear();
    m_headerIdToNumber.clear();
    m_syncingTotalDifficulty = 0;
    m_state = SyncState::NotSynced;
}

void BlockChainSync::restartSync()
{
    RecursiveGuard l(x_sync);
    resetSync();
    m_highestBlock = 0;
    m_haveCommonHeader = false;
    host().bq().clear();
    m_startingBlock = host().chain().number();
    m_lastImportedBlock = m_startingBlock;
    m_lastImportedBlockHash = host().chain().currentHash();
}

void BlockChainSync::completeSync()
{
    RecursiveGuard l(x_sync);
    resetSync();
    m_state = SyncState::Idle;
}

bool BlockChainSync::isSyncing() const
{
    return m_state != SyncState::Idle;
}

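// Handle a NewBlockHashes announcement: classify the announced hashes as known or unknown,
// remember the peer's highest unknown block, and start syncing with the peer if it announced
// blocks we don't have yet.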
void BlockChainSync::onPeerNewHashes(
    NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes)
{
    RecursiveGuard l(x_sync);
    DEV_INVARIANT_CHECK;

    auto& peer = m_host.peer(_peerID);
    if (peer.isConversing())
    {
        LOG(m_loggerDetail) << "Ignoring new hashes since we're already downloading from peer "
                            << _peerID;
        return;
    }
    LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing with peer "
                        << _peerID;
    unsigned knowns = 0;
    unsigned unknowns = 0;
    unsigned maxHeight = 0;
    for (auto const& p: _hashes)
    {
        h256 const& h = p.first;
        m_host.capabilityHost().updateRating(_peerID, 1);
        peer.markBlockAsKnown(h);
        auto status = host().bq().blockStatus(h);
        if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
            knowns++;
        else if (status == QueueStatus::Bad)
        {
            LOG(m_loggerWarning) << "block hash bad!" << h << ". Bailing... (peer: " << _peerID
                                 << ")";
            return;
        }
        else if (status == QueueStatus::Unknown)
        {
            unknowns++;
            if (p.second > maxHeight)
            {
                maxHeight = (unsigned)p.second;
                peer.setLatestHash(h);
            }
        }
        else
            knowns++;
    }
    LOG(m_logger) << knowns << " knowns, " << unknowns << " unknowns (peer: " << _peerID << ")";
    if (unknowns > 0)
    {
        LOG(m_loggerDetail) << "Not syncing and new block hash discovered: start syncing with "
                            << _peerID;
        syncPeer(_peerID, true);
    }
}

void BlockChainSync::onPeerAborting()
{
    RecursiveGuard l(x_sync);
    // Can't check invariants here since the peer has already been removed from the list and the
    // state is not updated yet.
    clearPeerDownload();
    continueSync();
    DEV_INVARIANT_CHECK_HERE;
}

bool BlockChainSync::invariants() const
{
    if (!isSyncing() && !m_headers.empty())
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got headers while not syncing"));
    if (!isSyncing() && !m_bodies.empty())
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got bodies while not syncing"));
    if (isSyncing() && m_host.chain().number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Common block not found"));
    if (isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header is too old"));
    if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header download map mismatch"));
    if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
        BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Body download map mismatch"));
    return true;
}