1// Copyright (c) 2013-2017 The btcsuite developers
2// Use of this source code is governed by an ISC
3// license that can be found in the LICENSE file.
4
5package netsync
6
7import (
8	"container/list"
9	"math/rand"
10	"net"
11	"sync"
12	"sync/atomic"
13	"time"
14
15	"github.com/btcsuite/btcd/blockchain"
16	"github.com/btcsuite/btcd/chaincfg"
17	"github.com/btcsuite/btcd/chaincfg/chainhash"
18	"github.com/btcsuite/btcd/database"
19	"github.com/btcsuite/btcd/mempool"
20	peerpkg "github.com/btcsuite/btcd/peer"
21	"github.com/btcsuite/btcd/wire"
22	"github.com/btcsuite/btcutil"
23)
24
25const (
26	// minInFlightBlocks is the minimum number of blocks that should be
27	// in the request queue for headers-first mode before requesting
28	// more.
29	minInFlightBlocks = 10
30
31	// maxRejectedTxns is the maximum number of rejected transactions
32	// hashes to store in memory.
33	maxRejectedTxns = 1000
34
35	// maxRequestedBlocks is the maximum number of requested block
36	// hashes to store in memory.
37	maxRequestedBlocks = wire.MaxInvPerMsg
38
39	// maxRequestedTxns is the maximum number of requested transactions
40	// hashes to store in memory.
41	maxRequestedTxns = wire.MaxInvPerMsg
42
43	// maxStallDuration is the time after which we will disconnect our
44	// current sync peer if we haven't made progress.
45	maxStallDuration = 3 * time.Minute
46
47	// stallSampleInterval the interval at which we will check to see if our
48	// sync has stalled.
49	stallSampleInterval = 30 * time.Second
50)
51
52// zeroHash is the zero value hash (all zeros).  It is defined as a convenience.
53var zeroHash chainhash.Hash
54
55// newPeerMsg signifies a newly connected peer to the block handler.
56type newPeerMsg struct {
57	peer *peerpkg.Peer
58}
59
60// blockMsg packages a bitcoin block message and the peer it came from together
61// so the block handler has access to that information.
62type blockMsg struct {
63	block *btcutil.Block
64	peer  *peerpkg.Peer
65	reply chan struct{}
66}
67
68// invMsg packages a bitcoin inv message and the peer it came from together
69// so the block handler has access to that information.
70type invMsg struct {
71	inv  *wire.MsgInv
72	peer *peerpkg.Peer
73}
74
75// headersMsg packages a bitcoin headers message and the peer it came from
76// together so the block handler has access to that information.
77type headersMsg struct {
78	headers *wire.MsgHeaders
79	peer    *peerpkg.Peer
80}
81
82// donePeerMsg signifies a newly disconnected peer to the block handler.
83type donePeerMsg struct {
84	peer *peerpkg.Peer
85}
86
87// txMsg packages a bitcoin tx message and the peer it came from together
88// so the block handler has access to that information.
89type txMsg struct {
90	tx    *btcutil.Tx
91	peer  *peerpkg.Peer
92	reply chan struct{}
93}
94
95// getSyncPeerMsg is a message type to be sent across the message channel for
96// retrieving the current sync peer.
97type getSyncPeerMsg struct {
98	reply chan int32
99}
100
101// processBlockResponse is a response sent to the reply channel of a
102// processBlockMsg.
103type processBlockResponse struct {
104	isOrphan bool
105	err      error
106}
107
108// processBlockMsg is a message type to be sent across the message channel
109// for requested a block is processed.  Note this call differs from blockMsg
110// above in that blockMsg is intended for blocks that came from peers and have
111// extra handling whereas this message essentially is just a concurrent safe
112// way to call ProcessBlock on the internal block chain instance.
113type processBlockMsg struct {
114	block *btcutil.Block
115	flags blockchain.BehaviorFlags
116	reply chan processBlockResponse
117}
118
119// isCurrentMsg is a message type to be sent across the message channel for
120// requesting whether or not the sync manager believes it is synced with the
121// currently connected peers.
122type isCurrentMsg struct {
123	reply chan bool
124}
125
126// pauseMsg is a message type to be sent across the message channel for
127// pausing the sync manager.  This effectively provides the caller with
128// exclusive access over the manager until a receive is performed on the
129// unpause channel.
130type pauseMsg struct {
131	unpause <-chan struct{}
132}
133
134// headerNode is used as a node in a list of headers that are linked together
135// between checkpoints.
136type headerNode struct {
137	height int32
138	hash   *chainhash.Hash
139}
140
141// peerSyncState stores additional information that the SyncManager tracks
142// about a peer.
143type peerSyncState struct {
144	syncCandidate   bool
145	requestQueue    []*wire.InvVect
146	requestedTxns   map[chainhash.Hash]struct{}
147	requestedBlocks map[chainhash.Hash]struct{}
148}
149
150// SyncManager is used to communicate block related messages with peers. The
151// SyncManager is started as by executing Start() in a goroutine. Once started,
152// it selects peers to sync from and starts the initial block download. Once the
153// chain is in sync, the SyncManager handles incoming block and header
154// notifications and relays announcements of new blocks to peers.
155type SyncManager struct {
156	peerNotifier   PeerNotifier
157	started        int32
158	shutdown       int32
159	chain          *blockchain.BlockChain
160	txMemPool      *mempool.TxPool
161	chainParams    *chaincfg.Params
162	progressLogger *blockProgressLogger
163	msgChan        chan interface{}
164	wg             sync.WaitGroup
165	quit           chan struct{}
166
167	// These fields should only be accessed from the blockHandler thread
168	rejectedTxns     map[chainhash.Hash]struct{}
169	requestedTxns    map[chainhash.Hash]struct{}
170	requestedBlocks  map[chainhash.Hash]struct{}
171	syncPeer         *peerpkg.Peer
172	peerStates       map[*peerpkg.Peer]*peerSyncState
173	lastProgressTime time.Time
174
175	// The following fields are used for headers-first mode.
176	headersFirstMode bool
177	headerList       *list.List
178	startHeader      *list.Element
179	nextCheckpoint   *chaincfg.Checkpoint
180
181	// An optional fee estimator.
182	feeEstimator *mempool.FeeEstimator
183}
184
185// resetHeaderState sets the headers-first mode state to values appropriate for
186// syncing from a new peer.
187func (sm *SyncManager) resetHeaderState(newestHash *chainhash.Hash, newestHeight int32) {
188	sm.headersFirstMode = false
189	sm.headerList.Init()
190	sm.startHeader = nil
191
192	// When there is a next checkpoint, add an entry for the latest known
193	// block into the header pool.  This allows the next downloaded header
194	// to prove it links to the chain properly.
195	if sm.nextCheckpoint != nil {
196		node := headerNode{height: newestHeight, hash: newestHash}
197		sm.headerList.PushBack(&node)
198	}
199}
200
201// findNextHeaderCheckpoint returns the next checkpoint after the passed height.
202// It returns nil when there is not one either because the height is already
203// later than the final checkpoint or some other reason such as disabled
204// checkpoints.
205func (sm *SyncManager) findNextHeaderCheckpoint(height int32) *chaincfg.Checkpoint {
206	checkpoints := sm.chain.Checkpoints()
207	if len(checkpoints) == 0 {
208		return nil
209	}
210
211	// There is no next checkpoint if the height is already after the final
212	// checkpoint.
213	finalCheckpoint := &checkpoints[len(checkpoints)-1]
214	if height >= finalCheckpoint.Height {
215		return nil
216	}
217
218	// Find the next checkpoint.
219	nextCheckpoint := finalCheckpoint
220	for i := len(checkpoints) - 2; i >= 0; i-- {
221		if height >= checkpoints[i].Height {
222			break
223		}
224		nextCheckpoint = &checkpoints[i]
225	}
226	return nextCheckpoint
227}
228
229// startSync will choose the best peer among the available candidate peers to
230// download/sync the blockchain from.  When syncing is already running, it
231// simply returns.  It also examines the candidates for any which are no longer
232// candidates and removes them as needed.
233func (sm *SyncManager) startSync() {
234	// Return now if we're already syncing.
235	if sm.syncPeer != nil {
236		return
237	}
238
239	// Once the segwit soft-fork package has activated, we only
240	// want to sync from peers which are witness enabled to ensure
241	// that we fully validate all blockchain data.
242	segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
243	if err != nil {
244		log.Errorf("Unable to query for segwit soft-fork state: %v", err)
245		return
246	}
247
248	best := sm.chain.BestSnapshot()
249	var higherPeers, equalPeers []*peerpkg.Peer
250	for peer, state := range sm.peerStates {
251		if !state.syncCandidate {
252			continue
253		}
254
255		if segwitActive && !peer.IsWitnessEnabled() {
256			log.Debugf("peer %v not witness enabled, skipping", peer)
257			continue
258		}
259
260		// Remove sync candidate peers that are no longer candidates due
261		// to passing their latest known block.  NOTE: The < is
262		// intentional as opposed to <=.  While technically the peer
263		// doesn't have a later block when it's equal, it will likely
264		// have one soon so it is a reasonable choice.  It also allows
265		// the case where both are at 0 such as during regression test.
266		if peer.LastBlock() < best.Height {
267			state.syncCandidate = false
268			continue
269		}
270
271		// If the peer is at the same height as us, we'll add it a set
272		// of backup peers in case we do not find one with a higher
273		// height. If we are synced up with all of our peers, all of
274		// them will be in this set.
275		if peer.LastBlock() == best.Height {
276			equalPeers = append(equalPeers, peer)
277			continue
278		}
279
280		// This peer has a height greater than our own, we'll consider
281		// it in the set of better peers from which we'll randomly
282		// select.
283		higherPeers = append(higherPeers, peer)
284	}
285
286	// Pick randomly from the set of peers greater than our block height,
287	// falling back to a random peer of the same height if none are greater.
288	//
289	// TODO(conner): Use a better algorithm to ranking peers based on
290	// observed metrics and/or sync in parallel.
291	var bestPeer *peerpkg.Peer
292	switch {
293	case len(higherPeers) > 0:
294		bestPeer = higherPeers[rand.Intn(len(higherPeers))]
295
296	case len(equalPeers) > 0:
297		bestPeer = equalPeers[rand.Intn(len(equalPeers))]
298	}
299
300	// Start syncing from the best peer if one was selected.
301	if bestPeer != nil {
302		// Clear the requestedBlocks if the sync peer changes, otherwise
303		// we may ignore blocks we need that the last sync peer failed
304		// to send.
305		sm.requestedBlocks = make(map[chainhash.Hash]struct{})
306
307		locator, err := sm.chain.LatestBlockLocator()
308		if err != nil {
309			log.Errorf("Failed to get block locator for the "+
310				"latest block: %v", err)
311			return
312		}
313
314		log.Infof("Syncing to block height %d from peer %v",
315			bestPeer.LastBlock(), bestPeer.Addr())
316
317		// When the current height is less than a known checkpoint we
318		// can use block headers to learn about which blocks comprise
319		// the chain up to the checkpoint and perform less validation
320		// for them.  This is possible since each header contains the
321		// hash of the previous header and a merkle root.  Therefore if
322		// we validate all of the received headers link together
323		// properly and the checkpoint hashes match, we can be sure the
324		// hashes for the blocks in between are accurate.  Further, once
325		// the full blocks are downloaded, the merkle root is computed
326		// and compared against the value in the header which proves the
327		// full block hasn't been tampered with.
328		//
329		// Once we have passed the final checkpoint, or checkpoints are
330		// disabled, use standard inv messages learn about the blocks
331		// and fully validate them.  Finally, regression test mode does
332		// not support the headers-first approach so do normal block
333		// downloads when in regression test mode.
334		if sm.nextCheckpoint != nil &&
335			best.Height < sm.nextCheckpoint.Height &&
336			sm.chainParams != &chaincfg.RegressionNetParams {
337
338			bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
339			sm.headersFirstMode = true
340			log.Infof("Downloading headers for blocks %d to "+
341				"%d from peer %s", best.Height+1,
342				sm.nextCheckpoint.Height, bestPeer.Addr())
343		} else {
344			bestPeer.PushGetBlocksMsg(locator, &zeroHash)
345		}
346		sm.syncPeer = bestPeer
347
348		// Reset the last progress time now that we have a non-nil
349		// syncPeer to avoid instantly detecting it as stalled in the
350		// event the progress time hasn't been updated recently.
351		sm.lastProgressTime = time.Now()
352	} else {
353		log.Warnf("No sync peer candidates available")
354	}
355}
356
357// isSyncCandidate returns whether or not the peer is a candidate to consider
358// syncing from.
359func (sm *SyncManager) isSyncCandidate(peer *peerpkg.Peer) bool {
360	// Typically a peer is not a candidate for sync if it's not a full node,
361	// however regression test is special in that the regression tool is
362	// not a full node and still needs to be considered a sync candidate.
363	if sm.chainParams == &chaincfg.RegressionNetParams {
364		// The peer is not a candidate if it's not coming from localhost
365		// or the hostname can't be determined for some reason.
366		host, _, err := net.SplitHostPort(peer.Addr())
367		if err != nil {
368			return false
369		}
370
371		if host != "127.0.0.1" && host != "localhost" {
372			return false
373		}
374	} else {
375		// The peer is not a candidate for sync if it's not a full
376		// node. Additionally, if the segwit soft-fork package has
377		// activated, then the peer must also be upgraded.
378		segwitActive, err := sm.chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
379		if err != nil {
380			log.Errorf("Unable to query for segwit "+
381				"soft-fork state: %v", err)
382		}
383		nodeServices := peer.Services()
384		if nodeServices&wire.SFNodeNetwork != wire.SFNodeNetwork ||
385			(segwitActive && !peer.IsWitnessEnabled()) {
386			return false
387		}
388	}
389
390	// Candidate if all checks passed.
391	return true
392}
393
394// handleNewPeerMsg deals with new peers that have signalled they may
395// be considered as a sync peer (they have already successfully negotiated).  It
396// also starts syncing if needed.  It is invoked from the syncHandler goroutine.
397func (sm *SyncManager) handleNewPeerMsg(peer *peerpkg.Peer) {
398	// Ignore if in the process of shutting down.
399	if atomic.LoadInt32(&sm.shutdown) != 0 {
400		return
401	}
402
403	log.Infof("New valid peer %s (%s)", peer, peer.UserAgent())
404
405	// Initialize the peer state
406	isSyncCandidate := sm.isSyncCandidate(peer)
407	sm.peerStates[peer] = &peerSyncState{
408		syncCandidate:   isSyncCandidate,
409		requestedTxns:   make(map[chainhash.Hash]struct{}),
410		requestedBlocks: make(map[chainhash.Hash]struct{}),
411	}
412
413	// Start syncing by choosing the best candidate if needed.
414	if isSyncCandidate && sm.syncPeer == nil {
415		sm.startSync()
416	}
417}
418
419// handleStallSample will switch to a new sync peer if the current one has
420// stalled. This is detected when by comparing the last progress timestamp with
421// the current time, and disconnecting the peer if we stalled before reaching
422// their highest advertised block.
423func (sm *SyncManager) handleStallSample() {
424	if atomic.LoadInt32(&sm.shutdown) != 0 {
425		return
426	}
427
428	// If we don't have an active sync peer, exit early.
429	if sm.syncPeer == nil {
430		return
431	}
432
433	// If the stall timeout has not elapsed, exit early.
434	if time.Since(sm.lastProgressTime) <= maxStallDuration {
435		return
436	}
437
438	// Check to see that the peer's sync state exists.
439	state, exists := sm.peerStates[sm.syncPeer]
440	if !exists {
441		return
442	}
443
444	sm.clearRequestedState(state)
445
446	disconnectSyncPeer := sm.shouldDCStalledSyncPeer()
447	sm.updateSyncPeer(disconnectSyncPeer)
448}
449
450// shouldDCStalledSyncPeer determines whether or not we should disconnect a
451// stalled sync peer. If the peer has stalled and its reported height is greater
452// than our own best height, we will disconnect it. Otherwise, we will keep the
453// peer connected in case we are already at tip.
454func (sm *SyncManager) shouldDCStalledSyncPeer() bool {
455	lastBlock := sm.syncPeer.LastBlock()
456	startHeight := sm.syncPeer.StartingHeight()
457
458	var peerHeight int32
459	if lastBlock > startHeight {
460		peerHeight = lastBlock
461	} else {
462		peerHeight = startHeight
463	}
464
465	// If we've stalled out yet the sync peer reports having more blocks for
466	// us we will disconnect them. This allows us at tip to not disconnect
467	// peers when we are equal or they temporarily lag behind us.
468	best := sm.chain.BestSnapshot()
469	return peerHeight > best.Height
470}
471
472// handleDonePeerMsg deals with peers that have signalled they are done.  It
473// removes the peer as a candidate for syncing and in the case where it was
474// the current sync peer, attempts to select a new best peer to sync from.  It
475// is invoked from the syncHandler goroutine.
476func (sm *SyncManager) handleDonePeerMsg(peer *peerpkg.Peer) {
477	state, exists := sm.peerStates[peer]
478	if !exists {
479		log.Warnf("Received done peer message for unknown peer %s", peer)
480		return
481	}
482
483	// Remove the peer from the list of candidate peers.
484	delete(sm.peerStates, peer)
485
486	log.Infof("Lost peer %s", peer)
487
488	sm.clearRequestedState(state)
489
490	if peer == sm.syncPeer {
491		// Update the sync peer. The server has already disconnected the
492		// peer before signaling to the sync manager.
493		sm.updateSyncPeer(false)
494	}
495}
496
497// clearRequestedState wipes all expected transactions and blocks from the sync
498// manager's requested maps that were requested under a peer's sync state, This
499// allows them to be rerequested by a subsequent sync peer.
500func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
501	// Remove requested transactions from the global map so that they will
502	// be fetched from elsewhere next time we get an inv.
503	for txHash := range state.requestedTxns {
504		delete(sm.requestedTxns, txHash)
505	}
506
507	// Remove requested blocks from the global map so that they will be
508	// fetched from elsewhere next time we get an inv.
509	// TODO: we could possibly here check which peers have these blocks
510	// and request them now to speed things up a little.
511	for blockHash := range state.requestedBlocks {
512		delete(sm.requestedBlocks, blockHash)
513	}
514}
515
516// updateSyncPeer choose a new sync peer to replace the current one. If
517// dcSyncPeer is true, this method will also disconnect the current sync peer.
518// If we are in header first mode, any header state related to prefetching is
519// also reset in preparation for the next sync peer.
520func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) {
521	log.Debugf("Updating sync peer, no progress for: %v",
522		time.Since(sm.lastProgressTime))
523
524	// First, disconnect the current sync peer if requested.
525	if dcSyncPeer {
526		sm.syncPeer.Disconnect()
527	}
528
529	// Reset any header state before we choose our next active sync peer.
530	if sm.headersFirstMode {
531		best := sm.chain.BestSnapshot()
532		sm.resetHeaderState(&best.Hash, best.Height)
533	}
534
535	sm.syncPeer = nil
536	sm.startSync()
537}
538
539// handleTxMsg handles transaction messages from all peers.
540func (sm *SyncManager) handleTxMsg(tmsg *txMsg) {
541	peer := tmsg.peer
542	state, exists := sm.peerStates[peer]
543	if !exists {
544		log.Warnf("Received tx message from unknown peer %s", peer)
545		return
546	}
547
548	// NOTE:  BitcoinJ, and possibly other wallets, don't follow the spec of
549	// sending an inventory message and allowing the remote peer to decide
550	// whether or not they want to request the transaction via a getdata
551	// message.  Unfortunately, the reference implementation permits
552	// unrequested data, so it has allowed wallets that don't follow the
553	// spec to proliferate.  While this is not ideal, there is no check here
554	// to disconnect peers for sending unsolicited transactions to provide
555	// interoperability.
556	txHash := tmsg.tx.Hash()
557
558	// Ignore transactions that we have already rejected.  Do not
559	// send a reject message here because if the transaction was already
560	// rejected, the transaction was unsolicited.
561	if _, exists = sm.rejectedTxns[*txHash]; exists {
562		log.Debugf("Ignoring unsolicited previously rejected "+
563			"transaction %v from %s", txHash, peer)
564		return
565	}
566
567	// Process the transaction to include validation, insertion in the
568	// memory pool, orphan handling, etc.
569	acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx,
570		true, true, mempool.Tag(peer.ID()))
571
572	// Remove transaction from request maps. Either the mempool/chain
573	// already knows about it and as such we shouldn't have any more
574	// instances of trying to fetch it, or we failed to insert and thus
575	// we'll retry next time we get an inv.
576	delete(state.requestedTxns, *txHash)
577	delete(sm.requestedTxns, *txHash)
578
579	if err != nil {
580		// Do not request this transaction again until a new block
581		// has been processed.
582		sm.rejectedTxns[*txHash] = struct{}{}
583		sm.limitMap(sm.rejectedTxns, maxRejectedTxns)
584
585		// When the error is a rule error, it means the transaction was
586		// simply rejected as opposed to something actually going wrong,
587		// so log it as such.  Otherwise, something really did go wrong,
588		// so log it as an actual error.
589		if _, ok := err.(mempool.RuleError); ok {
590			log.Debugf("Rejected transaction %v from %s: %v",
591				txHash, peer, err)
592		} else {
593			log.Errorf("Failed to process transaction %v: %v",
594				txHash, err)
595		}
596
597		// Convert the error into an appropriate reject message and
598		// send it.
599		code, reason := mempool.ErrToRejectErr(err)
600		peer.PushRejectMsg(wire.CmdTx, code, reason, txHash, false)
601		return
602	}
603
604	sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
605}
606
607// current returns true if we believe we are synced with our peers, false if we
608// still have blocks to check
609func (sm *SyncManager) current() bool {
610	if !sm.chain.IsCurrent() {
611		return false
612	}
613
614	// if blockChain thinks we are current and we have no syncPeer it
615	// is probably right.
616	if sm.syncPeer == nil {
617		return true
618	}
619
620	// No matter what chain thinks, if we are below the block we are syncing
621	// to we are not current.
622	if sm.chain.BestSnapshot().Height < sm.syncPeer.LastBlock() {
623		return false
624	}
625	return true
626}
627
628// handleBlockMsg handles block messages from all peers.
629func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
630	peer := bmsg.peer
631	state, exists := sm.peerStates[peer]
632	if !exists {
633		log.Warnf("Received block message from unknown peer %s", peer)
634		return
635	}
636
637	// If we didn't ask for this block then the peer is misbehaving.
638	blockHash := bmsg.block.Hash()
639	if _, exists = state.requestedBlocks[*blockHash]; !exists {
640		// The regression test intentionally sends some blocks twice
641		// to test duplicate block insertion fails.  Don't disconnect
642		// the peer or ignore the block when we're in regression test
643		// mode in this case so the chain code is actually fed the
644		// duplicate blocks.
645		if sm.chainParams != &chaincfg.RegressionNetParams {
646			log.Warnf("Got unrequested block %v from %s -- "+
647				"disconnecting", blockHash, peer.Addr())
648			peer.Disconnect()
649			return
650		}
651	}
652
653	// When in headers-first mode, if the block matches the hash of the
654	// first header in the list of headers that are being fetched, it's
655	// eligible for less validation since the headers have already been
656	// verified to link together and are valid up to the next checkpoint.
657	// Also, remove the list entry for all blocks except the checkpoint
658	// since it is needed to verify the next round of headers links
659	// properly.
660	isCheckpointBlock := false
661	behaviorFlags := blockchain.BFNone
662	if sm.headersFirstMode {
663		firstNodeEl := sm.headerList.Front()
664		if firstNodeEl != nil {
665			firstNode := firstNodeEl.Value.(*headerNode)
666			if blockHash.IsEqual(firstNode.hash) {
667				behaviorFlags |= blockchain.BFFastAdd
668				if firstNode.hash.IsEqual(sm.nextCheckpoint.Hash) {
669					isCheckpointBlock = true
670				} else {
671					sm.headerList.Remove(firstNodeEl)
672				}
673			}
674		}
675	}
676
677	// Remove block from request maps. Either chain will know about it and
678	// so we shouldn't have any more instances of trying to fetch it, or we
679	// will fail the insert and thus we'll retry next time we get an inv.
680	delete(state.requestedBlocks, *blockHash)
681	delete(sm.requestedBlocks, *blockHash)
682
683	// Process the block to include validation, best chain selection, orphan
684	// handling, etc.
685	_, isOrphan, err := sm.chain.ProcessBlock(bmsg.block, behaviorFlags)
686	if err != nil {
687		// When the error is a rule error, it means the block was simply
688		// rejected as opposed to something actually going wrong, so log
689		// it as such.  Otherwise, something really did go wrong, so log
690		// it as an actual error.
691		if _, ok := err.(blockchain.RuleError); ok {
692			log.Infof("Rejected block %v from %s: %v", blockHash,
693				peer, err)
694		} else {
695			log.Errorf("Failed to process block %v: %v",
696				blockHash, err)
697		}
698		if dbErr, ok := err.(database.Error); ok && dbErr.ErrorCode ==
699			database.ErrCorruption {
700			panic(dbErr)
701		}
702
703		// Convert the error into an appropriate reject message and
704		// send it.
705		code, reason := mempool.ErrToRejectErr(err)
706		peer.PushRejectMsg(wire.CmdBlock, code, reason, blockHash, false)
707		return
708	}
709
710	// Meta-data about the new block this peer is reporting. We use this
711	// below to update this peer's latest block height and the heights of
712	// other peers based on their last announced block hash. This allows us
713	// to dynamically update the block heights of peers, avoiding stale
714	// heights when looking for a new sync peer. Upon acceptance of a block
715	// or recognition of an orphan, we also use this information to update
716	// the block heights over other peers who's invs may have been ignored
717	// if we are actively syncing while the chain is not yet current or
718	// who may have lost the lock announcement race.
719	var heightUpdate int32
720	var blkHashUpdate *chainhash.Hash
721
722	// Request the parents for the orphan block from the peer that sent it.
723	if isOrphan {
724		// We've just received an orphan block from a peer. In order
725		// to update the height of the peer, we try to extract the
726		// block height from the scriptSig of the coinbase transaction.
727		// Extraction is only attempted if the block's version is
728		// high enough (ver 2+).
729		header := &bmsg.block.MsgBlock().Header
730		if blockchain.ShouldHaveSerializedBlockHeight(header) {
731			coinbaseTx := bmsg.block.Transactions()[0]
732			cbHeight, err := blockchain.ExtractCoinbaseHeight(coinbaseTx)
733			if err != nil {
734				log.Warnf("Unable to extract height from "+
735					"coinbase tx: %v", err)
736			} else {
737				log.Debugf("Extracted height of %v from "+
738					"orphan block", cbHeight)
739				heightUpdate = cbHeight
740				blkHashUpdate = blockHash
741			}
742		}
743
744		orphanRoot := sm.chain.GetOrphanRoot(blockHash)
745		locator, err := sm.chain.LatestBlockLocator()
746		if err != nil {
747			log.Warnf("Failed to get block locator for the "+
748				"latest block: %v", err)
749		} else {
750			peer.PushGetBlocksMsg(locator, orphanRoot)
751		}
752	} else {
753		if peer == sm.syncPeer {
754			sm.lastProgressTime = time.Now()
755		}
756
757		// When the block is not an orphan, log information about it and
758		// update the chain state.
759		sm.progressLogger.LogBlockHeight(bmsg.block)
760
761		// Update this peer's latest block height, for future
762		// potential sync node candidacy.
763		best := sm.chain.BestSnapshot()
764		heightUpdate = best.Height
765		blkHashUpdate = &best.Hash
766
767		// Clear the rejected transactions.
768		sm.rejectedTxns = make(map[chainhash.Hash]struct{})
769	}
770
771	// Update the block height for this peer. But only send a message to
772	// the server for updating peer heights if this is an orphan or our
773	// chain is "current". This avoids sending a spammy amount of messages
774	// if we're syncing the chain from scratch.
775	if blkHashUpdate != nil && heightUpdate != 0 {
776		peer.UpdateLastBlockHeight(heightUpdate)
777		if isOrphan || sm.current() {
778			go sm.peerNotifier.UpdatePeerHeights(blkHashUpdate, heightUpdate,
779				peer)
780		}
781	}
782
783	// Nothing more to do if we aren't in headers-first mode.
784	if !sm.headersFirstMode {
785		return
786	}
787
788	// This is headers-first mode, so if the block is not a checkpoint
789	// request more blocks using the header list when the request queue is
790	// getting short.
791	if !isCheckpointBlock {
792		if sm.startHeader != nil &&
793			len(state.requestedBlocks) < minInFlightBlocks {
794			sm.fetchHeaderBlocks()
795		}
796		return
797	}
798
799	// This is headers-first mode and the block is a checkpoint.  When
800	// there is a next checkpoint, get the next round of headers by asking
801	// for headers starting from the block after this one up to the next
802	// checkpoint.
803	prevHeight := sm.nextCheckpoint.Height
804	prevHash := sm.nextCheckpoint.Hash
805	sm.nextCheckpoint = sm.findNextHeaderCheckpoint(prevHeight)
806	if sm.nextCheckpoint != nil {
807		locator := blockchain.BlockLocator([]*chainhash.Hash{prevHash})
808		err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
809		if err != nil {
810			log.Warnf("Failed to send getheaders message to "+
811				"peer %s: %v", peer.Addr(), err)
812			return
813		}
814		log.Infof("Downloading headers for blocks %d to %d from "+
815			"peer %s", prevHeight+1, sm.nextCheckpoint.Height,
816			sm.syncPeer.Addr())
817		return
818	}
819
820	// This is headers-first mode, the block is a checkpoint, and there are
821	// no more checkpoints, so switch to normal mode by requesting blocks
822	// from the block after this one up to the end of the chain (zero hash).
823	sm.headersFirstMode = false
824	sm.headerList.Init()
825	log.Infof("Reached the final checkpoint -- switching to normal mode")
826	locator := blockchain.BlockLocator([]*chainhash.Hash{blockHash})
827	err = peer.PushGetBlocksMsg(locator, &zeroHash)
828	if err != nil {
829		log.Warnf("Failed to send getblocks message to peer %s: %v",
830			peer.Addr(), err)
831		return
832	}
833}
834
835// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
836// list of blocks to be downloaded based on the current list of headers.
837func (sm *SyncManager) fetchHeaderBlocks() {
838	// Nothing to do if there is no start header.
839	if sm.startHeader == nil {
840		log.Warnf("fetchHeaderBlocks called with no start header")
841		return
842	}
843
844	// Build up a getdata request for the list of blocks the headers
845	// describe.  The size hint will be limited to wire.MaxInvPerMsg by
846	// the function, so no need to double check it here.
847	gdmsg := wire.NewMsgGetDataSizeHint(uint(sm.headerList.Len()))
848	numRequested := 0
849	for e := sm.startHeader; e != nil; e = e.Next() {
850		node, ok := e.Value.(*headerNode)
851		if !ok {
852			log.Warn("Header list node type is not a headerNode")
853			continue
854		}
855
856		iv := wire.NewInvVect(wire.InvTypeBlock, node.hash)
857		haveInv, err := sm.haveInventory(iv)
858		if err != nil {
859			log.Warnf("Unexpected failure when checking for "+
860				"existing inventory during header block "+
861				"fetch: %v", err)
862		}
863		if !haveInv {
864			syncPeerState := sm.peerStates[sm.syncPeer]
865
866			sm.requestedBlocks[*node.hash] = struct{}{}
867			syncPeerState.requestedBlocks[*node.hash] = struct{}{}
868
869			// If we're fetching from a witness enabled peer
870			// post-fork, then ensure that we receive all the
871			// witness data in the blocks.
872			if sm.syncPeer.IsWitnessEnabled() {
873				iv.Type = wire.InvTypeWitnessBlock
874			}
875
876			gdmsg.AddInvVect(iv)
877			numRequested++
878		}
879		sm.startHeader = e.Next()
880		if numRequested >= wire.MaxInvPerMsg {
881			break
882		}
883	}
884	if len(gdmsg.InvList) > 0 {
885		sm.syncPeer.QueueMessage(gdmsg, nil)
886	}
887}
888
889// handleHeadersMsg handles block header messages from all peers.  Headers are
890// requested when performing a headers-first sync.
891func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
892	peer := hmsg.peer
893	_, exists := sm.peerStates[peer]
894	if !exists {
895		log.Warnf("Received headers message from unknown peer %s", peer)
896		return
897	}
898
899	// The remote peer is misbehaving if we didn't request headers.
900	msg := hmsg.headers
901	numHeaders := len(msg.Headers)
902	if !sm.headersFirstMode {
903		log.Warnf("Got %d unrequested headers from %s -- "+
904			"disconnecting", numHeaders, peer.Addr())
905		peer.Disconnect()
906		return
907	}
908
909	// Nothing to do for an empty headers message.
910	if numHeaders == 0 {
911		return
912	}
913
914	// Process all of the received headers ensuring each one connects to the
915	// previous and that checkpoints match.
916	receivedCheckpoint := false
917	var finalHash *chainhash.Hash
918	for _, blockHeader := range msg.Headers {
919		blockHash := blockHeader.BlockHash()
920		finalHash = &blockHash
921
922		// Ensure there is a previous header to compare against.
923		prevNodeEl := sm.headerList.Back()
924		if prevNodeEl == nil {
925			log.Warnf("Header list does not contain a previous" +
926				"element as expected -- disconnecting peer")
927			peer.Disconnect()
928			return
929		}
930
931		// Ensure the header properly connects to the previous one and
932		// add it to the list of headers.
933		node := headerNode{hash: &blockHash}
934		prevNode := prevNodeEl.Value.(*headerNode)
935		if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
936			node.height = prevNode.height + 1
937			e := sm.headerList.PushBack(&node)
938			if sm.startHeader == nil {
939				sm.startHeader = e
940			}
941		} else {
942			log.Warnf("Received block header that does not "+
943				"properly connect to the chain from peer %s "+
944				"-- disconnecting", peer.Addr())
945			peer.Disconnect()
946			return
947		}
948
949		// Verify the header at the next checkpoint height matches.
950		if node.height == sm.nextCheckpoint.Height {
951			if node.hash.IsEqual(sm.nextCheckpoint.Hash) {
952				receivedCheckpoint = true
953				log.Infof("Verified downloaded block "+
954					"header against checkpoint at height "+
955					"%d/hash %s", node.height, node.hash)
956			} else {
957				log.Warnf("Block header at height %d/hash "+
958					"%s from peer %s does NOT match "+
959					"expected checkpoint hash of %s -- "+
960					"disconnecting", node.height,
961					node.hash, peer.Addr(),
962					sm.nextCheckpoint.Hash)
963				peer.Disconnect()
964				return
965			}
966			break
967		}
968	}
969
970	// When this header is a checkpoint, switch to fetching the blocks for
971	// all of the headers since the last checkpoint.
972	if receivedCheckpoint {
973		// Since the first entry of the list is always the final block
974		// that is already in the database and is only used to ensure
975		// the next header links properly, it must be removed before
976		// fetching the blocks.
977		sm.headerList.Remove(sm.headerList.Front())
978		log.Infof("Received %v block headers: Fetching blocks",
979			sm.headerList.Len())
980		sm.progressLogger.SetLastLogTime(time.Now())
981		sm.fetchHeaderBlocks()
982		return
983	}
984
985	// This header is not a checkpoint, so request the next batch of
986	// headers starting from the latest known header and ending with the
987	// next checkpoint.
988	locator := blockchain.BlockLocator([]*chainhash.Hash{finalHash})
989	err := peer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
990	if err != nil {
991		log.Warnf("Failed to send getheaders message to "+
992			"peer %s: %v", peer.Addr(), err)
993		return
994	}
995}
996
997// haveInventory returns whether or not the inventory represented by the passed
998// inventory vector is known.  This includes checking all of the various places
999// inventory can be when it is in different states such as blocks that are part
1000// of the main chain, on a side chain, in the orphan pool, and transactions that
1001// are in the memory pool (either the main pool or orphan pool).
1002func (sm *SyncManager) haveInventory(invVect *wire.InvVect) (bool, error) {
1003	switch invVect.Type {
1004	case wire.InvTypeWitnessBlock:
1005		fallthrough
1006	case wire.InvTypeBlock:
1007		// Ask chain if the block is known to it in any form (main
1008		// chain, side chain, or orphan).
1009		return sm.chain.HaveBlock(&invVect.Hash)
1010
1011	case wire.InvTypeWitnessTx:
1012		fallthrough
1013	case wire.InvTypeTx:
1014		// Ask the transaction memory pool if the transaction is known
1015		// to it in any form (main pool or orphan).
1016		if sm.txMemPool.HaveTransaction(&invVect.Hash) {
1017			return true, nil
1018		}
1019
1020		// Check if the transaction exists from the point of view of the
1021		// end of the main chain.  Note that this is only a best effort
1022		// since it is expensive to check existence of every output and
1023		// the only purpose of this check is to avoid downloading
1024		// already known transactions.  Only the first two outputs are
1025		// checked because the vast majority of transactions consist of
1026		// two outputs where one is some form of "pay-to-somebody-else"
1027		// and the other is a change output.
1028		prevOut := wire.OutPoint{Hash: invVect.Hash}
1029		for i := uint32(0); i < 2; i++ {
1030			prevOut.Index = i
1031			entry, err := sm.chain.FetchUtxoEntry(prevOut)
1032			if err != nil {
1033				return false, err
1034			}
1035			if entry != nil && !entry.IsSpent() {
1036				return true, nil
1037			}
1038		}
1039
1040		return false, nil
1041	}
1042
1043	// The requested inventory is is an unsupported type, so just claim
1044	// it is known to avoid requesting it.
1045	return true, nil
1046}
1047
1048// handleInvMsg handles inv messages from all peers.
1049// We examine the inventory advertised by the remote peer and act accordingly.
1050func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
1051	peer := imsg.peer
1052	state, exists := sm.peerStates[peer]
1053	if !exists {
1054		log.Warnf("Received inv message from unknown peer %s", peer)
1055		return
1056	}
1057
1058	// Attempt to find the final block in the inventory list.  There may
1059	// not be one.
1060	lastBlock := -1
1061	invVects := imsg.inv.InvList
1062	for i := len(invVects) - 1; i >= 0; i-- {
1063		if invVects[i].Type == wire.InvTypeBlock {
1064			lastBlock = i
1065			break
1066		}
1067	}
1068
1069	// If this inv contains a block announcement, and this isn't coming from
1070	// our current sync peer or we're current, then update the last
1071	// announced block for this peer. We'll use this information later to
1072	// update the heights of peers based on blocks we've accepted that they
1073	// previously announced.
1074	if lastBlock != -1 && (peer != sm.syncPeer || sm.current()) {
1075		peer.UpdateLastAnnouncedBlock(&invVects[lastBlock].Hash)
1076	}
1077
1078	// Ignore invs from peers that aren't the sync if we are not current.
1079	// Helps prevent fetching a mass of orphans.
1080	if peer != sm.syncPeer && !sm.current() {
1081		return
1082	}
1083
1084	// If our chain is current and a peer announces a block we already
1085	// know of, then update their current block height.
1086	if lastBlock != -1 && sm.current() {
1087		blkHeight, err := sm.chain.BlockHeightByHash(&invVects[lastBlock].Hash)
1088		if err == nil {
1089			peer.UpdateLastBlockHeight(blkHeight)
1090		}
1091	}
1092
1093	// Request the advertised inventory if we don't already have it.  Also,
1094	// request parent blocks of orphans if we receive one we already have.
1095	// Finally, attempt to detect potential stalls due to long side chains
1096	// we already have and request more blocks to prevent them.
1097	for i, iv := range invVects {
1098		// Ignore unsupported inventory types.
1099		switch iv.Type {
1100		case wire.InvTypeBlock:
1101		case wire.InvTypeTx:
1102		case wire.InvTypeWitnessBlock:
1103		case wire.InvTypeWitnessTx:
1104		default:
1105			continue
1106		}
1107
1108		// Add the inventory to the cache of known inventory
1109		// for the peer.
1110		peer.AddKnownInventory(iv)
1111
1112		// Ignore inventory when we're in headers-first mode.
1113		if sm.headersFirstMode {
1114			continue
1115		}
1116
1117		// Request the inventory if we don't already have it.
1118		haveInv, err := sm.haveInventory(iv)
1119		if err != nil {
1120			log.Warnf("Unexpected failure when checking for "+
1121				"existing inventory during inv message "+
1122				"processing: %v", err)
1123			continue
1124		}
1125		if !haveInv {
1126			if iv.Type == wire.InvTypeTx {
1127				// Skip the transaction if it has already been
1128				// rejected.
1129				if _, exists := sm.rejectedTxns[iv.Hash]; exists {
1130					continue
1131				}
1132			}
1133
1134			// Ignore invs block invs from non-witness enabled
1135			// peers, as after segwit activation we only want to
1136			// download from peers that can provide us full witness
1137			// data for blocks.
1138			if !peer.IsWitnessEnabled() && iv.Type == wire.InvTypeBlock {
1139				continue
1140			}
1141
1142			// Add it to the request queue.
1143			state.requestQueue = append(state.requestQueue, iv)
1144			continue
1145		}
1146
1147		if iv.Type == wire.InvTypeBlock {
1148			// The block is an orphan block that we already have.
1149			// When the existing orphan was processed, it requested
1150			// the missing parent blocks.  When this scenario
1151			// happens, it means there were more blocks missing
1152			// than are allowed into a single inventory message.  As
1153			// a result, once this peer requested the final
1154			// advertised block, the remote peer noticed and is now
1155			// resending the orphan block as an available block
1156			// to signal there are more missing blocks that need to
1157			// be requested.
1158			if sm.chain.IsKnownOrphan(&iv.Hash) {
1159				// Request blocks starting at the latest known
1160				// up to the root of the orphan that just came
1161				// in.
1162				orphanRoot := sm.chain.GetOrphanRoot(&iv.Hash)
1163				locator, err := sm.chain.LatestBlockLocator()
1164				if err != nil {
1165					log.Errorf("PEER: Failed to get block "+
1166						"locator for the latest block: "+
1167						"%v", err)
1168					continue
1169				}
1170				peer.PushGetBlocksMsg(locator, orphanRoot)
1171				continue
1172			}
1173
1174			// We already have the final block advertised by this
1175			// inventory message, so force a request for more.  This
1176			// should only happen if we're on a really long side
1177			// chain.
1178			if i == lastBlock {
1179				// Request blocks after this one up to the
1180				// final one the remote peer knows about (zero
1181				// stop hash).
1182				locator := sm.chain.BlockLocatorFromHash(&iv.Hash)
1183				peer.PushGetBlocksMsg(locator, &zeroHash)
1184			}
1185		}
1186	}
1187
1188	// Request as much as possible at once.  Anything that won't fit into
1189	// the request will be requested on the next inv message.
1190	numRequested := 0
1191	gdmsg := wire.NewMsgGetData()
1192	requestQueue := state.requestQueue
1193	for len(requestQueue) != 0 {
1194		iv := requestQueue[0]
1195		requestQueue[0] = nil
1196		requestQueue = requestQueue[1:]
1197
1198		switch iv.Type {
1199		case wire.InvTypeWitnessBlock:
1200			fallthrough
1201		case wire.InvTypeBlock:
1202			// Request the block if there is not already a pending
1203			// request.
1204			if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
1205				sm.requestedBlocks[iv.Hash] = struct{}{}
1206				sm.limitMap(sm.requestedBlocks, maxRequestedBlocks)
1207				state.requestedBlocks[iv.Hash] = struct{}{}
1208
1209				if peer.IsWitnessEnabled() {
1210					iv.Type = wire.InvTypeWitnessBlock
1211				}
1212
1213				gdmsg.AddInvVect(iv)
1214				numRequested++
1215			}
1216
1217		case wire.InvTypeWitnessTx:
1218			fallthrough
1219		case wire.InvTypeTx:
1220			// Request the transaction if there is not already a
1221			// pending request.
1222			if _, exists := sm.requestedTxns[iv.Hash]; !exists {
1223				sm.requestedTxns[iv.Hash] = struct{}{}
1224				sm.limitMap(sm.requestedTxns, maxRequestedTxns)
1225				state.requestedTxns[iv.Hash] = struct{}{}
1226
1227				// If the peer is capable, request the txn
1228				// including all witness data.
1229				if peer.IsWitnessEnabled() {
1230					iv.Type = wire.InvTypeWitnessTx
1231				}
1232
1233				gdmsg.AddInvVect(iv)
1234				numRequested++
1235			}
1236		}
1237
1238		if numRequested >= wire.MaxInvPerMsg {
1239			break
1240		}
1241	}
1242	state.requestQueue = requestQueue
1243	if len(gdmsg.InvList) > 0 {
1244		peer.QueueMessage(gdmsg, nil)
1245	}
1246}
1247
1248// limitMap is a helper function for maps that require a maximum limit by
1249// evicting a random transaction if adding a new value would cause it to
1250// overflow the maximum allowed.
1251func (sm *SyncManager) limitMap(m map[chainhash.Hash]struct{}, limit int) {
1252	if len(m)+1 > limit {
1253		// Remove a random entry from the map.  For most compilers, Go's
1254		// range statement iterates starting at a random item although
1255		// that is not 100% guaranteed by the spec.  The iteration order
1256		// is not important here because an adversary would have to be
1257		// able to pull off preimage attacks on the hashing function in
1258		// order to target eviction of specific entries anyways.
1259		for txHash := range m {
1260			delete(m, txHash)
1261			return
1262		}
1263	}
1264}
1265
1266// blockHandler is the main handler for the sync manager.  It must be run as a
1267// goroutine.  It processes block and inv messages in a separate goroutine
1268// from the peer handlers so the block (MsgBlock) messages are handled by a
1269// single thread without needing to lock memory data structures.  This is
1270// important because the sync manager controls which blocks are needed and how
1271// the fetching should proceed.
1272func (sm *SyncManager) blockHandler() {
1273	stallTicker := time.NewTicker(stallSampleInterval)
1274	defer stallTicker.Stop()
1275
1276out:
1277	for {
1278		select {
1279		case m := <-sm.msgChan:
1280			switch msg := m.(type) {
1281			case *newPeerMsg:
1282				sm.handleNewPeerMsg(msg.peer)
1283
1284			case *txMsg:
1285				sm.handleTxMsg(msg)
1286				msg.reply <- struct{}{}
1287
1288			case *blockMsg:
1289				sm.handleBlockMsg(msg)
1290				msg.reply <- struct{}{}
1291
1292			case *invMsg:
1293				sm.handleInvMsg(msg)
1294
1295			case *headersMsg:
1296				sm.handleHeadersMsg(msg)
1297
1298			case *donePeerMsg:
1299				sm.handleDonePeerMsg(msg.peer)
1300
1301			case getSyncPeerMsg:
1302				var peerID int32
1303				if sm.syncPeer != nil {
1304					peerID = sm.syncPeer.ID()
1305				}
1306				msg.reply <- peerID
1307
1308			case processBlockMsg:
1309				_, isOrphan, err := sm.chain.ProcessBlock(
1310					msg.block, msg.flags)
1311				if err != nil {
1312					msg.reply <- processBlockResponse{
1313						isOrphan: false,
1314						err:      err,
1315					}
1316				}
1317
1318				msg.reply <- processBlockResponse{
1319					isOrphan: isOrphan,
1320					err:      nil,
1321				}
1322
1323			case isCurrentMsg:
1324				msg.reply <- sm.current()
1325
1326			case pauseMsg:
1327				// Wait until the sender unpauses the manager.
1328				<-msg.unpause
1329
1330			default:
1331				log.Warnf("Invalid message type in block "+
1332					"handler: %T", msg)
1333			}
1334
1335		case <-stallTicker.C:
1336			sm.handleStallSample()
1337
1338		case <-sm.quit:
1339			break out
1340		}
1341	}
1342
1343	sm.wg.Done()
1344	log.Trace("Block handler done")
1345}
1346
1347// handleBlockchainNotification handles notifications from blockchain.  It does
1348// things such as request orphan block parents and relay accepted blocks to
1349// connected peers.
1350func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Notification) {
1351	switch notification.Type {
1352	// A block has been accepted into the block chain.  Relay it to other
1353	// peers.
1354	case blockchain.NTBlockAccepted:
1355		// Don't relay if we are not current. Other peers that are
1356		// current should already know about it.
1357		if !sm.current() {
1358			return
1359		}
1360
1361		block, ok := notification.Data.(*btcutil.Block)
1362		if !ok {
1363			log.Warnf("Chain accepted notification is not a block.")
1364			break
1365		}
1366
1367		// Generate the inventory vector and relay it.
1368		iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
1369		sm.peerNotifier.RelayInventory(iv, block.MsgBlock().Header)
1370
1371	// A block has been connected to the main block chain.
1372	case blockchain.NTBlockConnected:
1373		block, ok := notification.Data.(*btcutil.Block)
1374		if !ok {
1375			log.Warnf("Chain connected notification is not a block.")
1376			break
1377		}
1378
1379		// Remove all of the transactions (except the coinbase) in the
1380		// connected block from the transaction pool.  Secondly, remove any
1381		// transactions which are now double spends as a result of these
1382		// new transactions.  Finally, remove any transaction that is
1383		// no longer an orphan. Transactions which depend on a confirmed
1384		// transaction are NOT removed recursively because they are still
1385		// valid.
1386		for _, tx := range block.Transactions()[1:] {
1387			sm.txMemPool.RemoveTransaction(tx, false)
1388			sm.txMemPool.RemoveDoubleSpends(tx)
1389			sm.txMemPool.RemoveOrphan(tx)
1390			sm.peerNotifier.TransactionConfirmed(tx)
1391			acceptedTxs := sm.txMemPool.ProcessOrphans(tx)
1392			sm.peerNotifier.AnnounceNewTransactions(acceptedTxs)
1393		}
1394
1395		// Register block with the fee estimator, if it exists.
1396		if sm.feeEstimator != nil {
1397			err := sm.feeEstimator.RegisterBlock(block)
1398
1399			// If an error is somehow generated then the fee estimator
1400			// has entered an invalid state. Since it doesn't know how
1401			// to recover, create a new one.
1402			if err != nil {
1403				sm.feeEstimator = mempool.NewFeeEstimator(
1404					mempool.DefaultEstimateFeeMaxRollback,
1405					mempool.DefaultEstimateFeeMinRegisteredBlocks)
1406			}
1407		}
1408
1409	// A block has been disconnected from the main block chain.
1410	case blockchain.NTBlockDisconnected:
1411		block, ok := notification.Data.(*btcutil.Block)
1412		if !ok {
1413			log.Warnf("Chain disconnected notification is not a block.")
1414			break
1415		}
1416
1417		// Reinsert all of the transactions (except the coinbase) into
1418		// the transaction pool.
1419		for _, tx := range block.Transactions()[1:] {
1420			_, _, err := sm.txMemPool.MaybeAcceptTransaction(tx,
1421				false, false)
1422			if err != nil {
1423				// Remove the transaction and all transactions
1424				// that depend on it if it wasn't accepted into
1425				// the transaction pool.
1426				sm.txMemPool.RemoveTransaction(tx, true)
1427			}
1428		}
1429
1430		// Rollback previous block recorded by the fee estimator.
1431		if sm.feeEstimator != nil {
1432			sm.feeEstimator.Rollback(block.Hash())
1433		}
1434	}
1435}
1436
1437// NewPeer informs the sync manager of a newly active peer.
1438func (sm *SyncManager) NewPeer(peer *peerpkg.Peer) {
1439	// Ignore if we are shutting down.
1440	if atomic.LoadInt32(&sm.shutdown) != 0 {
1441		return
1442	}
1443	sm.msgChan <- &newPeerMsg{peer: peer}
1444}
1445
1446// QueueTx adds the passed transaction message and peer to the block handling
1447// queue. Responds to the done channel argument after the tx message is
1448// processed.
1449func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan struct{}) {
1450	// Don't accept more transactions if we're shutting down.
1451	if atomic.LoadInt32(&sm.shutdown) != 0 {
1452		done <- struct{}{}
1453		return
1454	}
1455
1456	sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done}
1457}
1458
1459// QueueBlock adds the passed block message and peer to the block handling
1460// queue. Responds to the done channel argument after the block message is
1461// processed.
1462func (sm *SyncManager) QueueBlock(block *btcutil.Block, peer *peerpkg.Peer, done chan struct{}) {
1463	// Don't accept more blocks if we're shutting down.
1464	if atomic.LoadInt32(&sm.shutdown) != 0 {
1465		done <- struct{}{}
1466		return
1467	}
1468
1469	sm.msgChan <- &blockMsg{block: block, peer: peer, reply: done}
1470}
1471
1472// QueueInv adds the passed inv message and peer to the block handling queue.
1473func (sm *SyncManager) QueueInv(inv *wire.MsgInv, peer *peerpkg.Peer) {
1474	// No channel handling here because peers do not need to block on inv
1475	// messages.
1476	if atomic.LoadInt32(&sm.shutdown) != 0 {
1477		return
1478	}
1479
1480	sm.msgChan <- &invMsg{inv: inv, peer: peer}
1481}
1482
1483// QueueHeaders adds the passed headers message and peer to the block handling
1484// queue.
1485func (sm *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer) {
1486	// No channel handling here because peers do not need to block on
1487	// headers messages.
1488	if atomic.LoadInt32(&sm.shutdown) != 0 {
1489		return
1490	}
1491
1492	sm.msgChan <- &headersMsg{headers: headers, peer: peer}
1493}
1494
1495// DonePeer informs the blockmanager that a peer has disconnected.
1496func (sm *SyncManager) DonePeer(peer *peerpkg.Peer) {
1497	// Ignore if we are shutting down.
1498	if atomic.LoadInt32(&sm.shutdown) != 0 {
1499		return
1500	}
1501
1502	sm.msgChan <- &donePeerMsg{peer: peer}
1503}
1504
1505// Start begins the core block handler which processes block and inv messages.
1506func (sm *SyncManager) Start() {
1507	// Already started?
1508	if atomic.AddInt32(&sm.started, 1) != 1 {
1509		return
1510	}
1511
1512	log.Trace("Starting sync manager")
1513	sm.wg.Add(1)
1514	go sm.blockHandler()
1515}
1516
1517// Stop gracefully shuts down the sync manager by stopping all asynchronous
1518// handlers and waiting for them to finish.
1519func (sm *SyncManager) Stop() error {
1520	if atomic.AddInt32(&sm.shutdown, 1) != 1 {
1521		log.Warnf("Sync manager is already in the process of " +
1522			"shutting down")
1523		return nil
1524	}
1525
1526	log.Infof("Sync manager shutting down")
1527	close(sm.quit)
1528	sm.wg.Wait()
1529	return nil
1530}
1531
1532// SyncPeerID returns the ID of the current sync peer, or 0 if there is none.
1533func (sm *SyncManager) SyncPeerID() int32 {
1534	reply := make(chan int32)
1535	sm.msgChan <- getSyncPeerMsg{reply: reply}
1536	return <-reply
1537}
1538
1539// ProcessBlock makes use of ProcessBlock on an internal instance of a block
1540// chain.
1541func (sm *SyncManager) ProcessBlock(block *btcutil.Block, flags blockchain.BehaviorFlags) (bool, error) {
1542	reply := make(chan processBlockResponse, 1)
1543	sm.msgChan <- processBlockMsg{block: block, flags: flags, reply: reply}
1544	response := <-reply
1545	return response.isOrphan, response.err
1546}
1547
1548// IsCurrent returns whether or not the sync manager believes it is synced with
1549// the connected peers.
1550func (sm *SyncManager) IsCurrent() bool {
1551	reply := make(chan bool)
1552	sm.msgChan <- isCurrentMsg{reply: reply}
1553	return <-reply
1554}
1555
1556// Pause pauses the sync manager until the returned channel is closed.
1557//
1558// Note that while paused, all peer and block processing is halted.  The
1559// message sender should avoid pausing the sync manager for long durations.
1560func (sm *SyncManager) Pause() chan<- struct{} {
1561	c := make(chan struct{})
1562	sm.msgChan <- pauseMsg{c}
1563	return c
1564}
1565
1566// New constructs a new SyncManager. Use Start to begin processing asynchronous
1567// block, tx, and inv updates.
1568func New(config *Config) (*SyncManager, error) {
1569	sm := SyncManager{
1570		peerNotifier:    config.PeerNotifier,
1571		chain:           config.Chain,
1572		txMemPool:       config.TxMemPool,
1573		chainParams:     config.ChainParams,
1574		rejectedTxns:    make(map[chainhash.Hash]struct{}),
1575		requestedTxns:   make(map[chainhash.Hash]struct{}),
1576		requestedBlocks: make(map[chainhash.Hash]struct{}),
1577		peerStates:      make(map[*peerpkg.Peer]*peerSyncState),
1578		progressLogger:  newBlockProgressLogger("Processed", log),
1579		msgChan:         make(chan interface{}, config.MaxPeers*3),
1580		headerList:      list.New(),
1581		quit:            make(chan struct{}),
1582		feeEstimator:    config.FeeEstimator,
1583	}
1584
1585	best := sm.chain.BestSnapshot()
1586	if !config.DisableCheckpoints {
1587		// Initialize the next checkpoint based on the current height.
1588		sm.nextCheckpoint = sm.findNextHeaderCheckpoint(best.Height)
1589		if sm.nextCheckpoint != nil {
1590			sm.resetHeaderState(&best.Hash, best.Height)
1591		}
1592	} else {
1593		log.Info("Checkpoints are disabled")
1594	}
1595
1596	sm.chain.Subscribe(sm.handleBlockchainNotification)
1597
1598	return &sm, nil
1599}
1600