1// Copyright (c) 2013-2017 The btcsuite developers 2// Use of this source code is governed by an ISC 3// license that can be found in the LICENSE file. 4 5package netsync 6 7import ( 8 "container/list" 9 "math/rand" 10 "net" 11 "sync" 12 "sync/atomic" 13 "time" 14 15 "github.com/btcsuite/btcd/blockchain" 16 "github.com/btcsuite/btcd/chaincfg" 17 "github.com/btcsuite/btcd/chaincfg/chainhash" 18 "github.com/btcsuite/btcd/database" 19 "github.com/btcsuite/btcd/mempool" 20 peerpkg "github.com/btcsuite/btcd/peer" 21 "github.com/btcsuite/btcd/wire" 22 "github.com/btcsuite/btcutil" 23) 24 25const ( 26 // minInFlightBlocks is the minimum number of blocks that should be 27 // in the request queue for headers-first mode before requesting 28 // more. 29 minInFlightBlocks = 10 30 31 // maxRejectedTxns is the maximum number of rejected transactions 32 // hashes to store in memory. 33 maxRejectedTxns = 1000 34 35 // maxRequestedBlocks is the maximum number of requested block 36 // hashes to store in memory. 37 maxRequestedBlocks = wire.MaxInvPerMsg 38 39 // maxRequestedTxns is the maximum number of requested transactions 40 // hashes to store in memory. 41 maxRequestedTxns = wire.MaxInvPerMsg 42 43 // maxStallDuration is the time after which we will disconnect our 44 // current sync peer if we haven't made progress. 45 maxStallDuration = 3 * time.Minute 46 47 // stallSampleInterval the interval at which we will check to see if our 48 // sync has stalled. 49 stallSampleInterval = 30 * time.Second 50) 51 52// zeroHash is the zero value hash (all zeros). It is defined as a convenience. 53var zeroHash chainhash.Hash 54 55// newPeerMsg signifies a newly connected peer to the block handler. 56type newPeerMsg struct { 57 peer *peerpkg.Peer 58} 59 60// blockMsg packages a bitcoin block message and the peer it came from together 61// so the block handler has access to that information. 62type blockMsg struct { 63 block *btcutil.Block 64 peer *peerpkg.Peer 65 reply chan struct{} 66} 67 68// invMsg packages a bitcoin inv message and the peer it came from together 69// so the block handler has access to that information. 70type invMsg struct { 71 inv *wire.MsgInv 72 peer *peerpkg.Peer 73} 74 75// headersMsg packages a bitcoin headers message and the peer it came from 76// together so the block handler has access to that information. 77type headersMsg struct { 78 headers *wire.MsgHeaders 79 peer *peerpkg.Peer 80} 81 82// donePeerMsg signifies a newly disconnected peer to the block handler. 83type donePeerMsg struct { 84 peer *peerpkg.Peer 85} 86 87// txMsg packages a bitcoin tx message and the peer it came from together 88// so the block handler has access to that information. 89type txMsg struct { 90 tx *btcutil.Tx 91 peer *peerpkg.Peer 92 reply chan struct{} 93} 94 95// getSyncPeerMsg is a message type to be sent across the message channel for 96// retrieving the current sync peer. 97type getSyncPeerMsg struct { 98 reply chan int32 99} 100 101// processBlockResponse is a response sent to the reply channel of a 102// processBlockMsg. 103type processBlockResponse struct { 104 isOrphan bool 105 err error 106} 107 108// processBlockMsg is a message type to be sent across the message channel 109// for requested a block is processed. Note this call differs from blockMsg 110// above in that blockMsg is intended for blocks that came from peers and have 111// extra handling whereas this message essentially is just a concurrent safe 112// way to call ProcessBlock on the internal block chain instance. 113type processBlockMsg struct { 114 block *btcutil.Block 115 flags blockchain.BehaviorFlags 116 reply chan processBlockResponse 117} 118 119// isCurrentMsg is a message type to be sent across the message channel for 120// requesting whether or not the sync manager believes it is synced with the 121// currently connected peers. 122type isCurrentMsg struct { 123 reply chan bool 124} 125 126// pauseMsg is a message type to be sent across the message channel for 127// pausing the sync manager. This effectively provides the caller with 128// exclusive access over the manager until a receive is performed on the 129// unpause channel. 130type pauseMsg struct { 131 unpause <-chan struct{} 132} 133 134// headerNode is used as a node in a list of headers that are linked together 135// between checkpoints. 136type headerNode struct { 137 height int32 138 hash *chainhash.Hash 139} 140 141// peerSyncState stores additional information that the SyncManager tracks 142// about a peer. 143type peerSyncState struct { 144 syncCandidate bool 145 requestQueue []*wire.InvVect 146 requestedTxns map[chainhash.Hash]struct{} 147 requestedBlocks map[chainhash.Hash]struct{} 148} 149 150// SyncManager is used to communicate block related messages with peers. The 151// SyncManager is started as by executing Start() in a goroutine. Once started, 152// it selects peers to sync from and starts the initial block download. Once the 153// chain is in sync, the SyncManager handles incoming block and header 154// notifications and relays announcements of new blocks to peers. 155type SyncManager struct { 156 peerNotifier PeerNotifier 157 started int32 158 shutdown int32 159 chain *blockchain.BlockChain 160 txMemPool *mempool.TxPool 161 chainParams *chaincfg.Params 162 progressLogger *blockProgressLogger 163 msgChan chan interface{} 164 wg sync.WaitGroup 165 quit chan struct{} 166 167 // These fields should only be accessed from the blockHandler thread 168 rejectedTxns map[chainhash.Hash]struct{} 169 requestedTxns map[chainhash.Hash]struct{} 170 requestedBlocks map[chainhash.Hash]struct{} 171 syncPeer *peerpkg.Peer 172 peerStates map[*peerpkg.Peer]*peerSyncState 173 lastProgressTime time.Time 174 175 // The following fields are used for headers-first mode. 176 headersFirstMode bool 177 headerList *list.List 178 startHeader *list.Element 179 nextCheckpoint *chaincfg.Checkpoint 180 181 // An optional fee estimator. 182 feeEstimator *mempool.FeeEstimator 183} 184 185// resetHeaderState sets the headers-first mode state to values appropriate for 186// syncing from a new peer. 187func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) { 188 sm.headersFirstMode = false 189 sm.headerList.Init() 190 sm.startHeader = nil 191 192 // When there is a next checkpoint, add an entry for the latest known 193 // block into the header pool. This allows the next downloaded header 194 // to prove it links to the chain properly. 195 if sm.nextCheckpoint != nil { 196 node := headerNode{height: newestHeight, hash: newestHash} 197 sm.headerList.PushBack(&node) 198 } 199} 200 201// findNextHeaderCheckpoint returns the next checkpoint after the passed height. 202// It returns nil when there is not one either because the height is already 203// later than the final checkpoint or some other reason such as disabled 204// checkpoints. 205func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint { 206 checkpoints := sm.chain.Checkpoints() 207 if len(checkpoints) == 0 { 208 return nil 209 } 210 211 // There is no next checkpoint if the height is already after the final 212 // checkpoint. 213 finalCheckpoint := &checkpoints[len(checkpoints)-1] 214 if height >= finalCheckpoint.Height { 215 return nil 216 } 217 218 // Find the next checkpoint. 219 nextCheckpoint := finalCheckpoint 220 for i := len(checkpoints) - 2; i >= 0; i-- { 221 if height >= checkpoints[i].Height { 222 break 223 } 224 nextCheckpoint = &checkpoints[i] 225 } 226 return nextCheckpoint 227} 228 229// startSync will choose the best peer among the available candidate peers to 230// download/sync the blockchain from. When syncing is already running, it 231// simply returns. It also examines the candidates for any which are no longer 232// candidates and removes them as needed. 233func (sm *SyncManager) startSync() { 234 // Return now if we're already syncing. 235 if sm.syncPeer != nil { 236 return 237 } 238 239 // Once the segwit soft-fork package has activated, we only 240 // want to sync from peers which are witness enabled to ensure 241 // that we fully validate all blockchain data. 242 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) 243 if err != nil { 244 log.Errorf("Unable to query for segwit soft-fork state: %v", err) 245 return 246 } 247 248 best := sm.chain.BestSnapshot() 249 var higherPeers, equalPeers []*peerpkg.Peer 250 for peer, state := range sm.peerStates { 251 if !state.syncCandidate { 252 continue 253 } 254 255 if segwitActive && !peer.IsWitnessEnabled() { 256 log.Debugf("peer %v not witness enabled, skipping", peer) 257 continue 258 } 259 260 // Remove sync candidate peers that are no longer candidates due 261 // to passing their latest known block. NOTE: The < is 262 // intentional as opposed to <=. While technically the peer 263 // doesn't have a later block when it's equal, it will likely 264 // have one soon so it is a reasonable choice. It also allows 265 // the case where both are at 0 such as during regression test. 266 if peer.LastBlock() < best.Height { 267 state.syncCandidate = false 268 continue 269 } 270 271 // If the peer is at the same height as us, we'll add it a set 272 // of backup peers in case we do not find one with a higher 273 // height. If we are synced up with all of our peers, all of 274 // them will be in this set. 275 if peer.LastBlock() == best.Height { 276 equalPeers = append(equalPeers, peer) 277 continue 278 } 279 280 // This peer has a height greater than our own, we'll consider 281 // it in the set of better peers from which we'll randomly 282 // select. 283 higherPeers = append(higherPeers, peer) 284 } 285 286 // Pick randomly from the set of peers greater than our block height, 287 // falling back to a random peer of the same height if none are greater. 288 // 289 // TODO(conner): Use a better algorithm to ranking peers based on 290 // observed metrics and/or sync in parallel. 291 var bestPeer *peerpkg.Peer 292 switch { 293 case len(higherPeers) > 0: 294 bestPeer = higherPeers[rand.Intn(len(higherPeers))] 295 296 case len(equalPeers) > 0: 297 bestPeer = equalPeers[rand.Intn(len(equalPeers))] 298 } 299 300 // Start syncing from the best peer if one was selected. 301 if bestPeer != nil { 302 // Clear the requestedBlocks if the sync peer changes, otherwise 303 // we may ignore blocks we need that the last sync peer failed 304 // to send. 305 sm.requestedBlocks = make(map[chainhash.Hash]struct{}) 306 307 locator, err := sm.chain.LatestBlockLocator() 308 if err != nil { 309 log.Errorf("Failed to get block locator for the "+ 310 "latest block: %v", err) 311 return 312 } 313 314 log.Infof("Syncing to block height %d from peer %v", 315 bestPeer.LastBlock(), bestPeer.Addr()) 316 317 // When the current height is less than a known checkpoint we 318 // can use block headers to learn about which blocks comprise 319 // the chain up to the checkpoint and perform less validation 320 // for them. This is possible since each header contains the 321 // hash of the previous header and a merkle root. Therefore if 322 // we validate all of the received headers link together 323 // properly and the checkpoint hashes match, we can be sure the 324 // hashes for the blocks in between are accurate. Further, once 325 // the full blocks are downloaded, the merkle root is computed 326 // and compared against the value in the header which proves the 327 // full block hasn't been tampered with. 328 // 329 // Once we have passed the final checkpoint, or checkpoints are 330 // disabled, use standard inv messages learn about the blocks 331 // and fully validate them. Finally, regression test mode does 332 // not support the headers-first approach so do normal block 333 // downloads when in regression test mode. 334 if sm.nextCheckpoint != nil && 335 best.Height < sm.nextCheckpoint.Height && 336 sm.chainParams != &chaincfg.RegressionNetParams { 337 338 bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 339 sm.headersFirstMode = true 340 log.Infof("Downloading headers for blocks %d to "+ 341 "%d from peer %s", best.Height+1, 342 sm.nextCheckpoint.Height, bestPeer.Addr()) 343 } else { 344 bestPeer.PushGetBlocksMsg(locator, &zeroHash) 345 } 346 sm.syncPeer = bestPeer 347 348 // Reset the last progress time now that we have a non-nil 349 // syncPeer to avoid instantly detecting it as stalled in the 350 // event the progress time hasn't been updated recently. 351 sm.lastProgressTime = time.Now() 352 } else { 353 log.Warnf("No sync peer candidates available") 354 } 355} 356 357// isSyncCandidate returns whether or not the peer is a candidate to consider 358// syncing from. 359func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool { 360 // Typically a peer is not a candidate for sync if it's not a full node, 361 // however regression test is special in that the regression tool is 362 // not a full node and still needs to be considered a sync candidate. 363 if sm.chainParams == &chaincfg.RegressionNetParams { 364 // The peer is not a candidate if it's not coming from localhost 365 // or the hostname can't be determined for some reason. 366 host, _, err := net.SplitHostPort(peer.Addr()) 367 if err != nil { 368 return false 369 } 370 371 if host != "127.0.0.1" && host != "localhost" { 372 return false 373 } 374 } else { 375 // The peer is not a candidate for sync if it's not a full 376 // node. Additionally, if the segwit soft-fork package has 377 // activated, then the peer must also be upgraded. 378 segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit) 379 if err != nil { 380 log.Errorf("Unable to query for segwit "+ 381 "soft-fork state: %v", err) 382 } 383 nodeServices := peer.Services() 384 if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork || 385 (segwitActive && !peer.IsWitnessEnabled()) { 386 return false 387 } 388 } 389 390 // Candidate if all checks passed. 391 return true 392} 393 394// handleNewPeerMsg deals with new peers that have signalled they may 395// be considered as a sync peer (they have already successfully negotiated). It 396// also starts syncing if needed. It is invoked from the syncHandler goroutine. 397func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) { 398 // Ignore if in the process of shutting down. 399 if atomic.LoadInt32(&sm.shutdown) != 0 { 400 return 401 } 402 403 log.Infof("New valid peer %s (%s)", peer, peer.UserAgent()) 404 405 // Initialize the peer state 406 isSyncCandidate := sm.isSyncCandidate(peer) 407 sm.peerStates[peer] = &peerSyncState{ 408 syncCandidate: isSyncCandidate, 409 requestedTxns: make(map[chainhash.Hash]struct{}), 410 requestedBlocks: make(map[chainhash.Hash]struct{}), 411 } 412 413 // Start syncing by choosing the best candidate if needed. 414 if isSyncCandidate && sm.syncPeer == nil { 415 sm.startSync() 416 } 417} 418 419// handleStallSample will switch to a new sync peer if the current one has 420// stalled. This is detected when by comparing the last progress timestamp with 421// the current time, and disconnecting the peer if we stalled before reaching 422// their highest advertised block. 423func (sm *SyncManager) handleStallSample() { 424 if atomic.LoadInt32(&sm.shutdown) != 0 { 425 return 426 } 427 428 // If we don't have an active sync peer, exit early. 429 if sm.syncPeer == nil { 430 return 431 } 432 433 // If the stall timeout has not elapsed, exit early. 434 if time.Since(sm.lastProgressTime) <= maxStallDuration { 435 return 436 } 437 438 // Check to see that the peer's sync state exists. 439 state, exists := sm.peerStates[sm.syncPeer] 440 if !exists { 441 return 442 } 443 444 sm.clearRequestedState(state) 445 446 disconnectSyncPeer := sm.shouldDCStalledSyncPeer() 447 sm.updateSyncPeer(disconnectSyncPeer) 448} 449 450// shouldDCStalledSyncPeer determines whether or not we should disconnect a 451// stalled sync peer. If the peer has stalled and its reported height is greater 452// than our own best height, we will disconnect it. Otherwise, we will keep the 453// peer connected in case we are already at tip. 454func (sm *SyncManager) shouldDCStalledSyncPeer() bool { 455 lastBlock := sm.syncPeer.LastBlock() 456 startHeight := sm.syncPeer.StartingHeight() 457 458 var peerHeight int32 459 if lastBlock > startHeight { 460 peerHeight = lastBlock 461 } else { 462 peerHeight = startHeight 463 } 464 465 // If we've stalled out yet the sync peer reports having more blocks for 466 // us we will disconnect them. This allows us at tip to not disconnect 467 // peers when we are equal or they temporarily lag behind us. 468 best := sm.chain.BestSnapshot() 469 return peerHeight > best.Height 470} 471 472// handleDonePeerMsg deals with peers that have signalled they are done. It 473// removes the peer as a candidate for syncing and in the case where it was 474// the current sync peer, attempts to select a new best peer to sync from. It 475// is invoked from the syncHandler goroutine. 476func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) { 477 state, exists := sm.peerStates[peer] 478 if !exists { 479 log.Warnf("Received done peer message for unknown peer %s", peer) 480 return 481 } 482 483 // Remove the peer from the list of candidate peers. 484 delete(sm.peerStates, peer) 485 486 log.Infof("Lost peer %s", peer) 487 488 sm.clearRequestedState(state) 489 490 if peer == sm.syncPeer { 491 // Update the sync peer. The server has already disconnected the 492 // peer before signaling to the sync manager. 493 sm.updateSyncPeer(false) 494 } 495} 496 497// clearRequestedState wipes all expected transactions and blocks from the sync 498// manager's requested maps that were requested under a peer's sync state, This 499// allows them to be rerequested by a subsequent sync peer. 500func (sm *SyncManager) clearRequestedState(state *peerSyncState) { 501 // Remove requested transactions from the global map so that they will 502 // be fetched from elsewhere next time we get an inv. 503 for txHash := range state.requestedTxns { 504 delete(sm.requestedTxns, txHash) 505 } 506 507 // Remove requested blocks from the global map so that they will be 508 // fetched from elsewhere next time we get an inv. 509 // TODO: we could possibly here check which peers have these blocks 510 // and request them now to speed things up a little. 511 for blockHash := range state.requestedBlocks { 512 delete(sm.requestedBlocks, blockHash) 513 } 514} 515 516// updateSyncPeer choose a new sync peer to replace the current one. If 517// dcSyncPeer is true, this method will also disconnect the current sync peer. 518// If we are in header first mode, any header state related to prefetching is 519// also reset in preparation for the next sync peer. 520func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) { 521 log.Debugf("Updating sync peer, no progress for: %v", 522 time.Since(sm.lastProgressTime)) 523 524 // First, disconnect the current sync peer if requested. 525 if dcSyncPeer { 526 sm.syncPeer.Disconnect() 527 } 528 529 // Reset any header state before we choose our next active sync peer. 530 if sm.headersFirstMode { 531 best := sm.chain.BestSnapshot() 532 sm.resetHeaderState(&best.Hash, best.Height) 533 } 534 535 sm.syncPeer = nil 536 sm.startSync() 537} 538 539// handleTxMsg handles transaction messages from all peers. 540func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { 541 peer := tmsg.peer 542 state, exists := sm.peerStates[peer] 543 if !exists { 544 log.Warnf("Received tx message from unknown peer %s", peer) 545 return 546 } 547 548 // NOTE: BitcoinJ, and possibly other wallets, don't follow the spec of 549 // sending an inventory message and allowing the remote peer to decide 550 // whether or not they want to request the transaction via a getdata 551 // message. Unfortunately, the reference implementation permits 552 // unrequested data, so it has allowed wallets that don't follow the 553 // spec to proliferate. While this is not ideal, there is no check here 554 // to disconnect peers for sending unsolicited transactions to provide 555 // interoperability. 556 txHash := tmsg.tx.Hash() 557 558 // Ignore transactions that we have already rejected. Do not 559 // send a reject message here because if the transaction was already 560 // rejected, the transaction was unsolicited. 561 if _, exists = sm.rejectedTxns[*txHash]; exists { 562 log.Debugf("Ignoring unsolicited previously rejected "+ 563 "transaction %v from %s", txHash, peer) 564 return 565 } 566 567 // Process the transaction to include validation, insertion in the 568 // memory pool, orphan handling, etc. 569 acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, 570 true, true, mempool.Tag(peer.ID())) 571 572 // Remove transaction from request maps. Either the mempool/chain 573 // already knows about it and as such we shouldn't have any more 574 // instances of trying to fetch it, or we failed to insert and thus 575 // we'll retry next time we get an inv. 576 delete(state.requestedTxns, *txHash) 577 delete(sm.requestedTxns, *txHash) 578 579 if err != nil { 580 // Do not request this transaction again until a new block 581 // has been processed. 582 sm.rejectedTxns[*txHash] = struct{}{} 583 sm.limitMap(sm.rejectedTxns, maxRejectedTxns) 584 585 // When the error is a rule error, it means the transaction was 586 // simply rejected as opposed to something actually going wrong, 587 // so log it as such. Otherwise, something really did go wrong, 588 // so log it as an actual error. 589 if _, ok := err.(mempool.RuleError); ok { 590 log.Debugf("Rejected transaction %v from %s: %v", 591 txHash, peer, err) 592 } else { 593 log.Errorf("Failed to process transaction %v: %v", 594 txHash, err) 595 } 596 597 // Convert the error into an appropriate reject message and 598 // send it. 599 code, reason := mempool.ErrToRejectErr(err) 600 peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false) 601 return 602 } 603 604 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) 605} 606 607// current returns true if we believe we are synced with our peers, false if we 608// still have blocks to check 609func (sm *SyncManager) current() bool { 610 if !sm.chain.IsCurrent() { 611 return false 612 } 613 614 // if blockChain thinks we are current and we have no syncPeer it 615 // is probably right. 616 if sm.syncPeer == nil { 617 return true 618 } 619 620 // No matter what chain thinks, if we are below the block we are syncing 621 // to we are not current. 622 if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() { 623 return false 624 } 625 return true 626} 627 628// handleBlockMsg handles block messages from all peers. 629func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) { 630 peer := bmsg.peer 631 state, exists := sm.peerStates[peer] 632 if !exists { 633 log.Warnf("Received block message from unknown peer %s", peer) 634 return 635 } 636 637 // If we didn't ask for this block then the peer is misbehaving. 638 blockHash := bmsg.block.Hash() 639 if _, exists = state.requestedBlocks[*blockHash]; !exists { 640 // The regression test intentionally sends some blocks twice 641 // to test duplicate block insertion fails. Don't disconnect 642 // the peer or ignore the block when we're in regression test 643 // mode in this case so the chain code is actually fed the 644 // duplicate blocks. 645 if sm.chainParams != &chaincfg.RegressionNetParams { 646 log.Warnf("Got unrequested block %v from %s -- "+ 647 "disconnecting", blockHash, peer.Addr()) 648 peer.Disconnect() 649 return 650 } 651 } 652 653 // When in headers-first mode, if the block matches the hash of the 654 // first header in the list of headers that are being fetched, it's 655 // eligible for less validation since the headers have already been 656 // verified to link together and are valid up to the next checkpoint. 657 // Also, remove the list entry for all blocks except the checkpoint 658 // since it is needed to verify the next round of headers links 659 // properly. 660 isCheckpointBlock := false 661 behaviorFlags := blockchain.BFNone 662 if sm.headersFirstMode { 663 firstNodeEl := sm.headerList.Front() 664 if firstNodeEl != nil { 665 firstNode := firstNodeEl.Value.(*headerNode) 666 if blockHash.IsEqual(firstNode.hash) { 667 behaviorFlags |= blockchain.BFFastAdd 668 if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) { 669 isCheckpointBlock = true 670 } else { 671 sm.headerList.Remove(firstNodeEl) 672 } 673 } 674 } 675 } 676 677 // Remove block from request maps. Either chain will know about it and 678 // so we shouldn't have any more instances of trying to fetch it, or we 679 // will fail the insert and thus we'll retry next time we get an inv. 680 delete(state.requestedBlocks, *blockHash) 681 delete(sm.requestedBlocks, *blockHash) 682 683 // Process the block to include validation, best chain selection, orphan 684 // handling, etc. 685 _, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags) 686 if err != nil { 687 // When the error is a rule error, it means the block was simply 688 // rejected as opposed to something actually going wrong, so log 689 // it as such. Otherwise, something really did go wrong, so log 690 // it as an actual error. 691 if _, ok := err.(blockchain.RuleError); ok { 692 log.Infof("Rejected block %v from %s: %v", blockHash, 693 peer, err) 694 } else { 695 log.Errorf("Failed to process block %v: %v", 696 blockHash, err) 697 } 698 if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode == 699 database.ErrCorruption { 700 panic(dbErr) 701 } 702 703 // Convert the error into an appropriate reject message and 704 // send it. 705 code, reason := mempool.ErrToRejectErr(err) 706 peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false) 707 return 708 } 709 710 // Meta-data about the new block this peer is reporting. We use this 711 // below to update this peer's latest block height and the heights of 712 // other peers based on their last announced block hash. This allows us 713 // to dynamically update the block heights of peers, avoiding stale 714 // heights when looking for a new sync peer. Upon acceptance of a block 715 // or recognition of an orphan, we also use this information to update 716 // the block heights over other peers who's invs may have been ignored 717 // if we are actively syncing while the chain is not yet current or 718 // who may have lost the lock announcement race. 719 var heightUpdate int32 720 var blkHashUpdate *chainhash.Hash 721 722 // Request the parents for the orphan block from the peer that sent it. 723 if isOrphan { 724 // We've just received an orphan block from a peer. In order 725 // to update the height of the peer, we try to extract the 726 // block height from the scriptSig of the coinbase transaction. 727 // Extraction is only attempted if the block's version is 728 // high enough (ver 2+). 729 header := &bmsg.block.MsgBlock().Header 730 if blockchain.ShouldHaveSerializedBlockHeight(header) { 731 coinbaseTx := bmsg.block.Transactions()[0] 732 cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx) 733 if err != nil { 734 log.Warnf("Unable to extract height from "+ 735 "coinbase tx: %v", err) 736 } else { 737 log.Debugf("Extracted height of %v from "+ 738 "orphan block", cbHeight) 739 heightUpdate = cbHeight 740 blkHashUpdate = blockHash 741 } 742 } 743 744 orphanRoot := sm.chain.GetOrphanRoot(blockHash) 745 locator, err := sm.chain.LatestBlockLocator() 746 if err != nil { 747 log.Warnf("Failed to get block locator for the "+ 748 "latest block: %v", err) 749 } else { 750 peer.PushGetBlocksMsg(locator, orphanRoot) 751 } 752 } else { 753 if peer == sm.syncPeer { 754 sm.lastProgressTime = time.Now() 755 } 756 757 // When the block is not an orphan, log information about it and 758 // update the chain state. 759 sm.progressLogger.LogBlockHeight(bmsg.block) 760 761 // Update this peer's latest block height, for future 762 // potential sync node candidacy. 763 best := sm.chain.BestSnapshot() 764 heightUpdate = best.Height 765 blkHashUpdate = &best.Hash 766 767 // Clear the rejected transactions. 768 sm.rejectedTxns = make(map[chainhash.Hash]struct{}) 769 } 770 771 // Update the block height for this peer. But only send a message to 772 // the server for updating peer heights if this is an orphan or our 773 // chain is "current". This avoids sending a spammy amount of messages 774 // if we're syncing the chain from scratch. 775 if blkHashUpdate != nil && heightUpdate != 0 { 776 peer.UpdateLastBlockHeight(heightUpdate) 777 if isOrphan || sm.current() { 778 go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate, 779 peer) 780 } 781 } 782 783 // Nothing more to do if we aren't in headers-first mode. 784 if !sm.headersFirstMode { 785 return 786 } 787 788 // This is headers-first mode, so if the block is not a checkpoint 789 // request more blocks using the header list when the request queue is 790 // getting short. 791 if !isCheckpointBlock { 792 if sm.startHeader != nil && 793 len(state.requestedBlocks) < minInFlightBlocks { 794 sm.fetchHeaderBlocks() 795 } 796 return 797 } 798 799 // This is headers-first mode and the block is a checkpoint. When 800 // there is a next checkpoint, get the next round of headers by asking 801 // for headers starting from the block after this one up to the next 802 // checkpoint. 803 prevHeight := sm.nextCheckpoint.Height 804 prevHash := sm.nextCheckpoint.Hash 805 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight) 806 if sm.nextCheckpoint != nil { 807 locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash}) 808 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 809 if err != nil { 810 log.Warnf("Failed to send getheaders message to "+ 811 "peer %s: %v", peer.Addr(), err) 812 return 813 } 814 log.Infof("Downloading headers for blocks %d to %d from "+ 815 "peer %s", prevHeight+1, sm.nextCheckpoint.Height, 816 sm.syncPeer.Addr()) 817 return 818 } 819 820 // This is headers-first mode, the block is a checkpoint, and there are 821 // no more checkpoints, so switch to normal mode by requesting blocks 822 // from the block after this one up to the end of the chain (zero hash). 823 sm.headersFirstMode = false 824 sm.headerList.Init() 825 log.Infof("Reached the final checkpoint -- switching to normal mode") 826 locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash}) 827 err = peer.PushGetBlocksMsg(locator, &zeroHash) 828 if err != nil { 829 log.Warnf("Failed to send getblocks message to peer %s: %v", 830 peer.Addr(), err) 831 return 832 } 833} 834 835// fetchHeaderBlocks creates and sends a request to the syncPeer for the next 836// list of blocks to be downloaded based on the current list of headers. 837func (sm *SyncManager) fetchHeaderBlocks() { 838 // Nothing to do if there is no start header. 839 if sm.startHeader == nil { 840 log.Warnf("fetchHeaderBlocks called with no start header") 841 return 842 } 843 844 // Build up a getdata request for the list of blocks the headers 845 // describe. The size hint will be limited to wire.MaxInvPerMsg by 846 // the function, so no need to double check it here. 847 gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len())) 848 numRequested := 0 849 for e := sm.startHeader; e != nil; e = e.Next() { 850 node, ok := e.Value.(*headerNode) 851 if !ok { 852 log.Warn("Header list node type is not a headerNode") 853 continue 854 } 855 856 iv := wire.NewInvVect(wire.InvTypeBlock, node.hash) 857 haveInv, err := sm.haveInventory(iv) 858 if err != nil { 859 log.Warnf("Unexpected failure when checking for "+ 860 "existing inventory during header block "+ 861 "fetch: %v", err) 862 } 863 if !haveInv { 864 syncPeerState := sm.peerStates[sm.syncPeer] 865 866 sm.requestedBlocks[*node.hash] = struct{}{} 867 syncPeerState.requestedBlocks[*node.hash] = struct{}{} 868 869 // If we're fetching from a witness enabled peer 870 // post-fork, then ensure that we receive all the 871 // witness data in the blocks. 872 if sm.syncPeer.IsWitnessEnabled() { 873 iv.Type = wire.InvTypeWitnessBlock 874 } 875 876 gdmsg.AddInvVect(iv) 877 numRequested++ 878 } 879 sm.startHeader = e.Next() 880 if numRequested >= wire.MaxInvPerMsg { 881 break 882 } 883 } 884 if len(gdmsg.InvList) > 0 { 885 sm.syncPeer.QueueMessage(gdmsg, nil) 886 } 887} 888 889// handleHeadersMsg handles block header messages from all peers. Headers are 890// requested when performing a headers-first sync. 891func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { 892 peer := hmsg.peer 893 _, exists := sm.peerStates[peer] 894 if !exists { 895 log.Warnf("Received headers message from unknown peer %s", peer) 896 return 897 } 898 899 // The remote peer is misbehaving if we didn't request headers. 900 msg := hmsg.headers 901 numHeaders := len(msg.Headers) 902 if !sm.headersFirstMode { 903 log.Warnf("Got %d unrequested headers from %s -- "+ 904 "disconnecting", numHeaders, peer.Addr()) 905 peer.Disconnect() 906 return 907 } 908 909 // Nothing to do for an empty headers message. 910 if numHeaders == 0 { 911 return 912 } 913 914 // Process all of the received headers ensuring each one connects to the 915 // previous and that checkpoints match. 916 receivedCheckpoint := false 917 var finalHash *chainhash.Hash 918 for _, blockHeader := range msg.Headers { 919 blockHash := blockHeader.BlockHash() 920 finalHash = &blockHash 921 922 // Ensure there is a previous header to compare against. 923 prevNodeEl := sm.headerList.Back() 924 if prevNodeEl == nil { 925 log.Warnf("Header list does not contain a previous" + 926 "element as expected -- disconnecting peer") 927 peer.Disconnect() 928 return 929 } 930 931 // Ensure the header properly connects to the previous one and 932 // add it to the list of headers. 933 node := headerNode{hash: &blockHash} 934 prevNode := prevNodeEl.Value.(*headerNode) 935 if prevNode.hash.IsEqual(&blockHeader.PrevBlock) { 936 node.height = prevNode.height + 1 937 e := sm.headerList.PushBack(&node) 938 if sm.startHeader == nil { 939 sm.startHeader = e 940 } 941 } else { 942 log.Warnf("Received block header that does not "+ 943 "properly connect to the chain from peer %s "+ 944 "-- disconnecting", peer.Addr()) 945 peer.Disconnect() 946 return 947 } 948 949 // Verify the header at the next checkpoint height matches. 950 if node.height == sm.nextCheckpoint.Height { 951 if node.hash.IsEqual(sm.nextCheckpoint.Hash) { 952 receivedCheckpoint = true 953 log.Infof("Verified downloaded block "+ 954 "header against checkpoint at height "+ 955 "%d/hash %s", node.height, node.hash) 956 } else { 957 log.Warnf("Block header at height %d/hash "+ 958 "%s from peer %s does NOT match "+ 959 "expected checkpoint hash of %s -- "+ 960 "disconnecting", node.height, 961 node.hash, peer.Addr(), 962 sm.nextCheckpoint.Hash) 963 peer.Disconnect() 964 return 965 } 966 break 967 } 968 } 969 970 // When this header is a checkpoint, switch to fetching the blocks for 971 // all of the headers since the last checkpoint. 972 if receivedCheckpoint { 973 // Since the first entry of the list is always the final block 974 // that is already in the database and is only used to ensure 975 // the next header links properly, it must be removed before 976 // fetching the blocks. 977 sm.headerList.Remove(sm.headerList.Front()) 978 log.Infof("Received %v block headers: Fetching blocks", 979 sm.headerList.Len()) 980 sm.progressLogger.SetLastLogTime(time.Now()) 981 sm.fetchHeaderBlocks() 982 return 983 } 984 985 // This header is not a checkpoint, so request the next batch of 986 // headers starting from the latest known header and ending with the 987 // next checkpoint. 988 locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash}) 989 err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash) 990 if err != nil { 991 log.Warnf("Failed to send getheaders message to "+ 992 "peer %s: %v", peer.Addr(), err) 993 return 994 } 995} 996 997// haveInventory returns whether or not the inventory represented by the passed 998// inventory vector is known. This includes checking all of the various places 999// inventory can be when it is in different states such as blocks that are part 1000// of the main chain, on a side chain, in the orphan pool, and transactions that 1001// are in the memory pool (either the main pool or orphan pool). 1002func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) { 1003 switch invVect.Type { 1004 case wire.InvTypeWitnessBlock: 1005 fallthrough 1006 case wire.InvTypeBlock: 1007 // Ask chain if the block is known to it in any form (main 1008 // chain, side chain, or orphan). 1009 return sm.chain.HaveBlock(&invVect.Hash) 1010 1011 case wire.InvTypeWitnessTx: 1012 fallthrough 1013 case wire.InvTypeTx: 1014 // Ask the transaction memory pool if the transaction is known 1015 // to it in any form (main pool or orphan). 1016 if sm.txMemPool.HaveTransaction(&invVect.Hash) { 1017 return true, nil 1018 } 1019 1020 // Check if the transaction exists from the point of view of the 1021 // end of the main chain. Note that this is only a best effort 1022 // since it is expensive to check existence of every output and 1023 // the only purpose of this check is to avoid downloading 1024 // already known transactions. Only the first two outputs are 1025 // checked because the vast majority of transactions consist of 1026 // two outputs where one is some form of "pay-to-somebody-else" 1027 // and the other is a change output. 1028 prevOut := wire.OutPoint{Hash: invVect.Hash} 1029 for i := uint32(0); i < 2; i++ { 1030 prevOut.Index = i 1031 entry, err := sm.chain.FetchUtxoEntry(prevOut) 1032 if err != nil { 1033 return false, err 1034 } 1035 if entry != nil && !entry.IsSpent() { 1036 return true, nil 1037 } 1038 } 1039 1040 return false, nil 1041 } 1042 1043 // The requested inventory is is an unsupported type, so just claim 1044 // it is known to avoid requesting it. 1045 return true, nil 1046} 1047 1048// handleInvMsg handles inv messages from all peers. 1049// We examine the inventory advertised by the remote peer and act accordingly. 1050func (sm *SyncManager) handleInvMsg(imsg *invMsg) { 1051 peer := imsg.peer 1052 state, exists := sm.peerStates[peer] 1053 if !exists { 1054 log.Warnf("Received inv message from unknown peer %s", peer) 1055 return 1056 } 1057 1058 // Attempt to find the final block in the inventory list. There may 1059 // not be one. 1060 lastBlock := -1 1061 invVects := imsg.inv.InvList 1062 for i := len(invVects) - 1; i >= 0; i-- { 1063 if invVects[i].Type == wire.InvTypeBlock { 1064 lastBlock = i 1065 break 1066 } 1067 } 1068 1069 // If this inv contains a block announcement, and this isn't coming from 1070 // our current sync peer or we're current, then update the last 1071 // announced block for this peer. We'll use this information later to 1072 // update the heights of peers based on blocks we've accepted that they 1073 // previously announced. 1074 if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) { 1075 peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash) 1076 } 1077 1078 // Ignore invs from peers that aren't the sync if we are not current. 1079 // Helps prevent fetching a mass of orphans. 1080 if peer != sm.syncPeer && !sm.current() { 1081 return 1082 } 1083 1084 // If our chain is current and a peer announces a block we already 1085 // know of, then update their current block height. 1086 if lastBlock != -1 && sm.current() { 1087 blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash) 1088 if err == nil { 1089 peer.UpdateLastBlockHeight(blkHeight) 1090 } 1091 } 1092 1093 // Request the advertised inventory if we don't already have it. Also, 1094 // request parent blocks of orphans if we receive one we already have. 1095 // Finally, attempt to detect potential stalls due to long side chains 1096 // we already have and request more blocks to prevent them. 1097 for i, iv := range invVects { 1098 // Ignore unsupported inventory types. 1099 switch iv.Type { 1100 case wire.InvTypeBlock: 1101 case wire.InvTypeTx: 1102 case wire.InvTypeWitnessBlock: 1103 case wire.InvTypeWitnessTx: 1104 default: 1105 continue 1106 } 1107 1108 // Add the inventory to the cache of known inventory 1109 // for the peer. 1110 peer.AddKnownInventory(iv) 1111 1112 // Ignore inventory when we're in headers-first mode. 1113 if sm.headersFirstMode { 1114 continue 1115 } 1116 1117 // Request the inventory if we don't already have it. 1118 haveInv, err := sm.haveInventory(iv) 1119 if err != nil { 1120 log.Warnf("Unexpected failure when checking for "+ 1121 "existing inventory during inv message "+ 1122 "processing: %v", err) 1123 continue 1124 } 1125 if !haveInv { 1126 if iv.Type == wire.InvTypeTx { 1127 // Skip the transaction if it has already been 1128 // rejected. 1129 if _, exists := sm.rejectedTxns[iv.Hash]; exists { 1130 continue 1131 } 1132 } 1133 1134 // Ignore invs block invs from non-witness enabled 1135 // peers, as after segwit activation we only want to 1136 // download from peers that can provide us full witness 1137 // data for blocks. 1138 if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock { 1139 continue 1140 } 1141 1142 // Add it to the request queue. 1143 state.requestQueue = append(state.requestQueue, iv) 1144 continue 1145 } 1146 1147 if iv.Type == wire.InvTypeBlock { 1148 // The block is an orphan block that we already have. 1149 // When the existing orphan was processed, it requested 1150 // the missing parent blocks. When this scenario 1151 // happens, it means there were more blocks missing 1152 // than are allowed into a single inventory message. As 1153 // a result, once this peer requested the final 1154 // advertised block, the remote peer noticed and is now 1155 // resending the orphan block as an available block 1156 // to signal there are more missing blocks that need to 1157 // be requested. 1158 if sm.chain.IsKnownOrphan(&iv.Hash) { 1159 // Request blocks starting at the latest known 1160 // up to the root of the orphan that just came 1161 // in. 1162 orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash) 1163 locator, err := sm.chain.LatestBlockLocator() 1164 if err != nil { 1165 log.Errorf("PEER: Failed to get block "+ 1166 "locator for the latest block: "+ 1167 "%v", err) 1168 continue 1169 } 1170 peer.PushGetBlocksMsg(locator, orphanRoot) 1171 continue 1172 } 1173 1174 // We already have the final block advertised by this 1175 // inventory message, so force a request for more. This 1176 // should only happen if we're on a really long side 1177 // chain. 1178 if i == lastBlock { 1179 // Request blocks after this one up to the 1180 // final one the remote peer knows about (zero 1181 // stop hash). 1182 locator := sm.chain.BlockLocatorFromHash(&iv.Hash) 1183 peer.PushGetBlocksMsg(locator, &zeroHash) 1184 } 1185 } 1186 } 1187 1188 // Request as much as possible at once. Anything that won't fit into 1189 // the request will be requested on the next inv message. 1190 numRequested := 0 1191 gdmsg := wire.NewMsgGetData() 1192 requestQueue := state.requestQueue 1193 for len(requestQueue) != 0 { 1194 iv := requestQueue[0] 1195 requestQueue[0] = nil 1196 requestQueue = requestQueue[1:] 1197 1198 switch iv.Type { 1199 case wire.InvTypeWitnessBlock: 1200 fallthrough 1201 case wire.InvTypeBlock: 1202 // Request the block if there is not already a pending 1203 // request. 1204 if _, exists := sm.requestedBlocks[iv.Hash]; !exists { 1205 sm.requestedBlocks[iv.Hash] = struct{}{} 1206 sm.limitMap(sm.requestedBlocks, maxRequestedBlocks) 1207 state.requestedBlocks[iv.Hash] = struct{}{} 1208 1209 if peer.IsWitnessEnabled() { 1210 iv.Type = wire.InvTypeWitnessBlock 1211 } 1212 1213 gdmsg.AddInvVect(iv) 1214 numRequested++ 1215 } 1216 1217 case wire.InvTypeWitnessTx: 1218 fallthrough 1219 case wire.InvTypeTx: 1220 // Request the transaction if there is not already a 1221 // pending request. 1222 if _, exists := sm.requestedTxns[iv.Hash]; !exists { 1223 sm.requestedTxns[iv.Hash] = struct{}{} 1224 sm.limitMap(sm.requestedTxns, maxRequestedTxns) 1225 state.requestedTxns[iv.Hash] = struct{}{} 1226 1227 // If the peer is capable, request the txn 1228 // including all witness data. 1229 if peer.IsWitnessEnabled() { 1230 iv.Type = wire.InvTypeWitnessTx 1231 } 1232 1233 gdmsg.AddInvVect(iv) 1234 numRequested++ 1235 } 1236 } 1237 1238 if numRequested >= wire.MaxInvPerMsg { 1239 break 1240 } 1241 } 1242 state.requestQueue = requestQueue 1243 if len(gdmsg.InvList) > 0 { 1244 peer.QueueMessage(gdmsg, nil) 1245 } 1246} 1247 1248// limitMap is a helper function for maps that require a maximum limit by 1249// evicting a random transaction if adding a new value would cause it to 1250// overflow the maximum allowed. 1251func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) { 1252 if len(m)+1 > limit { 1253 // Remove a random entry from the map. For most compilers, Go's 1254 // range statement iterates starting at a random item although 1255 // that is not 100% guaranteed by the spec. The iteration order 1256 // is not important here because an adversary would have to be 1257 // able to pull off preimage attacks on the hashing function in 1258 // order to target eviction of specific entries anyways. 1259 for txHash := range m { 1260 delete(m, txHash) 1261 return 1262 } 1263 } 1264} 1265 1266// blockHandler is the main handler for the sync manager. It must be run as a 1267// goroutine. It processes block and inv messages in a separate goroutine 1268// from the peer handlers so the block (MsgBlock) messages are handled by a 1269// single thread without needing to lock memory data structures. This is 1270// important because the sync manager controls which blocks are needed and how 1271// the fetching should proceed. 1272func (sm *SyncManager) blockHandler() { 1273 stallTicker := time.NewTicker(stallSampleInterval) 1274 defer stallTicker.Stop() 1275 1276out: 1277 for { 1278 select { 1279 case m := <-sm.msgChan: 1280 switch msg := m.(type) { 1281 case *newPeerMsg: 1282 sm.handleNewPeerMsg(msg.peer) 1283 1284 case *txMsg: 1285 sm.handleTxMsg(msg) 1286 msg.reply <- struct{}{} 1287 1288 case *blockMsg: 1289 sm.handleBlockMsg(msg) 1290 msg.reply <- struct{}{} 1291 1292 case *invMsg: 1293 sm.handleInvMsg(msg) 1294 1295 case *headersMsg: 1296 sm.handleHeadersMsg(msg) 1297 1298 case *donePeerMsg: 1299 sm.handleDonePeerMsg(msg.peer) 1300 1301 case getSyncPeerMsg: 1302 var peerID int32 1303 if sm.syncPeer != nil { 1304 peerID = sm.syncPeer.ID() 1305 } 1306 msg.reply <- peerID 1307 1308 case processBlockMsg: 1309 _, isOrphan, err := sm.chain.ProcessBlock( 1310 msg.block, msg.flags) 1311 if err != nil { 1312 msg.reply <- processBlockResponse{ 1313 isOrphan: false, 1314 err: err, 1315 } 1316 } 1317 1318 msg.reply <- processBlockResponse{ 1319 isOrphan: isOrphan, 1320 err: nil, 1321 } 1322 1323 case isCurrentMsg: 1324 msg.reply <- sm.current() 1325 1326 case pauseMsg: 1327 // Wait until the sender unpauses the manager. 1328 <-msg.unpause 1329 1330 default: 1331 log.Warnf("Invalid message type in block "+ 1332 "handler: %T", msg) 1333 } 1334 1335 case <-stallTicker.C: 1336 sm.handleStallSample() 1337 1338 case <-sm.quit: 1339 break out 1340 } 1341 } 1342 1343 sm.wg.Done() 1344 log.Trace("Block handler done") 1345} 1346 1347// handleBlockchainNotification handles notifications from blockchain. It does 1348// things such as request orphan block parents and relay accepted blocks to 1349// connected peers. 1350func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) { 1351 switch notification.Type { 1352 // A block has been accepted into the block chain. Relay it to other 1353 // peers. 1354 case blockchain.NTBlockAccepted: 1355 // Don't relay if we are not current. Other peers that are 1356 // current should already know about it. 1357 if !sm.current() { 1358 return 1359 } 1360 1361 block, ok := notification.Data.(*btcutil.Block) 1362 if !ok { 1363 log.Warnf("Chain accepted notification is not a block.") 1364 break 1365 } 1366 1367 // Generate the inventory vector and relay it. 1368 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) 1369 sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header) 1370 1371 // A block has been connected to the main block chain. 1372 case blockchain.NTBlockConnected: 1373 block, ok := notification.Data.(*btcutil.Block) 1374 if !ok { 1375 log.Warnf("Chain connected notification is not a block.") 1376 break 1377 } 1378 1379 // Remove all of the transactions (except the coinbase) in the 1380 // connected block from the transaction pool. Secondly, remove any 1381 // transactions which are now double spends as a result of these 1382 // new transactions. Finally, remove any transaction that is 1383 // no longer an orphan. Transactions which depend on a confirmed 1384 // transaction are NOT removed recursively because they are still 1385 // valid. 1386 for _, tx := range block.Transactions()[1:] { 1387 sm.txMemPool.RemoveTransaction(tx, false) 1388 sm.txMemPool.RemoveDoubleSpends(tx) 1389 sm.txMemPool.RemoveOrphan(tx) 1390 sm.peerNotifier.TransactionConfirmed(tx) 1391 acceptedTxs := sm.txMemPool.ProcessOrphans(tx) 1392 sm.peerNotifier.AnnounceNewTransactions(acceptedTxs) 1393 } 1394 1395 // Register block with the fee estimator, if it exists. 1396 if sm.feeEstimator != nil { 1397 err := sm.feeEstimator.RegisterBlock(block) 1398 1399 // If an error is somehow generated then the fee estimator 1400 // has entered an invalid state. Since it doesn't know how 1401 // to recover, create a new one. 1402 if err != nil { 1403 sm.feeEstimator = mempool.NewFeeEstimator( 1404 mempool.DefaultEstimateFeeMaxRollback, 1405 mempool.DefaultEstimateFeeMinRegisteredBlocks) 1406 } 1407 } 1408 1409 // A block has been disconnected from the main block chain. 1410 case blockchain.NTBlockDisconnected: 1411 block, ok := notification.Data.(*btcutil.Block) 1412 if !ok { 1413 log.Warnf("Chain disconnected notification is not a block.") 1414 break 1415 } 1416 1417 // Reinsert all of the transactions (except the coinbase) into 1418 // the transaction pool. 1419 for _, tx := range block.Transactions()[1:] { 1420 _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, 1421 false, false) 1422 if err != nil { 1423 // Remove the transaction and all transactions 1424 // that depend on it if it wasn't accepted into 1425 // the transaction pool. 1426 sm.txMemPool.RemoveTransaction(tx, true) 1427 } 1428 } 1429 1430 // Rollback previous block recorded by the fee estimator. 1431 if sm.feeEstimator != nil { 1432 sm.feeEstimator.Rollback(block.Hash()) 1433 } 1434 } 1435} 1436 1437// NewPeer informs the sync manager of a newly active peer. 1438func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) { 1439 // Ignore if we are shutting down. 1440 if atomic.LoadInt32(&sm.shutdown) != 0 { 1441 return 1442 } 1443 sm.msgChan <- &newPeerMsg{peer: peer} 1444} 1445 1446// QueueTx adds the passed transaction message and peer to the block handling 1447// queue. Responds to the done channel argument after the tx message is 1448// processed. 1449func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) { 1450 // Don't accept more transactions if we're shutting down. 1451 if atomic.LoadInt32(&sm.shutdown) != 0 { 1452 done <- struct{}{} 1453 return 1454 } 1455 1456 sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} 1457} 1458 1459// QueueBlock adds the passed block message and peer to the block handling 1460// queue. Responds to the done channel argument after the block message is 1461// processed. 1462func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) { 1463 // Don't accept more blocks if we're shutting down. 1464 if atomic.LoadInt32(&sm.shutdown) != 0 { 1465 done <- struct{}{} 1466 return 1467 } 1468 1469 sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done} 1470} 1471 1472// QueueInv adds the passed inv message and peer to the block handling queue. 1473func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) { 1474 // No channel handling here because peers do not need to block on inv 1475 // messages. 1476 if atomic.LoadInt32(&sm.shutdown) != 0 { 1477 return 1478 } 1479 1480 sm.msgChan <- &invMsg{inv: inv, peer: peer} 1481} 1482 1483// QueueHeaders adds the passed headers message and peer to the block handling 1484// queue. 1485func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) { 1486 // No channel handling here because peers do not need to block on 1487 // headers messages. 1488 if atomic.LoadInt32(&sm.shutdown) != 0 { 1489 return 1490 } 1491 1492 sm.msgChan <- &headersMsg{headers: headers, peer: peer} 1493} 1494 1495// DonePeer informs the blockmanager that a peer has disconnected. 1496func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) { 1497 // Ignore if we are shutting down. 1498 if atomic.LoadInt32(&sm.shutdown) != 0 { 1499 return 1500 } 1501 1502 sm.msgChan <- &donePeerMsg{peer: peer} 1503} 1504 1505// Start begins the core block handler which processes block and inv messages. 1506func (sm *SyncManager) Start() { 1507 // Already started? 1508 if atomic.AddInt32(&sm.started, 1) != 1 { 1509 return 1510 } 1511 1512 log.Trace("Starting sync manager") 1513 sm.wg.Add(1) 1514 go sm.blockHandler() 1515} 1516 1517// Stop gracefully shuts down the sync manager by stopping all asynchronous 1518// handlers and waiting for them to finish. 1519func (sm *SyncManager) Stop() error { 1520 if atomic.AddInt32(&sm.shutdown, 1) != 1 { 1521 log.Warnf("Sync manager is already in the process of " + 1522 "shutting down") 1523 return nil 1524 } 1525 1526 log.Infof("Sync manager shutting down") 1527 close(sm.quit) 1528 sm.wg.Wait() 1529 return nil 1530} 1531 1532// SyncPeerID returns the ID of the current sync peer, or 0 if there is none. 1533func (sm *SyncManager) SyncPeerID() int32 { 1534 reply := make(chan int32) 1535 sm.msgChan <- getSyncPeerMsg{reply: reply} 1536 return <-reply 1537} 1538 1539// ProcessBlock makes use of ProcessBlock on an internal instance of a block 1540// chain. 1541func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) { 1542 reply := make(chan processBlockResponse, 1) 1543 sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply} 1544 response := <-reply 1545 return response.isOrphan, response.err 1546} 1547 1548// IsCurrent returns whether or not the sync manager believes it is synced with 1549// the connected peers. 1550func (sm *SyncManager) IsCurrent() bool { 1551 reply := make(chan bool) 1552 sm.msgChan <- isCurrentMsg{reply: reply} 1553 return <-reply 1554} 1555 1556// Pause pauses the sync manager until the returned channel is closed. 1557// 1558// Note that while paused, all peer and block processing is halted. The 1559// message sender should avoid pausing the sync manager for long durations. 1560func (sm *SyncManager) Pause() chan<- struct{} { 1561 c := make(chan struct{}) 1562 sm.msgChan <- pauseMsg{c} 1563 return c 1564} 1565 1566// New constructs a new SyncManager. Use Start to begin processing asynchronous 1567// block, tx, and inv updates. 1568func New(config *Config) (*SyncManager, error) { 1569 sm := SyncManager{ 1570 peerNotifier: config.PeerNotifier, 1571 chain: config.Chain, 1572 txMemPool: config.TxMemPool, 1573 chainParams: config.ChainParams, 1574 rejectedTxns: make(map[chainhash.Hash]struct{}), 1575 requestedTxns: make(map[chainhash.Hash]struct{}), 1576 requestedBlocks: make(map[chainhash.Hash]struct{}), 1577 peerStates: make(map[*peerpkg.Peer]*peerSyncState), 1578 progressLogger: newBlockProgressLogger("Processed", log), 1579 msgChan: make(chan interface{}, config.MaxPeers*3), 1580 headerList: list.New(), 1581 quit: make(chan struct{}), 1582 feeEstimator: config.FeeEstimator, 1583 } 1584 1585 best := sm.chain.BestSnapshot() 1586 if !config.DisableCheckpoints { 1587 // Initialize the next checkpoint based on the current height. 1588 sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height) 1589 if sm.nextCheckpoint != nil { 1590 sm.resetHeaderState(&best.Hash, best.Height) 1591 } 1592 } else { 1593 log.Info("Checkpoints are disabled") 1594 } 1595 1596 sm.chain.Subscribe(sm.handleBlockchainNotification) 1597 1598 return &sm, nil 1599} 1600