// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"errors"
	"fmt"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/params"
)

var (
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
	MaxSkeletonSize = 128 // Number of header fetches needed for a skeleton assembly
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request

	MaxForkAncestry  = 3 * params.EpochDuration // Maximum chain reorganisation
	rttMinEstimate   = 2 * time.Second          // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second         // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1                      // Worst confidence factor in our estimated RTT value
	ttlScaling       = 3                        // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute              // Maximum TTL allowance to prevent reaching crazy timeouts

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value

	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain

	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
)
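
// Illustrative note, not part of the original source: assuming params.EpochDuration
// is 30000 blocks, MaxForkAncestry above works out to 3 * 30000 = 90000 blocks,
// i.e. the deepest reorganisation the downloader will attempt to recover from when
// looking for a common ancestor with a peer.
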
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)

type Downloader struct {
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events

	queue   *queue   // Scheduler for selecting the hashes to download
	peers   *peerSet // Set of active peers from which download can proceed
	stateDB ethdb.Database

	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

	// Statistics
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

	lightchain LightChain
	blockchain BlockChain

	// Callbacks
	dropPeer peerDropFn // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
	committed       int32

	// Channels
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks

	// for stateFetcher
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data

	// Cancellation and termination
	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.

	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

	downloads sync.WaitGroup // Keeps track of the currently active downloads

	// Testing hooks
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
	HasHeader(common.Hash, uint64) bool

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

	// Rollback removes a few recently added elements from the local chain.
	Rollback([]common.Hash)
}

// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
type BlockChain interface {
	LightChain

	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool

	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
	InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
}
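
// Illustrative sketch, not part of the original source: a full node's chain is
// expected to satisfy BlockChain, while a header-only chain satisfies LightChain.
// Assuming a hypothetical implementation type myChain exposing the methods listed
// above, conformance can be asserted at compile time:
//
//	var _ BlockChain = (*myChain)(nil)
//	var _ LightChain = (*myChain)(nil)
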
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
	if lightchain == nil {
		lightchain = chain
	}

	dl := &Downloader{
		mode:           mode,
		stateDB:        stateDb,
		mux:            mux,
		queue:          newQueue(),
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
		blockchain:     chain,
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
		stateSyncStart: make(chan *stateSync),
		syncStatsState: stateSyncStats{
			processed: rawdb.ReadFastTrieProgress(stateDb),
		},
		trackStateReq: make(chan *stateReq),
	}
	go dl.qosTuner()
	go dl.stateFetcher()
	return dl
}

// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
//
// In addition, during the state download phase of fast synchronisation the number
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
func (d *Downloader) Progress() ethereum.SyncProgress {
	// Lock the current stats and return the progress
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()

	current := uint64(0)
	switch d.mode {
	case FullSync:
		current = d.blockchain.CurrentBlock().NumberU64()
	case FastSync:
		current = d.blockchain.CurrentFastBlock().NumberU64()
	case LightSync:
		current = d.lightchain.CurrentHeader().Number.Uint64()
	}
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
	}
}

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
		logger.Error("Failed to register sync peer", "err", err)
		return err
	}
	d.qosReduceConfidence()

	return nil
}

// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}
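
// Illustrative usage sketch, not part of the original source. The identifiers db,
// mux, chain, peer, head and td stand in for values the caller already holds, and
// drop is a hypothetical peerDropFn; the sketch only shows the call order:
//
//	d := New(FastSync, db, mux, chain, nil, drop)
//	if err := d.RegisterPeer("peer-1", 63, peer); err != nil {
//		log.Error("Failed to register peer", "err", err)
//	}
//	// Attempt a sync cycle against the peer's advertised head and total difficulty.
//	if err := d.Synchronise("peer-1", head, td, FastSync); err != nil {
//		log.Warn("Sync cycle failed", "err", err)
//	}
//	log.Info("Sync progress", "block", d.Progress().CurrentBlock)
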
// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
	// Unregister the peer from the active peer set and revoke any fetch tasks
	logger := log.New("peer", id)
	logger.Trace("Unregistering sync peer")
	if err := d.peers.Unregister(id); err != nil {
		logger.Error("Failed to unregister sync peer", "err", err)
		return err
	}
	d.queue.Revoke(id)

	// If this peer was the master peer, abort sync immediately
	d.cancelLock.RLock()
	master := id == d.cancelPeer
	d.cancelLock.RUnlock()

	if master {
		d.cancel()
	}
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
	switch err {
	case nil:
	case errBusy:

	case errTimeout, errBadPeer, errStallingPeer,
		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
		errInvalidAncestor, errInvalidChain:
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
	default:
		log.Warn("Synchronisation failed, retrying", "err", err)
	}
	return err
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any
// of the checks fail, an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		log.Info("Block synchronisation started")
	}
	// Reset the queue, peer set and wake channels to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()

	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
		select {
		case <-ch:
		default:
		}
	}
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
	// Create cancel channel for aborting mid-flight and mark the master peer
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelPeer = id
	d.cancelLock.Unlock()

	defer d.Cancel() // No matter what, we can't leave the cancel channel open

	// Set the requested sync mode, unless it's forbidden
	d.mode = mode

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td)
}

// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	d.downloads.Add(1)
	defer func() {
		d.downloads.Done()
		// reset on error
		if err != nil {
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()
	if p.version < 62 {
		return errTooOld
	}

	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
	defer func(start time.Time) {
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
	}(time.Now())

	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()

	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()

	// Ensure our origin point is below any fast sync pivot point
	pivot := uint64(0)
	if d.mode == FastSync {
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
		} else {
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
				origin = pivot - 1
			}
		}
	}
	d.committed = 1
	if d.mode == FastSync && pivot != 0 {
		d.committed = 0
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
	d.queue.Prepare(origin+1, d.mode)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
	}

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	return d.spawnSync(errCancelHeaderFetch, fetchers)
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(errCancel error, fetchers []func() error) error {
	d.cancelLock.Lock()
	select {
	case <-d.cancelCh:
		d.cancelLock.Unlock()
		return errCancel
	default:
	}
	errc := make(chan error, len(fetchers))
	d.cancelWg.Add(len(fetchers))
	d.cancelLock.Unlock()
	for _, fn := range fetchers {
		fn := fn
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
	d.Cancel()
	return err
}
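
// Worked example, not part of the original source, for the pivot selection in
// syncWithPeer above: with a remote head height of 1000 and fsMinFullBlocks = 64,
// the pivot becomes 1000 - 64 = 936. Blocks below the pivot are fast-synced
// (bodies and receipts only), the pivot's state is downloaded, and blocks
// 937..1000 are fully executed. If the common ancestor were already at or beyond
// 936, the origin would be pulled back to 935 so the pivot itself is re-fetched.
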
// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
func (d *Downloader) cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()
}

// Cancel aborts all of the operations and waits for all download goroutines to
// finish before returning.
func (d *Downloader) Cancel() {
	d.cancel()
	d.cancelWg.Wait()
}

// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
	d.quitLock.Unlock()

	// Cancel any pending download requests
	d.Cancel()

	// Wait, so external dependencies aren't destroyed
	// until the download processing is done.
	d.downloads.Wait()
}

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
	p.log.Debug("Retrieving remote chain height")

	// Request the advertised remote head block and wait for the response
	head, _ := p.peer.Head()
	go p.peer.RequestHeadersByHash(head, 1, 0, false)

	ttl := d.requestTTL()
	timeout := time.After(ttl)
	for {
		select {
		case <-d.cancelCh:
			return nil, errCancelBlockFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) != 1 {
				p.log.Debug("Multiple headers for single request", "headers", len(headers))
				return nil, errBadPeer
			}
			head := headers[0]
			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
			return head, nil

		case <-timeout:
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
			return nil, errTimeout

		case <-d.bodyCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
		}
	}
}
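
// Worked example, not part of the original source, for the head-fetch request
// issued by findAncestor below: with a local ceiling of 10000 and a remote height
// of 10050, head = min(10000, 10050) = 10000 and from = 10000 - 192 = 9808, so
// count = 1 + (10000-9808)/16 = 13 headers are requested with a skip of 15, i.e.
// block numbers 9808, 9824, ..., 10000. The span deliberately reaches up to (and,
// when the local chain is ahead of the remote, past) the reported head to catch
// bad head reports.
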
// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
	// Figure out the valid ancestor range to prevent rewrite attacks
	floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()

	if d.mode == FullSync {
		ceil = d.blockchain.CurrentBlock().NumberU64()
	} else if d.mode == FastSync {
		ceil = d.blockchain.CurrentFastBlock().NumberU64()
	}
	if ceil >= MaxForkAncestry {
		floor = int64(ceil - MaxForkAncestry)
	}
	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)

	// Request the topmost blocks to short circuit binary ancestor lookup
	head := ceil
	if head > height {
		head = height
	}
	from := int64(head) - int64(MaxHeaderFetch)
	if from < 0 {
		from = 0
	}
	// Span out with 15 block gaps into the future to catch bad head reports
	limit := 2 * MaxHeaderFetch / 16
	count := 1 + int((int64(ceil)-from)/16)
	if count > limit {
		count = limit
	}
	go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}

	ttl := d.requestTTL()
	timeout := time.After(ttl)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHeaderFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) == 0 {
				p.log.Warn("Empty head header set")
				return 0, errEmptyHeaderSet
			}
			// Make sure the peer's reply conforms to the request
			for i := 0; i < len(headers); i++ {
				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
					return 0, errInvalidChain
				}
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
				// Skip any headers that underflow/overflow our requested set
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
					continue
				}
				// Otherwise check if we already know the header or not
				if (d.mode == FullSync && d.blockchain.HasBlock(headers[i].Hash(), headers[i].Number.Uint64())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash(), headers[i].Number.Uint64())) {
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()

					// If every header is known, even future ones, the peer straight out lied about its head
					if number > height && i == limit-1 {
						p.log.Warn("Lied about chain head", "reported", height, "found", number)
						return 0, errStallingPeer
					}
					break
				}
			}

		case <-timeout:
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
			return 0, errTimeout

		case <-d.bodyCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
		}
	}
	// If the head fetch already found an ancestor, return
	if hash != (common.Hash{}) {
		if int64(number) <= floor {
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
			return 0, errInvalidAncestor
		}
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	if floor > 0 {
		start = uint64(floor)
	}
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		ttl := d.requestTTL()
		timeout := time.After(ttl)

		go p.peer.RequestHeadersByNumber(check, 1, 0, false)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHeaderFetch

			case packer := <-d.headerCh:
				// Discard anything not from the origin peer
				if packer.PeerId() != p.id {
					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
					break
				}
				// Make sure the peer actually gave something valid
				headers := packer.(*headerPack).headers
				if len(headers) != 1 {
					p.log.Debug("Multiple headers for single request", "headers", len(headers))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				if (d.mode == FullSync && !d.blockchain.HasBlock(headers[0].Hash(), headers[0].Number.Uint64())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash(), headers[0].Number.Uint64())) {
					end = check
					break
				}
				header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
				if header.Number.Uint64() != check {
					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
					return 0, errBadPeer
				}
				start = check

			case <-timeout:
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
				return 0, errTimeout

			case <-d.bodyCh:
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
			}
		}
	}
	// Ensure valid ancestry and return
	if int64(start) <= floor {
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
		return 0, errInvalidAncestor
	}
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
	return start, nil
}

// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
// other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")

	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	var ttl time.Duration
	getHeaders := func(from uint64) {
		request = time.Now()

		ttl = d.requestTTL()
		timeout.Reset(ttl)

		if skeleton {
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
		} else {
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
		}
	}
	// Start pulling the header chain skeleton until all is done
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

		case packet := <-d.headerCh:
			// Make sure the active peer is giving us the skeleton headers
			if packet.PeerId() != p.id {
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
				break
			}
			headerReqTimer.UpdateSince(request)
			timeout.Stop()

			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
			// If no more headers are inbound, notify the content fetchers and return
			if packet.Items() == 0 {
				// Don't abort header fetches while the pivot is downloading
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
						return errCancelHeaderFetch
					}
				}
				// Pivot done (or not in fast sync) and no more headers, terminate the process
				p.log.Debug("No more headers available")
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
			}
			headers := packet.(*headerPack).headers

			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
				if err != nil {
					p.log.Debug("Skeleton chain invalid", "err", err)
					return errInvalidChain
				}
				headers = filled[proced:]
				from += uint64(proced)
			}
			// Insert all the new headers and fetch the next batch
			if len(headers) > 0 {
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
			}
			getHeaders(from)

		case <-timeout.C:
			if d.dropPeer == nil {
				// The dropPeer method is nil when `--copydb` is used for a local copy.
				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
				break
			}
			// Header retrieval timed out, consider the peer bad and drop
			p.log.Debug("Header request timed out", "elapsed", ttl)
			headerTimeoutMeter.Mark(1)
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
			}
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
		}
	}
}

// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
//
// Any partial results from the beginning of the skeleton are (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
	log.Debug("Filling up skeleton", "from", from)
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
		}
		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
		throttle = func() bool { return false }
		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveHeaders(p, count), false, nil
		}
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")

	log.Debug("Skeleton fill terminated", "err", err)

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
}
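
// Illustrative note, not part of the original source, on the skeleton mechanics in
// fetchHeaders/fillHeaderSkeleton above: starting at header N, the origin peer is
// asked for up to MaxSkeletonSize = 128 "joint" headers at numbers N+191, N+383,
// N+575, ... (one every MaxHeaderFetch blocks). The 192-header stretch ending at
// each joint is then filled in by whichever peers are idle, and a filled batch is
// only accepted if it links up cleanly with its skeleton joint.
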
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
	log.Debug("Downloading block bodies", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*bodyPack)
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
		}
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
	)
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

	log.Debug("Block body download terminated", "err", err)
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
	log.Debug("Downloading transaction receipts", "origin", from)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
		}
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

	log.Debug("Transaction receipt download terminated", "err", err)
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
// - errCancel:  error type to return if the fetch operation is cancelled (mostly makes logging nicer)
// - deliveryCh: channel from which to retrieve downloaded data packets (merged from all concurrent peers)
// - deliver:    processing callback to deliver data packets into type specific download queues (usually within `queue`)
// - wakeCh:     notification channel for waking the fetcher when new tasks are available (or sync completed)
// - expire:     task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
// - pending:    task callback for the number of requests still needing download (detect completion/non-completability)
// - inFlight:   task callback for the number of in-progress requests (wait for all active downloads to finish)
// - throttle:   task callback to check if the processing queue is full and activate throttling (bound memory use)
// - reserve:    task callback to reserve new download tasks to a particular peer (also signals partial completions)
// - fetchHook:  tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
// - fetch:      network callback to actually send a particular download request to a physical remote peer
// - cancel:     task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
// - capacity:   network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
// - idle:       network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
// - setIdle:    network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind:       textual label of the type being downloaded to display in log messages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {

	// Create a ticker to detect expired retrieval tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
	for {
		select {
		case <-d.cancelCh:
			return errCancel

		case packet := <-deliveryCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
					return err
				}
				// Unless a peer delivered something completely different from what was
				// requested (usually caused by a timed out request which came through in
				// the end), set it to idle. If the delivery's stale, the peer should have
				// already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
					peer.log.Trace("Requested data not delivered", "type", kind)
				case err == nil:
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
				default:
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-wakeCh:
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for fetch request timeouts and demote the responsible peers
			for pid, fails := range expire() {
				if peer := d.peers.Peer(pid); peer != nil {
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal request
					// times out, we're better off getting rid of the peer sync-wise.
					//
					// The reason the minimum threshold is 2 is that the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
					// how response times react to it, so it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
						peer.log.Trace("Data delivery timed out", "type", kind)
						setIdle(peer, 0)
					} else {
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
						if d.dropPeer == nil {
							// The dropPeer method is nil when `--copydb` is used for a local copy.
							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
						} else {
							d.dropPeer(pid)
						}
					}
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
				if !inFlight() && finished {
					log.Debug("Data fetching completed", "type", kind)
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			progressed, throttled, running := false, false, inFlight()
			idles, total := idle()

			for _, peer := range idles {
				// Short circuit if throttling activated
				if throttle() {
					throttled = true
					break
				}
				// Short circuit if there is no more available task.
				if pending() == 0 {
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, err := reserve(peer, capacity(peer))
				if err != nil {
					return err
				}
				if progress {
					progressed = true
				}
				if request == nil {
					continue
				}
				if request.From > 0 {
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
				} else {
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if fetchHook != nil {
					fetchHook(request.Headers)
				}
				if err := fetch(peer, request); err != nil {
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
				}
				running = true
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
				return errPeersUnavailable
			}
		}
	}
}

// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
			if d.mode != LightSync {
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
			}
			d.lightchain.Rollback(hashes)
			curFastBlock, curBlock := common.Big0, common.Big0
			if d.mode != LightSync {
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
			}
			log.Warn("Rolled back headers", "count", len(hashes),
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. the fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if d.mode != LightSync {
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
						return errStallingPeer
					}
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						log.Debug("Stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}

			// Update the highest block number we know if a higher one is found.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

			// Signal the content downloaders of the availability of new tasks
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
	for {
		results := d.queue.Results(true)
		if len(results) == 0 {
			return nil
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		return errInvalidChain
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
			d.queue.Close() // wake up WaitResults
		}
	}()
	// Figure out the ideal pivot block. Note that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
		default:
			after = append(after, result)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	case <-stateSync.done:
		if err := stateSync.Wait(); err != nil {
			return err
		}
	default:
	}
	// Retrieve a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting fast-sync blocks", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
fast-sync blocks", "items", len(results), 1511 "firstnum", first.Number, "firsthash", first.Hash(), 1512 "lastnumn", last.Number, "lasthash", last.Hash(), 1513 ) 1514 blocks := make([]*types.Block, len(results)) 1515 receipts := make([]types.Receipts, len(results)) 1516 for i, result := range results { 1517 blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) 1518 receipts[i] = result.Receipts 1519 } 1520 if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil { 1521 log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) 1522 return errInvalidChain 1523 } 1524 return nil 1525} 1526 1527func (d *Downloader) commitPivotBlock(result *fetchResult) error { 1528 block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) 1529 log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash()) 1530 if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil { 1531 return err 1532 } 1533 if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil { 1534 return err 1535 } 1536 atomic.StoreInt32(&d.committed, 1) 1537 return nil 1538} 1539 1540// DeliverHeaders injects a new batch of block headers received from a remote 1541// node into the download schedule. 1542func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { 1543 return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) 1544} 1545 1546// DeliverBodies injects a new batch of block bodies received from a remote node. 1547func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { 1548 return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) 1549} 1550 1551// DeliverReceipts injects a new batch of receipts received from a remote node. 1552func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) { 1553 return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter) 1554} 1555 1556// DeliverNodeData injects a new batch of node state data received from a remote node. 1557func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) { 1558 return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter) 1559} 1560 1561// deliver injects a new batch of data received from a remote node. 1562func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) { 1563 // Update the delivery metrics for both good and failed deliveries 1564 inMeter.Mark(int64(packet.Items())) 1565 defer func() { 1566 if err != nil { 1567 dropMeter.Mark(int64(packet.Items())) 1568 } 1569 }() 1570 // Deliver or abort if the sync is canceled while queuing 1571 d.cancelLock.RLock() 1572 cancel := d.cancelCh 1573 d.cancelLock.RUnlock() 1574 if cancel == nil { 1575 return errNoSyncActive 1576 } 1577 select { 1578 case destCh <- packet: 1579 return nil 1580 case <-cancel: 1581 return errNoSyncActive 1582 } 1583} 1584 1585// qosTuner is the quality of service tuning loop that occasionally gathers the 1586// peer latency statistics and updates the estimated request round trip time. 
// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previous target RTT
		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in our QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
// be adapted to, but smaller ones are preferred (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under.
func (d *Downloader) requestTTL() time.Duration {
	var (
		rtt  = time.Duration(atomic.LoadUint64(&d.rttEstimate))
		conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
	)
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}
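// Editorial worked example of the TTL formula above (hypothetical numbers, not
// part of the upstream code): with an estimated RTT of 2s and a confidence of
// 0.5, the allowance is ttlScaling * rtt / conf = 3 * 2s / 0.5 = 12s, which is
// still below the one minute ttlLimit cap.
func exampleRequestTTL() time.Duration {
	rtt, conf := 2*time.Second, 0.5
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl // 12s under the assumptions above
}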