1 // Aleth: Ethereum C++ client, tools and libraries.
2 // Copyright 2015-2019 Aleth Authors.
3 // Licensed under the GNU General Public License, Version 3.
4
5 #include "BlockChainSync.h"
6
7 #include "BlockChain.h"
8 #include "BlockQueue.h"
9 #include "EthereumCapability.h"
10 #include <libdevcore/Common.h>
11 #include <libdevcore/TrieHash.h>
12 #include <libethcore/Exceptions.h>
13 #include <libp2p/Host.h>
14 #include <libp2p/Session.h>
15 #include <chrono>
16
17 using namespace std;
18 using namespace dev;
19 using namespace dev::eth;
20
operator <<(std::ostream & _out,SyncStatus const & _sync)21 std::ostream& dev::eth::operator<<(std::ostream& _out, SyncStatus const& _sync)
22 {
23 _out << "protocol: " << _sync.protocolVersion << endl;
24 _out << "state: " << EthereumCapability::stateName(_sync.state) << " ";
25 if (_sync.state == SyncState::Blocks)
26 _out << _sync.currentBlockNumber << "/" << _sync.highestBlockNumber;
27 return _out;
28 }
29
30 namespace // Helper functions.
31 {
32
33 constexpr unsigned c_maxPeerUknownNewBlocks = 1024; /// Max number of unknown new blocks peer can give us
34 constexpr unsigned c_maxRequestHeaders = 1024;
35 constexpr unsigned c_maxRequestBodies = 1024;
36
haveItem(std::map<unsigned,T> & _container,unsigned _number)37 template<typename T> bool haveItem(std::map<unsigned, T>& _container, unsigned _number)
38 {
39 if (_container.empty())
40 return false;
41 auto lower = _container.lower_bound(_number);
42 if (lower != _container.end() && lower->first == _number)
43 return true;
44 if (lower == _container.begin())
45 return false;
46 --lower;
47 return lower->first <= _number && (lower->first + lower->second.size()) > _number;
48 }
49
findItem(std::map<unsigned,std::vector<T>> & _container,unsigned _number)50 template<typename T> T const* findItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
51 {
52 if (_container.empty())
53 return nullptr;
54 auto lower = _container.lower_bound(_number);
55 if (lower != _container.end() && lower->first == _number)
56 return &(*lower->second.begin());
57 if (lower == _container.begin())
58 return nullptr;
59 --lower;
60 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
61 return &lower->second.at(_number - lower->first);
62 return nullptr;
63 }
64
removeItem(std::map<unsigned,std::vector<T>> & _container,unsigned _number)65 template<typename T> void removeItem(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
66 {
67 if (_container.empty())
68 return;
69 auto lower = _container.lower_bound(_number);
70 if (lower != _container.end() && lower->first == _number)
71 {
72 _container.erase(lower);
73 return;
74 }
75 if (lower == _container.begin())
76 return;
77 --lower;
78 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
79 lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
80 }
81
removeAllStartingWith(std::map<unsigned,std::vector<T>> & _container,unsigned _number)82 template<typename T> void removeAllStartingWith(std::map<unsigned, std::vector<T>>& _container, unsigned _number)
83 {
84 if (_container.empty())
85 return;
86 auto lower = _container.lower_bound(_number);
87 if (lower != _container.end() && lower->first == _number)
88 {
89 _container.erase(lower, _container.end());
90 return;
91 }
92 if (lower == _container.begin())
93 {
94 _container.clear();
95 return;
96 }
97 --lower;
98 if (lower->first <= _number && (lower->first + lower->second.size()) > _number)
99 lower->second.erase(lower->second.begin() + (_number - lower->first), lower->second.end());
100 _container.erase(++lower, _container.end());
101 }
102
mergeInto(std::map<unsigned,std::vector<T>> & _container,unsigned _number,T && _data)103 template<typename T> void mergeInto(std::map<unsigned, std::vector<T>>& _container, unsigned _number, T&& _data)
104 {
105 assert(!haveItem(_container, _number));
106 auto lower = _container.lower_bound(_number);
107 if (!_container.empty() && lower != _container.begin())
108 --lower;
109 if (lower != _container.end() && (lower->first + lower->second.size() == _number))
110 {
111 // extend existing chunk
112 lower->second.emplace_back(_data);
113
114 auto next = lower;
115 ++next;
116 if (next != _container.end() && (lower->first + lower->second.size() == next->first))
117 {
118 // merge with the next chunk
119 std::move(next->second.begin(), next->second.end(), std::back_inserter(lower->second));
120 _container.erase(next);
121 }
122
123 }
124 else
125 {
126 // insert a new chunk
127 auto inserted = _container.insert(lower, std::make_pair(_number, std::vector<T> { _data }));
128 auto next = inserted;
129 ++next;
130 if (next != _container.end() && next->first == _number + 1)
131 {
132 std::move(next->second.begin(), next->second.end(), std::back_inserter(inserted->second));
133 _container.erase(next);
134 }
135 }
136 }
137
138 } // Anonymous namespace -- helper functions.
139
BlockChainSync(EthereumCapability & _host)140 BlockChainSync::BlockChainSync(EthereumCapability& _host)
141 : m_host(_host),
142 m_chainStartBlock(_host.chain().chainStartBlockNumber()),
143 m_startingBlock(_host.chain().number()),
144 m_lastImportedBlock(m_startingBlock),
145 m_lastImportedBlockHash(_host.chain().currentHash())
146 {
147 m_bqBlocksDrained = host().bq().onBlocksDrained([this]() {
148 if (isSyncPaused() && !host().bq().knownFull())
149 {
150 // Draining freed up space in the block queue. Let's resume syncing.
151 // Ensure that syncing occurs on the network thread (since the block queue handler is
152 // called on the client thread
153 host().capabilityHost().postWork([this]() {
154 RecursiveGuard l(x_sync);
155 m_state = SyncState::Blocks;
156 continueSync();
157 });
158 }
159 });
160 }
161
~BlockChainSync()162 BlockChainSync::~BlockChainSync()
163 {
164 RecursiveGuard l(x_sync);
165 abortSync();
166 }
167
onBlockImported(BlockHeader const & _info)168 void BlockChainSync::onBlockImported(BlockHeader const& _info)
169 {
170 //if a block has been added via mining or other block import function
171 //through RPC, then we should count it as a last imported block
172 RecursiveGuard l(x_sync);
173 if (_info.number() > m_lastImportedBlock)
174 {
175 m_lastImportedBlock = static_cast<unsigned>(_info.number());
176 m_lastImportedBlockHash = _info.hash();
177 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
178 }
179 }
180
abortSync()181 void BlockChainSync::abortSync()
182 {
183 RecursiveGuard l(x_sync);
184 resetSync();
185 onPeerAborting();
186 }
187
onPeerStatus(EthereumPeer const & _peer)188 void BlockChainSync::onPeerStatus(EthereumPeer const& _peer)
189 {
190 RecursiveGuard l(x_sync);
191 DEV_INVARIANT_CHECK;
192
193 auto peerSessionInfo = m_host.capabilityHost().peerSessionInfo(_peer.id());
194 if (!peerSessionInfo)
195 return; // Expired
196
197 std::string disconnectReason;
198 if (peerSessionInfo->clientVersion.find("/v0.7.0/") != string::npos)
199 disconnectReason = "Blacklisted client version.";
200 else
201 disconnectReason = _peer.validate(
202 host().chain().genesisHash(), host().protocolVersion(), host().networkId());
203
204 if (!disconnectReason.empty())
205 {
206 LOG(m_logger) << "Peer " << _peer.id() << " not suitable for sync: " << disconnectReason;
207 m_host.capabilityHost().disconnect(_peer.id(), p2p::UselessPeer);
208 return;
209 }
210
211 // Before starting to exchange the data with the node, let's verify that it's on our chain
212 if (!requestDaoForkBlockHeader(_peer.id()))
213 {
214 // DAO challenge not needed
215 syncPeer(_peer.id(), false);
216 }
217 }
218
requestDaoForkBlockHeader(NodeID const & _peerID)219 bool BlockChainSync::requestDaoForkBlockHeader(NodeID const& _peerID)
220 {
221 // DAO challenge
222 u256 const daoHardfork = host().chain().sealEngine()->chainParams().daoHardforkBlock;
223 if (daoHardfork == 0 || daoHardfork == c_infiniteBlockNumber || host().chain().number() < daoHardfork)
224 return false;
225
226 m_daoChallengedPeers.insert(_peerID);
227 m_host.peer(_peerID).requestBlockHeaders(static_cast<unsigned>(daoHardfork), 1, 0, false);
228 return true;
229 }
230
syncPeer(NodeID const & _peerID,bool _force)231 void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
232 {
233 auto& peer = m_host.peer(_peerID);
234 if (!peer.statusReceived())
235 {
236 LOG(m_loggerDetail) << "Can't sync with peer " << _peerID << " - Status not received yet.";
237 return;
238 }
239
240 if (peer.isConversing())
241 {
242 LOG(m_loggerDetail) << "Can't sync with peer " << _peerID << " - outstanding asks.";
243 return;
244 }
245
246 if (isSyncPaused())
247 {
248 LOG(m_loggerDetail) << "Can't sync with peer " << _peerID
249 << " - sync state is paused. Block queue status: "
250 << host().bq().status();
251 return;
252 }
253
254 u256 td = host().chain().details().totalDifficulty;
255 if (host().bq().isActive())
256 td += host().bq().difficulty();
257 u256 const syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
258 u256 const peerTotalDifficulty = peer.totalDifficulty();
259
260 if (_force || peerTotalDifficulty > syncingDifficulty)
261 {
262 if (peerTotalDifficulty > syncingDifficulty)
263 LOG(m_logger) << "Discovered new highest difficulty (" << peerTotalDifficulty
264 << ") from peer " << peer.id();
265
266 // start sync
267 m_syncingTotalDifficulty = peerTotalDifficulty;
268 if (m_state == SyncState::Idle || m_state == SyncState::NotSynced)
269 {
270 LOG(m_loggerInfo) << "Starting full sync";
271 LOG(m_logger) << "Syncing with peer " << peer.id();
272 m_state = SyncState::Blocks;
273 }
274
275 // Request tip of peer's chain
276 peer.requestBlockHeaders(peer.latestHash(), 1 /* count */, 0 /* skip */, false /* reverse */);
277 peer.setWaitingForTransactions(true);
278 return;
279 }
280
281 if (m_state == SyncState::Blocks)
282 {
283 requestBlocks(_peerID);
284 return;
285 }
286 }
287
continueSync()288 void BlockChainSync::continueSync()
289 {
290 host().capabilityHost().foreachPeer(m_host.name(), [this](NodeID const& _peerID) {
291 syncPeer(_peerID, false);
292 return true;
293 });
294 }
295
requestBlocks(NodeID const & _peerID)296 void BlockChainSync::requestBlocks(NodeID const& _peerID)
297 {
298 clearPeerDownload(_peerID);
299 if (host().bq().knownFull())
300 {
301 LOG(m_loggerDetail) << "Waiting for block queue before downloading blocks from " << _peerID
302 << ". Block queue status: " << host().bq().status();
303 pauseSync();
304 return;
305 }
306 // check to see if we need to download any block bodies first
307 auto header = m_headers.begin();
308 h256s neededBodies;
309 vector<unsigned> neededNumbers;
310 unsigned index = 0;
311 if (m_haveCommonHeader && !m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1)
312 {
313 while (header != m_headers.end() && neededBodies.size() < c_maxRequestBodies && index < header->second.size())
314 {
315 unsigned block = header->first + index;
316 if (m_downloadingBodies.count(block) == 0 && !haveItem(m_bodies, block))
317 {
318 neededBodies.push_back(header->second[index].hash);
319 neededNumbers.push_back(block);
320 m_downloadingBodies.insert(block);
321 }
322
323 ++index;
324 if (index >= header->second.size())
325 break; // Download bodies only for validated header chain
326 }
327 }
328 if (neededBodies.size() > 0)
329 {
330 m_bodySyncPeers[_peerID] = neededNumbers;
331 m_host.peer(_peerID).requestBlockBodies(neededBodies);
332 }
333 else
334 {
335 // check if need to download headers
336 unsigned start = 0;
337 if (!m_haveCommonHeader)
338 {
339 // download backwards until common block is found 1 header at a time
340 start = m_lastImportedBlock;
341 if (!m_headers.empty())
342 start = std::min(start, m_headers.begin()->first - 1);
343 m_lastImportedBlock = start;
344 m_lastImportedBlockHash = host().chain().numberHash(start);
345
346 if (start <= m_chainStartBlock + 1)
347 m_haveCommonHeader = true; //reached chain start
348 }
349 if (m_haveCommonHeader)
350 {
351 start = m_lastImportedBlock + 1;
352 auto next = m_headers.begin();
353 unsigned count = 0;
354 if (!m_headers.empty() && start >= m_headers.begin()->first)
355 {
356 start = m_headers.begin()->first + m_headers.begin()->second.size();
357 ++next;
358 }
359
360 while (count == 0 && next != m_headers.end())
361 {
362 count = std::min(c_maxRequestHeaders, next->first - start);
363 while(count > 0 && m_downloadingHeaders.count(start) != 0)
364 {
365 start++;
366 count--;
367 }
368 std::vector<unsigned> headers;
369 for (unsigned block = start; block < start + count; block++)
370 if (m_downloadingHeaders.count(block) == 0)
371 {
372 headers.push_back(block);
373 m_downloadingHeaders.insert(block);
374 }
375 count = headers.size();
376 if (count > 0)
377 {
378 m_headerSyncPeers[_peerID] = headers;
379 assert(!haveItem(m_headers, start));
380 m_host.peer(_peerID).requestBlockHeaders(start, count, 0, false);
381 }
382 else if (start >= next->first)
383 {
384 start = next->first + next->second.size();
385 ++next;
386 }
387 }
388 }
389 else
390 m_host.peer(_peerID).requestBlockHeaders(start, 1 /* count */, 0 /* skip */, false);
391 }
392 }
393
clearPeerDownload(NodeID const & _peerID)394 void BlockChainSync::clearPeerDownload(NodeID const& _peerID)
395 {
396 auto syncPeer = m_headerSyncPeers.find(_peerID);
397 if (syncPeer != m_headerSyncPeers.end())
398 {
399 for (unsigned block : syncPeer->second)
400 m_downloadingHeaders.erase(block);
401 m_headerSyncPeers.erase(syncPeer);
402 }
403 syncPeer = m_bodySyncPeers.find(_peerID);
404 if (syncPeer != m_bodySyncPeers.end())
405 {
406 for (unsigned block : syncPeer->second)
407 m_downloadingBodies.erase(block);
408 m_bodySyncPeers.erase(syncPeer);
409 }
410 m_daoChallengedPeers.erase(_peerID);
411 }
412
clearPeerDownload()413 void BlockChainSync::clearPeerDownload()
414 {
415 for (auto s = m_headerSyncPeers.begin(); s != m_headerSyncPeers.end();)
416 {
417 if (!m_host.capabilityHost().peerSessionInfo(s->first))
418 {
419 for (unsigned block : s->second)
420 m_downloadingHeaders.erase(block);
421 m_headerSyncPeers.erase(s++);
422 }
423 else
424 ++s;
425 }
426 for (auto s = m_bodySyncPeers.begin(); s != m_bodySyncPeers.end();)
427 {
428 if (!m_host.capabilityHost().peerSessionInfo(s->first))
429 {
430 for (unsigned block : s->second)
431 m_downloadingBodies.erase(block);
432 m_bodySyncPeers.erase(s++);
433 }
434 else
435 ++s;
436 }
437 for (auto s = m_daoChallengedPeers.begin(); s != m_daoChallengedPeers.end();)
438 {
439 if (!m_host.capabilityHost().peerSessionInfo(*s))
440 m_daoChallengedPeers.erase(s++);
441 else
442 ++s;
443 }
444 }
445
logNewBlock(h256 const & _h)446 void BlockChainSync::logNewBlock(h256 const& _h)
447 {
448 m_knownNewHashes.erase(_h);
449 }
450
onPeerBlockHeaders(NodeID const & _peerID,RLP const & _r)451 void BlockChainSync::onPeerBlockHeaders(NodeID const& _peerID, RLP const& _r)
452 {
453 RecursiveGuard l(x_sync);
454 DEV_INVARIANT_CHECK;
455 size_t itemCount = _r.itemCount();
456 LOG(m_logger) << "BlocksHeaders (" << dec << itemCount << " entries) "
457 << (itemCount ? "" : ": NoMoreHeaders") << " from " << _peerID;
458
459 if (m_daoChallengedPeers.find(_peerID) != m_daoChallengedPeers.end())
460 {
461 if (verifyDaoChallengeResponse(_r))
462 syncPeer(_peerID, false);
463 else
464 m_host.disablePeer(_peerID, "Peer from another fork.");
465
466 m_daoChallengedPeers.erase(_peerID);
467 return;
468 }
469
470 clearPeerDownload(_peerID);
471 if (m_state != SyncState::Blocks && m_state != SyncState::Waiting)
472 {
473 LOG(m_logger) << "Ignoring unexpected blocks from " << _peerID;
474 return;
475 }
476 if (m_state == SyncState::Waiting)
477 {
478 LOG(m_loggerDetail) << "Ignored blocks from " << _peerID << " while waiting";
479 return;
480 }
481 if (itemCount == 0)
482 {
483 LOG(m_loggerDetail) << "Peer " << _peerID << " does not have the blocks requested";
484 m_host.capabilityHost().updateRating(_peerID, -1);
485 }
486 for (unsigned i = 0; i < itemCount; i++)
487 {
488 BlockHeader info(_r[i].data(), HeaderData);
489 unsigned blockNumber = static_cast<unsigned>(info.number());
490 if (blockNumber < m_chainStartBlock)
491 {
492 LOG(m_logger) << "Skipping too old header " << blockNumber << " from " << _peerID;
493 continue;
494 }
495 if (haveItem(m_headers, blockNumber))
496 {
497 LOG(m_logger) << "Skipping header " << blockNumber << " (already downloaded) from "
498 << _peerID;
499 continue;
500 }
501 if (blockNumber <= m_lastImportedBlock && m_haveCommonHeader)
502 {
503 LOG(m_logger) << "Skipping header " << blockNumber << " (already imported) from "
504 << _peerID;
505 continue;
506 }
507 if (blockNumber > m_highestBlock)
508 m_highestBlock = blockNumber;
509
510 auto status = host().bq().blockStatus(info.hash());
511 if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(info.hash()))
512 {
513 m_haveCommonHeader = true;
514 m_lastImportedBlock = (unsigned)info.number();
515 m_lastImportedBlockHash = info.hash();
516
517 if (!m_headers.empty() && m_headers.begin()->first == m_lastImportedBlock + 1 &&
518 m_headers.begin()->second[0].parent != m_lastImportedBlockHash)
519 {
520 // Start of the header chain in m_headers doesn't match our known chain,
521 // probably we've downloaded other fork
522 LOG(m_loggerWarning)
523 << "Unknown parent of the downloaded headers, restarting sync with " << _peerID;
524 restartSync();
525 return;
526 }
527 }
528 else
529 {
530 Header hdr { _r[i].data().toBytes(), info.hash(), info.parentHash() };
531
532 // validate chain
533 HeaderId headerId { info.transactionsRoot(), info.sha3Uncles() };
534 if (m_haveCommonHeader)
535 {
536 Header const* prevBlock = findItem(m_headers, blockNumber - 1);
537 if ((prevBlock && prevBlock->hash != info.parentHash()) || (blockNumber == m_lastImportedBlock + 1 && info.parentHash() != m_lastImportedBlockHash))
538 {
539 // mismatching parent id, delete the previous block and don't add this one
540 LOG(m_loggerWarning)
541 << "Unknown block header " << blockNumber << " " << info.hash()
542 << " (Restart syncing with " << _peerID << ")";
543 m_host.capabilityHost().updateRating(_peerID, -1);
544 restartSync();
545 return ;
546 }
547
548 Header const* nextBlock = findItem(m_headers, blockNumber + 1);
549 if (nextBlock && nextBlock->parent != info.hash())
550 {
551 LOG(m_loggerDetail) << "Unknown block header " << blockNumber + 1 << " "
552 << nextBlock->hash << " from " << _peerID;
553 // clear following headers
554 unsigned n = blockNumber + 1;
555 auto headers = m_headers.at(n);
556 for (auto const& h : headers)
557 {
558 BlockHeader deletingInfo(h.data, HeaderData);
559 m_headerIdToNumber.erase(headerId);
560 m_downloadingBodies.erase(n);
561 m_downloadingHeaders.erase(n);
562 ++n;
563 }
564 removeAllStartingWith(m_headers, blockNumber + 1);
565 removeAllStartingWith(m_bodies, blockNumber + 1);
566 }
567 }
568
569 mergeInto(m_headers, blockNumber, std::move(hdr));
570 if (headerId.transactionsRoot == EmptyTrie && headerId.uncles == EmptyListSHA3)
571 {
572 //empty body, just mark as downloaded
573 RLPStream r(2);
574 r.appendRaw(RLPEmptyList);
575 r.appendRaw(RLPEmptyList);
576 bytes body;
577 r.swapOut(body);
578 mergeInto(m_bodies, blockNumber, std::move(body));
579 }
580 else
581 m_headerIdToNumber[headerId] = blockNumber;
582 }
583 }
584 collectBlocks();
585 continueSync();
586 }
587
verifyDaoChallengeResponse(RLP const & _r)588 bool BlockChainSync::verifyDaoChallengeResponse(RLP const& _r)
589 {
590 if (_r.itemCount() != 1)
591 return false;
592
593 BlockHeader info(_r[0].data(), HeaderData);
594 return info.number() == host().chain().sealEngine()->chainParams().daoHardforkBlock &&
595 info.extraData() == fromHex("0x64616f2d686172642d666f726b");
596 }
597
onPeerBlockBodies(NodeID const & _peerID,RLP const & _r)598 void BlockChainSync::onPeerBlockBodies(NodeID const& _peerID, RLP const& _r)
599 {
600 RecursiveGuard l(x_sync);
601 DEV_INVARIANT_CHECK;
602 size_t itemCount = _r.itemCount();
603 LOG(m_logger) << "BlocksBodies (" << dec << itemCount << " entries) "
604 << (itemCount ? "" : ": NoMoreBodies") << " from " << _peerID;
605 clearPeerDownload(_peerID);
606 if (m_state != SyncState::Blocks && m_state != SyncState::Waiting) {
607 LOG(m_logger) << "Ignoring unexpected blocks from " << _peerID;
608 return;
609 }
610 if (m_state == SyncState::Waiting)
611 {
612 LOG(m_loggerDetail) << "Ignored blocks from " << _peerID << " while waiting";
613 return;
614 }
615 if (itemCount == 0)
616 {
617 LOG(m_loggerDetail) << "Peer " << _peerID << " does not have the blocks requested";
618 m_host.capabilityHost().updateRating(_peerID, -1);
619 }
620 for (unsigned i = 0; i < itemCount; i++)
621 {
622 RLP body(_r[i]);
623
624 auto txList = body[0];
625 h256 transactionRoot = trieRootOver(txList.itemCount(), [&](unsigned i){ return rlp(i); }, [&](unsigned i){ return txList[i].data().toBytes(); });
626 h256 uncles = sha3(body[1].data());
627 HeaderId id { transactionRoot, uncles };
628 auto iter = m_headerIdToNumber.find(id);
629 if (iter == m_headerIdToNumber.end() || !haveItem(m_headers, iter->second))
630 {
631 LOG(m_loggerDetail) << "Ignored unknown block body from " << _peerID;
632 continue;
633 }
634 unsigned blockNumber = iter->second;
635 if (haveItem(m_bodies, blockNumber))
636 {
637 LOG(m_logger) << "Skipping already downloaded block body " << blockNumber << " from "
638 << _peerID;
639 continue;
640 }
641 m_headerIdToNumber.erase(id);
642 mergeInto(m_bodies, blockNumber, body.data().toBytes());
643 }
644 collectBlocks();
645 continueSync();
646 }
647
collectBlocks()648 void BlockChainSync::collectBlocks()
649 {
650 if (!m_haveCommonHeader || m_headers.empty() || m_bodies.empty())
651 return;
652
653 // merge headers and bodies
654 auto& headers = *m_headers.begin();
655 auto& bodies = *m_bodies.begin();
656 if (headers.first != bodies.first || headers.first != m_lastImportedBlock + 1)
657 return;
658
659 unsigned success = 0;
660 unsigned future = 0;
661 unsigned got = 0;
662 unsigned unknown = 0;
663 size_t i = 0;
664 for (; i < headers.second.size() && i < bodies.second.size(); i++)
665 {
666 RLPStream blockStream(3);
667 blockStream.appendRaw(headers.second[i].data);
668 RLP body(bodies.second[i]);
669 blockStream.appendRaw(body[0].data());
670 blockStream.appendRaw(body[1].data());
671 bytes block;
672 blockStream.swapOut(block);
673 switch (host().bq().import(&block))
674 {
675 case ImportResult::Success:
676 success++;
677 if (headers.first + i > m_lastImportedBlock)
678 {
679 m_lastImportedBlock = headers.first + (unsigned)i;
680 m_lastImportedBlockHash = headers.second[i].hash;
681 }
682 break;
683 case ImportResult::Malformed:
684 LOG(m_logger) << "Malformed block #" << headers.first + i << ". Restarting sync.";
685 restartSync();
686 return;
687 case ImportResult::BadChain:
688 LOG(m_logger) << "Block from the bad chain, block #" << headers.first + i
689 << ". Restarting sync.";
690 restartSync();
691 return;
692
693 case ImportResult::FutureTimeKnown:
694 future++;
695 break;
696 case ImportResult::AlreadyInChain:
697 break;
698 case ImportResult::AlreadyKnown:
699 case ImportResult::FutureTimeUnknown:
700 case ImportResult::UnknownParent:
701 if (headers.first + i > m_lastImportedBlock)
702 {
703 logImported(success, future, got, unknown);
704 LOG(m_logger)
705 << "Already known or future time & unknown parent or unknown parent, block #"
706 << headers.first + i << ". Resetting sync.";
707 resetSync();
708 m_haveCommonHeader = false; // fork detected, search for common header again
709 }
710 return;
711
712 default:;
713 }
714 }
715
716 logImported(success, future, got, unknown);
717
718 if (host().bq().unknownFull())
719 {
720 LOG(m_loggerWarning) << "Too many unknown blocks, restarting sync";
721 restartSync();
722 return;
723 }
724
725 auto newHeaders = std::move(headers.second);
726 newHeaders.erase(newHeaders.begin(), newHeaders.begin() + i);
727 unsigned newHeaderHead = headers.first + i;
728 auto newBodies = std::move(bodies.second);
729 newBodies.erase(newBodies.begin(), newBodies.begin() + i);
730 unsigned newBodiesHead = bodies.first + i;
731 m_headers.erase(m_headers.begin());
732 m_bodies.erase(m_bodies.begin());
733 if (!newHeaders.empty())
734 m_headers[newHeaderHead] = newHeaders;
735 if (!newBodies.empty())
736 m_bodies[newBodiesHead] = newBodies;
737
738 if (m_headers.empty())
739 {
740 assert(m_bodies.empty());
741 completeSync();
742 }
743 DEV_INVARIANT_CHECK_HERE;
744 }
745
logImported(unsigned _success,unsigned _future,unsigned _got,unsigned _unknown)746 void BlockChainSync::logImported(
747 unsigned _success, unsigned _future, unsigned _got, unsigned _unknown)
748 {
749 LOG(m_logger) << dec << _success << " imported OK, " << _unknown << " with unknown parents, "
750 << _future << " with future timestamps, " << _got << " already known received.";
751 }
752
onPeerNewBlock(NodeID const & _peerID,RLP const & _r)753 void BlockChainSync::onPeerNewBlock(NodeID const& _peerID, RLP const& _r)
754 {
755 RecursiveGuard l(x_sync);
756 DEV_INVARIANT_CHECK;
757
758 if (_r.itemCount() != 2)
759 {
760 m_host.disablePeer(_peerID, "NewBlock without 2 data fields.");
761 return;
762 }
763 BlockHeader info(_r[0][0].data(), HeaderData);
764 auto h = info.hash();
765 auto& peer = m_host.peer(_peerID);
766 peer.markBlockAsKnown(h);
767 unsigned blockNumber = static_cast<unsigned>(info.number());
768 if (blockNumber > (m_lastImportedBlock + 1))
769 {
770 LOG(m_loggerDetail) << "Received unknown new block (" << blockNumber << ") from "
771 << _peerID;
772 // Update the hash of highest known block of the peer.
773 // syncPeer will then request the highest block header to properly restart syncing
774 peer.setLatestHash(h);
775 syncPeer(_peerID, true);
776 return;
777 }
778 switch (host().bq().import(_r[0].data()))
779 {
780 case ImportResult::Success:
781 m_host.capabilityHost().updateRating(_peerID, 100);
782 logNewBlock(h);
783 if (blockNumber > m_lastImportedBlock)
784 {
785 m_lastImportedBlock = max(m_lastImportedBlock, blockNumber);
786 m_lastImportedBlockHash = h;
787 }
788 m_highestBlock = max(m_lastImportedBlock, m_highestBlock);
789 m_downloadingBodies.erase(blockNumber);
790 m_downloadingHeaders.erase(blockNumber);
791 removeItem(m_headers, blockNumber);
792 removeItem(m_bodies, blockNumber);
793 if (m_headers.empty())
794 {
795 if (!m_bodies.empty())
796 {
797 LOG(m_loggerDetail) << "Block headers map is empty, but block bodies map is not. "
798 "Force-clearing (peer: "
799 << _peerID << ")";
800 m_bodies.clear();
801 }
802 completeSync();
803 }
804 break;
805 case ImportResult::FutureTimeKnown:
806 //TODO: Rating dependent on how far in future it is.
807 break;
808
809 case ImportResult::Malformed:
810 case ImportResult::BadChain:
811 logNewBlock(h);
812 m_host.disablePeer(_peerID, "Malformed block received.");
813 return;
814
815 case ImportResult::AlreadyInChain:
816 case ImportResult::AlreadyKnown:
817 break;
818
819 case ImportResult::FutureTimeUnknown:
820 case ImportResult::UnknownParent:
821 {
822 peer.incrementUnknownNewBlocks();
823 if (peer.unknownNewBlocks() > c_maxPeerUknownNewBlocks)
824 {
825 m_host.disablePeer(_peerID, "Too many uknown new blocks");
826 restartSync();
827 }
828 logNewBlock(h);
829 u256 totalDifficulty = _r[1].toInt<u256>();
830 if (totalDifficulty > peer.totalDifficulty())
831 {
832 LOG(m_loggerDetail) << "Received block (" << blockNumber
833 << ") with no known parent. Peer " << _peerID
834 << " needs syncing...";
835 syncPeer(_peerID, true);
836 }
837 break;
838 }
839 default:;
840 }
841 }
842
status() const843 SyncStatus BlockChainSync::status() const
844 {
845 RecursiveGuard l(x_sync);
846 SyncStatus res;
847 res.state = m_state;
848 res.protocolVersion = 62;
849 res.startBlockNumber = m_startingBlock;
850 res.currentBlockNumber = host().chain().number();
851 res.highestBlockNumber = m_highestBlock;
852 return res;
853 }
854
resetSync()855 void BlockChainSync::resetSync()
856 {
857 m_downloadingHeaders.clear();
858 m_downloadingBodies.clear();
859 m_headers.clear();
860 m_bodies.clear();
861 m_headerSyncPeers.clear();
862 m_bodySyncPeers.clear();
863 m_headerIdToNumber.clear();
864 m_syncingTotalDifficulty = 0;
865 m_state = SyncState::NotSynced;
866 }
867
restartSync()868 void BlockChainSync::restartSync()
869 {
870 RecursiveGuard l(x_sync);
871 resetSync();
872 m_highestBlock = 0;
873 m_haveCommonHeader = false;
874 host().bq().clear();
875 m_startingBlock = host().chain().number();
876 m_lastImportedBlock = m_startingBlock;
877 m_lastImportedBlockHash = host().chain().currentHash();
878 }
879
completeSync()880 void BlockChainSync::completeSync()
881 {
882 RecursiveGuard l(x_sync);
883 resetSync();
884 m_state = SyncState::Idle;
885 }
886
isSyncing() const887 bool BlockChainSync::isSyncing() const
888 {
889 return m_state != SyncState::Idle;
890 }
891
onPeerNewHashes(NodeID const & _peerID,std::vector<std::pair<h256,u256>> const & _hashes)892 void BlockChainSync::onPeerNewHashes(
893 NodeID const& _peerID, std::vector<std::pair<h256, u256>> const& _hashes)
894 {
895 RecursiveGuard l(x_sync);
896 DEV_INVARIANT_CHECK;
897
898 auto& peer = m_host.peer(_peerID);
899 if (peer.isConversing())
900 {
901 LOG(m_loggerDetail) << "Ignoring new hashes since we're already downloading from peer "
902 << _peerID;
903 return;
904 }
905 LOG(m_loggerDetail) << "Not syncing and new block hash discovered: syncing with peer "
906 << _peerID;
907 unsigned knowns = 0;
908 unsigned unknowns = 0;
909 unsigned maxHeight = 0;
910 for (auto const& p: _hashes)
911 {
912 h256 const& h = p.first;
913 m_host.capabilityHost().updateRating(_peerID, 1);
914 peer.markBlockAsKnown(h);
915 auto status = host().bq().blockStatus(h);
916 if (status == QueueStatus::Importing || status == QueueStatus::Ready || host().chain().isKnown(h))
917 knowns++;
918 else if (status == QueueStatus::Bad)
919 {
920 LOG(m_loggerWarning) << "block hash bad!" << h << ". Bailing... (peer: " << _peerID
921 << ")";
922 return;
923 }
924 else if (status == QueueStatus::Unknown)
925 {
926 unknowns++;
927 if (p.second > maxHeight)
928 {
929 maxHeight = (unsigned)p.second;
930 peer.setLatestHash(h);
931 }
932 }
933 else
934 knowns++;
935 }
936 LOG(m_logger) << knowns << " knowns, " << unknowns << " unknowns (peer: " << _peerID << ")";
937 if (unknowns > 0)
938 {
939 LOG(m_loggerDetail) << "Not syncing and new block hash discovered: start syncing with "
940 << _peerID;
941 syncPeer(_peerID, true);
942 }
943 }
944
onPeerAborting()945 void BlockChainSync::onPeerAborting()
946 {
947 RecursiveGuard l(x_sync);
948 // Can't check invariants here since the peers is already removed from the list and the state is not updated yet.
949 clearPeerDownload();
950 continueSync();
951 DEV_INVARIANT_CHECK_HERE;
952 }
953
invariants() const954 bool BlockChainSync::invariants() const
955 {
956 if (!isSyncing() && !m_headers.empty())
957 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got headers while not syncing"));
958 if (!isSyncing() && !m_bodies.empty())
959 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Got bodies while not syncing"));
960 if (isSyncing() && m_host.chain().number() > 0 && m_haveCommonHeader && m_lastImportedBlock == 0)
961 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Common block not found"));
962 if (isSyncing() && !m_headers.empty() && m_lastImportedBlock >= m_headers.begin()->first)
963 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header is too old"));
964 if (m_headerSyncPeers.empty() != m_downloadingHeaders.empty())
965 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Header download map mismatch"));
966 if (m_bodySyncPeers.empty() != m_downloadingBodies.empty() && m_downloadingBodies.size() <= m_headerIdToNumber.size())
967 BOOST_THROW_EXCEPTION(FailedInvariant() << errinfo_comment("Body download map mismatch"));
968 return true;
969 }
970