1// Copyright (c) 2013-2017 The btcsuite developers
2// Copyright (c) 2015-2018 The Decred developers
3// Use of this source code is governed by an ISC
4// license that can be found in the LICENSE file.
5
6package main
7
8import (
9	"bytes"
10	"crypto/rand"
11	"crypto/tls"
12	"encoding/binary"
13	"errors"
14	"fmt"
15	"math"
16	"net"
17	"runtime"
18	"sort"
19	"strconv"
20	"strings"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"github.com/btcsuite/btcd/addrmgr"
26	"github.com/btcsuite/btcd/blockchain"
27	"github.com/btcsuite/btcd/blockchain/indexers"
28	"github.com/btcsuite/btcd/chaincfg"
29	"github.com/btcsuite/btcd/chaincfg/chainhash"
30	"github.com/btcsuite/btcd/connmgr"
31	"github.com/btcsuite/btcd/database"
32	"github.com/btcsuite/btcd/mempool"
33	"github.com/btcsuite/btcd/mining"
34	"github.com/btcsuite/btcd/mining/cpuminer"
35	"github.com/btcsuite/btcd/netsync"
36	"github.com/btcsuite/btcd/peer"
37	"github.com/btcsuite/btcd/txscript"
38	"github.com/btcsuite/btcd/wire"
39	"github.com/btcsuite/btcutil"
40	"github.com/btcsuite/btcutil/bloom"
41)
42
43const (
44	// defaultServices describes the default services that are supported by
45	// the server.
46	defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom |
47		wire.SFNodeWitness | wire.SFNodeCF
48
49	// defaultRequiredServices describes the default services that are
50	// required to be supported by outbound peers.
51	defaultRequiredServices = wire.SFNodeNetwork
52
53	// defaultTargetOutbound is the default number of outbound peers to target.
54	defaultTargetOutbound = 8
55
56	// connectionRetryInterval is the base amount of time to wait in between
57	// retries when connecting to persistent peers.  It is adjusted by the
58	// number of retries such that there is a retry backoff.
59	connectionRetryInterval = time.Second * 5
60)
61
62var (
63	// userAgentName is the user agent name and is used to help identify
64	// ourselves to other bitcoin peers.
65	userAgentName = "btcd"
66
67	// userAgentVersion is the user agent version and is used to help
68	// identify ourselves to other bitcoin peers.
69	userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch)
70)
71
72// zeroHash is the zero value hash (all zeros).  It is defined as a convenience.
73var zeroHash chainhash.Hash
74
75// onionAddr implements the net.Addr interface and represents a tor address.
76type onionAddr struct {
77	addr string
78}
79
80// String returns the onion address.
81//
82// This is part of the net.Addr interface.
83func (oa *onionAddr) String() string {
84	return oa.addr
85}
86
87// Network returns "onion".
88//
89// This is part of the net.Addr interface.
90func (oa *onionAddr) Network() string {
91	return "onion"
92}
93
94// Ensure onionAddr implements the net.Addr interface.
95var _ net.Addr = (*onionAddr)(nil)
96
97// simpleAddr implements the net.Addr interface with two struct fields
98type simpleAddr struct {
99	net, addr string
100}
101
102// String returns the address.
103//
104// This is part of the net.Addr interface.
105func (a simpleAddr) String() string {
106	return a.addr
107}
108
109// Network returns the network.
110//
111// This is part of the net.Addr interface.
112func (a simpleAddr) Network() string {
113	return a.net
114}
115
116// Ensure simpleAddr implements the net.Addr interface.
117var _ net.Addr = simpleAddr{}
118
119// broadcastMsg provides the ability to house a bitcoin message to be broadcast
120// to all connected peers except specified excluded peers.
121type broadcastMsg struct {
122	message      wire.Message
123	excludePeers []*serverPeer
124}
125
126// broadcastInventoryAdd is a type used to declare that the InvVect it contains
127// needs to be added to the rebroadcast map
128type broadcastInventoryAdd relayMsg
129
130// broadcastInventoryDel is a type used to declare that the InvVect it contains
131// needs to be removed from the rebroadcast map
132type broadcastInventoryDel *wire.InvVect
133
134// relayMsg packages an inventory vector along with the newly discovered
135// inventory so the relay has access to that information.
136type relayMsg struct {
137	invVect *wire.InvVect
138	data    interface{}
139}
140
141// updatePeerHeightsMsg is a message sent from the blockmanager to the server
142// after a new block has been accepted. The purpose of the message is to update
143// the heights of peers that were known to announce the block before we
144// connected it to the main chain or recognized it as an orphan. With these
145// updates, peer heights will be kept up to date, allowing for fresh data when
146// selecting sync peer candidacy.
147type updatePeerHeightsMsg struct {
148	newHash    *chainhash.Hash
149	newHeight  int32
150	originPeer *peer.Peer
151}
152
153// peerState maintains state of inbound, persistent, outbound peers as well
154// as banned peers and outbound groups.
155type peerState struct {
156	inboundPeers    map[int32]*serverPeer
157	outboundPeers   map[int32]*serverPeer
158	persistentPeers map[int32]*serverPeer
159	banned          map[string]time.Time
160	outboundGroups  map[string]int
161}
162
163// Count returns the count of all known peers.
164func (ps *peerState) Count() int {
165	return len(ps.inboundPeers) + len(ps.outboundPeers) +
166		len(ps.persistentPeers)
167}
168
169// forAllOutboundPeers is a helper function that runs closure on all outbound
170// peers known to peerState.
171func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) {
172	for _, e := range ps.outboundPeers {
173		closure(e)
174	}
175	for _, e := range ps.persistentPeers {
176		closure(e)
177	}
178}
179
180// forAllPeers is a helper function that runs closure on all peers known to
181// peerState.
182func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) {
183	for _, e := range ps.inboundPeers {
184		closure(e)
185	}
186	ps.forAllOutboundPeers(closure)
187}
188
189// cfHeaderKV is a tuple of a filter header and its associated block hash. The
190// struct is used to cache cfcheckpt responses.
191type cfHeaderKV struct {
192	blockHash    chainhash.Hash
193	filterHeader chainhash.Hash
194}
195
196// server provides a bitcoin server for handling communications to and from
197// bitcoin peers.
198type server struct {
199	// The following variables must only be used atomically.
200	// Putting the uint64s first makes them 64-bit aligned for 32-bit systems.
201	bytesReceived uint64 // Total bytes received from all peers since start.
202	bytesSent     uint64 // Total bytes sent by all peers since start.
203	started       int32
204	shutdown      int32
205	shutdownSched int32
206	startupTime   int64
207
208	chainParams          *chaincfg.Params
209	addrManager          *addrmgr.AddrManager
210	connManager          *connmgr.ConnManager
211	sigCache             *txscript.SigCache
212	hashCache            *txscript.HashCache
213	rpcServer            *rpcServer
214	syncManager          *netsync.SyncManager
215	chain                *blockchain.BlockChain
216	txMemPool            *mempool.TxPool
217	cpuMiner             *cpuminer.CPUMiner
218	modifyRebroadcastInv chan interface{}
219	newPeers             chan *serverPeer
220	donePeers            chan *serverPeer
221	banPeers             chan *serverPeer
222	query                chan interface{}
223	relayInv             chan relayMsg
224	broadcast            chan broadcastMsg
225	peerHeightsUpdate    chan updatePeerHeightsMsg
226	wg                   sync.WaitGroup
227	quit                 chan struct{}
228	nat                  NAT
229	db                   database.DB
230	timeSource           blockchain.MedianTimeSource
231	services             wire.ServiceFlag
232
233	// The following fields are used for optional indexes.  They will be nil
234	// if the associated index is not enabled.  These fields are set during
235	// initial creation of the server and never changed afterwards, so they
236	// do not need to be protected for concurrent access.
237	txIndex   *indexers.TxIndex
238	addrIndex *indexers.AddrIndex
239	cfIndex   *indexers.CfIndex
240
241	// The fee estimator keeps track of how long transactions are left in
242	// the mempool before they are mined into blocks.
243	feeEstimator *mempool.FeeEstimator
244
245	// cfCheckptCaches stores a cached slice of filter headers for cfcheckpt
246	// messages for each filter type.
247	cfCheckptCaches    map[wire.FilterType][]cfHeaderKV
248	cfCheckptCachesMtx sync.RWMutex
249
250	// agentBlacklist is a list of blacklisted substrings by which to filter
251	// user agents.
252	agentBlacklist []string
253
254	// agentWhitelist is a list of whitelisted user agent substrings, no
255	// whitelisting will be applied if the list is empty or nil.
256	agentWhitelist []string
257}
258
259// serverPeer extends the peer to maintain state shared by the server and
260// the blockmanager.
261type serverPeer struct {
262	// The following variables must only be used atomically
263	feeFilter int64
264
265	*peer.Peer
266
267	connReq        *connmgr.ConnReq
268	server         *server
269	persistent     bool
270	continueHash   *chainhash.Hash
271	relayMtx       sync.Mutex
272	disableRelayTx bool
273	sentAddrs      bool
274	isWhitelisted  bool
275	filter         *bloom.Filter
276	addressesMtx   sync.RWMutex
277	knownAddresses map[string]struct{}
278	banScore       connmgr.DynamicBanScore
279	quit           chan struct{}
280	// The following chans are used to sync blockmanager and server.
281	txProcessed    chan struct{}
282	blockProcessed chan struct{}
283}
284
285// newServerPeer returns a new serverPeer instance. The peer needs to be set by
286// the caller.
287func newServerPeer(s *server, isPersistent bool) *serverPeer {
288	return &serverPeer{
289		server:         s,
290		persistent:     isPersistent,
291		filter:         bloom.LoadFilter(nil),
292		knownAddresses: make(map[string]struct{}),
293		quit:           make(chan struct{}),
294		txProcessed:    make(chan struct{}, 1),
295		blockProcessed: make(chan struct{}, 1),
296	}
297}
298
299// newestBlock returns the current best block hash and height using the format
300// required by the configuration for the peer package.
301func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) {
302	best := sp.server.chain.BestSnapshot()
303	return &best.Hash, best.Height, nil
304}
305
306// addKnownAddresses adds the given addresses to the set of known addresses to
307// the peer to prevent sending duplicate addresses.
308func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) {
309	sp.addressesMtx.Lock()
310	for _, na := range addresses {
311		sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{}
312	}
313	sp.addressesMtx.Unlock()
314}
315
316// addressKnown true if the given address is already known to the peer.
317func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool {
318	sp.addressesMtx.RLock()
319	_, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)]
320	sp.addressesMtx.RUnlock()
321	return exists
322}
323
324// setDisableRelayTx toggles relaying of transactions for the given peer.
325// It is safe for concurrent access.
326func (sp *serverPeer) setDisableRelayTx(disable bool) {
327	sp.relayMtx.Lock()
328	sp.disableRelayTx = disable
329	sp.relayMtx.Unlock()
330}
331
332// relayTxDisabled returns whether or not relaying of transactions for the given
333// peer is disabled.
334// It is safe for concurrent access.
335func (sp *serverPeer) relayTxDisabled() bool {
336	sp.relayMtx.Lock()
337	isDisabled := sp.disableRelayTx
338	sp.relayMtx.Unlock()
339
340	return isDisabled
341}
342
343// pushAddrMsg sends an addr message to the connected peer using the provided
344// addresses.
345func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) {
346	// Filter addresses already known to the peer.
347	addrs := make([]*wire.NetAddress, 0, len(addresses))
348	for _, addr := range addresses {
349		if !sp.addressKnown(addr) {
350			addrs = append(addrs, addr)
351		}
352	}
353	known, err := sp.PushAddrMsg(addrs)
354	if err != nil {
355		peerLog.Errorf("Can't push address message to %s: %v", sp.Peer, err)
356		sp.Disconnect()
357		return
358	}
359	sp.addKnownAddresses(known)
360}
361
362// addBanScore increases the persistent and decaying ban score fields by the
363// values passed as parameters. If the resulting score exceeds half of the ban
364// threshold, a warning is logged including the reason provided. Further, if
365// the score is above the ban threshold, the peer will be banned and
366// disconnected.
367func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) {
368	// No warning is logged and no score is calculated if banning is disabled.
369	if cfg.DisableBanning {
370		return
371	}
372	if sp.isWhitelisted {
373		peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason)
374		return
375	}
376
377	warnThreshold := cfg.BanThreshold >> 1
378	if transient == 0 && persistent == 0 {
379		// The score is not being increased, but a warning message is still
380		// logged if the score is above the warn threshold.
381		score := sp.banScore.Int()
382		if score > warnThreshold {
383			peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+
384				"it was not increased this time", sp, reason, score)
385		}
386		return
387	}
388	score := sp.banScore.Increase(persistent, transient)
389	if score > warnThreshold {
390		peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d",
391			sp, reason, score)
392		if score > cfg.BanThreshold {
393			peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting",
394				sp)
395			sp.server.BanPeer(sp)
396			sp.Disconnect()
397		}
398	}
399}
400
401// hasServices returns whether or not the provided advertised service flags have
402// all of the provided desired service flags set.
403func hasServices(advertised, desired wire.ServiceFlag) bool {
404	return advertised&desired == desired
405}
406
407// OnVersion is invoked when a peer receives a version bitcoin message
408// and is used to negotiate the protocol version details as well as kick start
409// the communications.
410func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject {
411	// Update the address manager with the advertised services for outbound
412	// connections in case they have changed.  This is not done for inbound
413	// connections to help prevent malicious behavior and is skipped when
414	// running on the simulation test network since it is only intended to
415	// connect to specified peers and actively avoids advertising and
416	// connecting to discovered peers.
417	//
418	// NOTE: This is done before rejecting peers that are too old to ensure
419	// it is updated regardless in the case a new minimum protocol version is
420	// enforced and the remote node has not upgraded yet.
421	isInbound := sp.Inbound()
422	remoteAddr := sp.NA()
423	addrManager := sp.server.addrManager
424	if !cfg.SimNet && !isInbound {
425		addrManager.SetServices(remoteAddr, msg.Services)
426	}
427
428	// Ignore peers that have a protcol version that is too old.  The peer
429	// negotiation logic will disconnect it after this callback returns.
430	if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) {
431		return nil
432	}
433
434	// Reject outbound peers that are not full nodes.
435	wantServices := wire.SFNodeNetwork
436	if !isInbound && !hasServices(msg.Services, wantServices) {
437		missingServices := wantServices & ^msg.Services
438		srvrLog.Debugf("Rejecting peer %s with services %v due to not "+
439			"providing desired services %v", sp.Peer, msg.Services,
440			missingServices)
441		reason := fmt.Sprintf("required services %#x not offered",
442			uint64(missingServices))
443		return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason)
444	}
445
446	if !cfg.SimNet && !isInbound {
447		// After soft-fork activation, only make outbound
448		// connection to peers if they flag that they're segwit
449		// enabled.
450		chain := sp.server.chain
451		segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit)
452		if err != nil {
453			peerLog.Errorf("Unable to query for segwit soft-fork state: %v",
454				err)
455			return nil
456		}
457
458		if segwitActive && !sp.IsWitnessEnabled() {
459			peerLog.Infof("Disconnecting non-segwit peer %v, isn't segwit "+
460				"enabled and we need more segwit enabled peers", sp)
461			sp.Disconnect()
462			return nil
463		}
464	}
465
466	// Add the remote peer time as a sample for creating an offset against
467	// the local clock to keep the network time in sync.
468	sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp)
469
470	// Choose whether or not to relay transactions before a filter command
471	// is received.
472	sp.setDisableRelayTx(msg.DisableRelayTx)
473
474	return nil
475}
476
477// OnVerAck is invoked when a peer receives a verack bitcoin message and is used
478// to kick start communication with them.
479func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) {
480	sp.server.AddPeer(sp)
481}
482
483// OnMemPool is invoked when a peer receives a mempool bitcoin message.
484// It creates and sends an inventory message with the contents of the memory
485// pool up to the maximum inventory allowed per message.  When the peer has a
486// bloom filter loaded, the contents are filtered accordingly.
487func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) {
488	// Only allow mempool requests if the server has bloom filtering
489	// enabled.
490	if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
491		peerLog.Debugf("peer %v sent mempool request with bloom "+
492			"filtering disabled -- disconnecting", sp)
493		sp.Disconnect()
494		return
495	}
496
497	// A decaying ban score increase is applied to prevent flooding.
498	// The ban score accumulates and passes the ban threshold if a burst of
499	// mempool messages comes from a peer. The score decays each minute to
500	// half of its value.
501	sp.addBanScore(0, 33, "mempool")
502
503	// Generate inventory message with the available transactions in the
504	// transaction memory pool.  Limit it to the max allowed inventory
505	// per message.  The NewMsgInvSizeHint function automatically limits
506	// the passed hint to the maximum allowed, so it's safe to pass it
507	// without double checking it here.
508	txMemPool := sp.server.txMemPool
509	txDescs := txMemPool.TxDescs()
510	invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs)))
511
512	for _, txDesc := range txDescs {
513		// Either add all transactions when there is no bloom filter,
514		// or only the transactions that match the filter when there is
515		// one.
516		if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) {
517			iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash())
518			invMsg.AddInvVect(iv)
519			if len(invMsg.InvList)+1 > wire.MaxInvPerMsg {
520				break
521			}
522		}
523	}
524
525	// Send the inventory message if there is anything to send.
526	if len(invMsg.InvList) > 0 {
527		sp.QueueMessage(invMsg, nil)
528	}
529}
530
531// OnTx is invoked when a peer receives a tx bitcoin message.  It blocks
532// until the bitcoin transaction has been fully processed.  Unlock the block
533// handler this does not serialize all transactions through a single thread
534// transactions don't rely on the previous one in a linear fashion like blocks.
535func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) {
536	if cfg.BlocksOnly {
537		peerLog.Tracef("Ignoring tx %v from %v - blocksonly enabled",
538			msg.TxHash(), sp)
539		return
540	}
541
542	// Add the transaction to the known inventory for the peer.
543	// Convert the raw MsgTx to a btcutil.Tx which provides some convenience
544	// methods and things such as hash caching.
545	tx := btcutil.NewTx(msg)
546	iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
547	sp.AddKnownInventory(iv)
548
549	// Queue the transaction up to be handled by the sync manager and
550	// intentionally block further receives until the transaction is fully
551	// processed and known good or bad.  This helps prevent a malicious peer
552	// from queuing up a bunch of bad transactions before disconnecting (or
553	// being disconnected) and wasting memory.
554	sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed)
555	<-sp.txProcessed
556}
557
558// OnBlock is invoked when a peer receives a block bitcoin message.  It
559// blocks until the bitcoin block has been fully processed.
560func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) {
561	// Convert the raw MsgBlock to a btcutil.Block which provides some
562	// convenience methods and things such as hash caching.
563	block := btcutil.NewBlockFromBlockAndBytes(msg, buf)
564
565	// Add the block to the known inventory for the peer.
566	iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash())
567	sp.AddKnownInventory(iv)
568
569	// Queue the block up to be handled by the block
570	// manager and intentionally block further receives
571	// until the bitcoin block is fully processed and known
572	// good or bad.  This helps prevent a malicious peer
573	// from queuing up a bunch of bad blocks before
574	// disconnecting (or being disconnected) and wasting
575	// memory.  Additionally, this behavior is depended on
576	// by at least the block acceptance test tool as the
577	// reference implementation processes blocks in the same
578	// thread and therefore blocks further messages until
579	// the bitcoin block has been fully processed.
580	sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed)
581	<-sp.blockProcessed
582}
583
584// OnInv is invoked when a peer receives an inv bitcoin message and is
585// used to examine the inventory being advertised by the remote peer and react
586// accordingly.  We pass the message down to blockmanager which will call
587// QueueMessage with any appropriate responses.
588func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) {
589	if !cfg.BlocksOnly {
590		if len(msg.InvList) > 0 {
591			sp.server.syncManager.QueueInv(msg, sp.Peer)
592		}
593		return
594	}
595
596	newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList)))
597	for _, invVect := range msg.InvList {
598		if invVect.Type == wire.InvTypeTx {
599			peerLog.Tracef("Ignoring tx %v in inv from %v -- "+
600				"blocksonly enabled", invVect.Hash, sp)
601			if sp.ProtocolVersion() >= wire.BIP0037Version {
602				peerLog.Infof("Peer %v is announcing "+
603					"transactions -- disconnecting", sp)
604				sp.Disconnect()
605				return
606			}
607			continue
608		}
609		err := newInv.AddInvVect(invVect)
610		if err != nil {
611			peerLog.Errorf("Failed to add inventory vector: %v", err)
612			break
613		}
614	}
615
616	if len(newInv.InvList) > 0 {
617		sp.server.syncManager.QueueInv(newInv, sp.Peer)
618	}
619}
620
621// OnHeaders is invoked when a peer receives a headers bitcoin
622// message.  The message is passed down to the sync manager.
623func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
624	sp.server.syncManager.QueueHeaders(msg, sp.Peer)
625}
626
627// handleGetData is invoked when a peer receives a getdata bitcoin message and
628// is used to deliver block and transaction information.
629func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
630	numAdded := 0
631	notFound := wire.NewMsgNotFound()
632
633	length := len(msg.InvList)
634	// A decaying ban score increase is applied to prevent exhausting resources
635	// with unusually large inventory queries.
636	// Requesting more than the maximum inventory vector length within a short
637	// period of time yields a score above the default ban threshold. Sustained
638	// bursts of small requests are not penalized as that would potentially ban
639	// peers performing IBD.
640	// This incremental score decays each minute to half of its value.
641	sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata")
642
643	// We wait on this wait channel periodically to prevent queuing
644	// far more data than we can send in a reasonable time, wasting memory.
645	// The waiting occurs after the database fetch for the next one to
646	// provide a little pipelining.
647	var waitChan chan struct{}
648	doneChan := make(chan struct{}, 1)
649
650	for i, iv := range msg.InvList {
651		var c chan struct{}
652		// If this will be the last message we send.
653		if i == length-1 && len(notFound.InvList) == 0 {
654			c = doneChan
655		} else if (i+1)%3 == 0 {
656			// Buffered so as to not make the send goroutine block.
657			c = make(chan struct{}, 1)
658		}
659		var err error
660		switch iv.Type {
661		case wire.InvTypeWitnessTx:
662			err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
663		case wire.InvTypeTx:
664			err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
665		case wire.InvTypeWitnessBlock:
666			err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
667		case wire.InvTypeBlock:
668			err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
669		case wire.InvTypeFilteredWitnessBlock:
670			err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
671		case wire.InvTypeFilteredBlock:
672			err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
673		default:
674			peerLog.Warnf("Unknown type in inventory request %d",
675				iv.Type)
676			continue
677		}
678		if err != nil {
679			notFound.AddInvVect(iv)
680
681			// When there is a failure fetching the final entry
682			// and the done channel was sent in due to there
683			// being no outstanding not found inventory, consume
684			// it here because there is now not found inventory
685			// that will use the channel momentarily.
686			if i == len(msg.InvList)-1 && c != nil {
687				<-c
688			}
689		}
690		numAdded++
691		waitChan = c
692	}
693	if len(notFound.InvList) != 0 {
694		sp.QueueMessage(notFound, doneChan)
695	}
696
697	// Wait for messages to be sent. We can send quite a lot of data at this
698	// point and this will keep the peer busy for a decent amount of time.
699	// We don't process anything else by them in this time so that we
700	// have an idea of when we should hear back from them - else the idle
701	// timeout could fire when we were only half done sending the blocks.
702	if numAdded > 0 {
703		<-doneChan
704	}
705}
706
707// OnGetBlocks is invoked when a peer receives a getblocks bitcoin
708// message.
709func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) {
710	// Find the most recent known block in the best chain based on the block
711	// locator and fetch all of the block hashes after it until either
712	// wire.MaxBlocksPerMsg have been fetched or the provided stop hash is
713	// encountered.
714	//
715	// Use the block after the genesis block if no other blocks in the
716	// provided locator are known.  This does mean the client will start
717	// over with the genesis block if unknown block locators are provided.
718	//
719	// This mirrors the behavior in the reference implementation.
720	chain := sp.server.chain
721	hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop,
722		wire.MaxBlocksPerMsg)
723
724	// Generate inventory message.
725	invMsg := wire.NewMsgInv()
726	for i := range hashList {
727		iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i])
728		invMsg.AddInvVect(iv)
729	}
730
731	// Send the inventory message if there is anything to send.
732	if len(invMsg.InvList) > 0 {
733		invListLen := len(invMsg.InvList)
734		if invListLen == wire.MaxBlocksPerMsg {
735			// Intentionally use a copy of the final hash so there
736			// is not a reference into the inventory slice which
737			// would prevent the entire slice from being eligible
738			// for GC as soon as it's sent.
739			continueHash := invMsg.InvList[invListLen-1].Hash
740			sp.continueHash = &continueHash
741		}
742		sp.QueueMessage(invMsg, nil)
743	}
744}
745
746// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
747// message.
748func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) {
749	// Ignore getheaders requests if not in sync.
750	if !sp.server.syncManager.IsCurrent() {
751		return
752	}
753
754	// Find the most recent known block in the best chain based on the block
755	// locator and fetch all of the headers after it until either
756	// wire.MaxBlockHeadersPerMsg have been fetched or the provided stop
757	// hash is encountered.
758	//
759	// Use the block after the genesis block if no other blocks in the
760	// provided locator are known.  This does mean the client will start
761	// over with the genesis block if unknown block locators are provided.
762	//
763	// This mirrors the behavior in the reference implementation.
764	chain := sp.server.chain
765	headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop)
766
767	// Send found headers to the requesting peer.
768	blockHeaders := make([]*wire.BlockHeader, len(headers))
769	for i := range headers {
770		blockHeaders[i] = &headers[i]
771	}
772	sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil)
773}
774
775// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message.
776func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
777	// Ignore getcfilters requests if not in sync.
778	if !sp.server.syncManager.IsCurrent() {
779		return
780	}
781
782	// We'll also ensure that the remote party is requesting a set of
783	// filters that we actually currently maintain.
784	switch msg.FilterType {
785	case wire.GCSFilterRegular:
786		break
787
788	default:
789		peerLog.Debug("Filter request for unknown filter: %v",
790			msg.FilterType)
791		return
792	}
793
794	hashes, err := sp.server.chain.HeightToHashRange(
795		int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
796	)
797	if err != nil {
798		peerLog.Debugf("Invalid getcfilters request: %v", err)
799		return
800	}
801
802	// Create []*chainhash.Hash from []chainhash.Hash to pass to
803	// FiltersByBlockHashes.
804	hashPtrs := make([]*chainhash.Hash, len(hashes))
805	for i := range hashes {
806		hashPtrs[i] = &hashes[i]
807	}
808
809	filters, err := sp.server.cfIndex.FiltersByBlockHashes(
810		hashPtrs, msg.FilterType,
811	)
812	if err != nil {
813		peerLog.Errorf("Error retrieving cfilters: %v", err)
814		return
815	}
816
817	for i, filterBytes := range filters {
818		if len(filterBytes) == 0 {
819			peerLog.Warnf("Could not obtain cfilter for %v",
820				hashes[i])
821			return
822		}
823
824		filterMsg := wire.NewMsgCFilter(
825			msg.FilterType, &hashes[i], filterBytes,
826		)
827		sp.QueueMessage(filterMsg, nil)
828	}
829}
830
831// OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message.
832func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
833	// Ignore getcfilterheader requests if not in sync.
834	if !sp.server.syncManager.IsCurrent() {
835		return
836	}
837
838	// We'll also ensure that the remote party is requesting a set of
839	// headers for filters that we actually currently maintain.
840	switch msg.FilterType {
841	case wire.GCSFilterRegular:
842		break
843
844	default:
845		peerLog.Debug("Filter request for unknown headers for "+
846			"filter: %v", msg.FilterType)
847		return
848	}
849
850	startHeight := int32(msg.StartHeight)
851	maxResults := wire.MaxCFHeadersPerMsg
852
853	// If StartHeight is positive, fetch the predecessor block hash so we
854	// can populate the PrevFilterHeader field.
855	if msg.StartHeight > 0 {
856		startHeight--
857		maxResults++
858	}
859
860	// Fetch the hashes from the block index.
861	hashList, err := sp.server.chain.HeightToHashRange(
862		startHeight, &msg.StopHash, maxResults,
863	)
864	if err != nil {
865		peerLog.Debugf("Invalid getcfheaders request: %v", err)
866	}
867
868	// This is possible if StartHeight is one greater that the height of
869	// StopHash, and we pull a valid range of hashes including the previous
870	// filter header.
871	if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) {
872		peerLog.Debug("No results for getcfheaders request")
873		return
874	}
875
876	// Create []*chainhash.Hash from []chainhash.Hash to pass to
877	// FilterHeadersByBlockHashes.
878	hashPtrs := make([]*chainhash.Hash, len(hashList))
879	for i := range hashList {
880		hashPtrs[i] = &hashList[i]
881	}
882
883	// Fetch the raw filter hash bytes from the database for all blocks.
884	filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(
885		hashPtrs, msg.FilterType,
886	)
887	if err != nil {
888		peerLog.Errorf("Error retrieving cfilter hashes: %v", err)
889		return
890	}
891
892	// Generate cfheaders message and send it.
893	headersMsg := wire.NewMsgCFHeaders()
894
895	// Populate the PrevFilterHeader field.
896	if msg.StartHeight > 0 {
897		prevBlockHash := &hashList[0]
898
899		// Fetch the raw committed filter header bytes from the
900		// database.
901		headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash(
902			prevBlockHash, msg.FilterType)
903		if err != nil {
904			peerLog.Errorf("Error retrieving CF header: %v", err)
905			return
906		}
907		if len(headerBytes) == 0 {
908			peerLog.Warnf("Could not obtain CF header for %v", prevBlockHash)
909			return
910		}
911
912		// Deserialize the hash into PrevFilterHeader.
913		err = headersMsg.PrevFilterHeader.SetBytes(headerBytes)
914		if err != nil {
915			peerLog.Warnf("Committed filter header deserialize "+
916				"failed: %v", err)
917			return
918		}
919
920		hashList = hashList[1:]
921		filterHashes = filterHashes[1:]
922	}
923
924	// Populate HeaderHashes.
925	for i, hashBytes := range filterHashes {
926		if len(hashBytes) == 0 {
927			peerLog.Warnf("Could not obtain CF hash for %v", hashList[i])
928			return
929		}
930
931		// Deserialize the hash.
932		filterHash, err := chainhash.NewHash(hashBytes)
933		if err != nil {
934			peerLog.Warnf("Committed filter hash deserialize "+
935				"failed: %v", err)
936			return
937		}
938
939		headersMsg.AddCFHash(filterHash)
940	}
941
942	headersMsg.FilterType = msg.FilterType
943	headersMsg.StopHash = msg.StopHash
944
945	sp.QueueMessage(headersMsg, nil)
946}
947
948// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message.
949func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
950	// Ignore getcfcheckpt requests if not in sync.
951	if !sp.server.syncManager.IsCurrent() {
952		return
953	}
954
955	// We'll also ensure that the remote party is requesting a set of
956	// checkpoints for filters that we actually currently maintain.
957	switch msg.FilterType {
958	case wire.GCSFilterRegular:
959		break
960
961	default:
962		peerLog.Debug("Filter request for unknown checkpoints for "+
963			"filter: %v", msg.FilterType)
964		return
965	}
966
967	// Now that we know the client is fetching a filter that we know of,
968	// we'll fetch the block hashes et each check point interval so we can
969	// compare against our cache, and create new check points if necessary.
970	blockHashes, err := sp.server.chain.IntervalBlockHashes(
971		&msg.StopHash, wire.CFCheckptInterval,
972	)
973	if err != nil {
974		peerLog.Debugf("Invalid getcfilters request: %v", err)
975		return
976	}
977
978	checkptMsg := wire.NewMsgCFCheckpt(
979		msg.FilterType, &msg.StopHash, len(blockHashes),
980	)
981
982	// Fetch the current existing cache so we can decide if we need to
983	// extend it or if its adequate as is.
984	sp.server.cfCheckptCachesMtx.RLock()
985	checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
986
987	// If the set of block hashes is beyond the current size of the cache,
988	// then we'll expand the size of the cache and also retain the write
989	// lock.
990	var updateCache bool
991	if len(blockHashes) > len(checkptCache) {
992		// Now that we know we'll need to modify the size of the cache,
993		// we'll release the read lock and grab the write lock to
994		// possibly expand the cache size.
995		sp.server.cfCheckptCachesMtx.RUnlock()
996
997		sp.server.cfCheckptCachesMtx.Lock()
998		defer sp.server.cfCheckptCachesMtx.Unlock()
999
1000		// Now that we have the write lock, we'll check again as it's
1001		// possible that the cache has already been expanded.
1002		checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
1003
1004		// If we still need to expand the cache, then We'll mark that
1005		// we need to update the cache for below and also expand the
1006		// size of the cache in place.
1007		if len(blockHashes) > len(checkptCache) {
1008			updateCache = true
1009
1010			additionalLength := len(blockHashes) - len(checkptCache)
1011			newEntries := make([]cfHeaderKV, additionalLength)
1012
1013			peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
1014				"block hashes", len(checkptCache), len(blockHashes))
1015
1016			checkptCache = append(
1017				sp.server.cfCheckptCaches[msg.FilterType],
1018				newEntries...,
1019			)
1020		}
1021	} else {
1022		// Otherwise, we'll hold onto the read lock for the remainder
1023		// of this method.
1024		defer sp.server.cfCheckptCachesMtx.RUnlock()
1025
1026		peerLog.Tracef("Serving stale cache of size %v",
1027			len(checkptCache))
1028	}
1029
1030	// Now that we know the cache is of an appropriate size, we'll iterate
1031	// backwards until the find the block hash. We do this as it's possible
1032	// a re-org has occurred so items in the db are now in the main china
1033	// while the cache has been partially invalidated.
1034	var forkIdx int
1035	for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- {
1036		if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
1037			break
1038		}
1039	}
1040
1041	// Now that we know the how much of the cache is relevant for this
1042	// query, we'll populate our check point message with the cache as is.
1043	// Shortly below, we'll populate the new elements of the cache.
1044	for i := 0; i < forkIdx; i++ {
1045		checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
1046	}
1047
1048	// We'll now collect the set of hashes that are beyond our cache so we
1049	// can look up the filter headers to populate the final cache.
1050	blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
1051	for i := forkIdx; i < len(blockHashes); i++ {
1052		blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
1053	}
1054	filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
1055		blockHashPtrs, msg.FilterType,
1056	)
1057	if err != nil {
1058		peerLog.Errorf("Error retrieving cfilter headers: %v", err)
1059		return
1060	}
1061
1062	// Now that we have the full set of filter headers, we'll add them to
1063	// the checkpoint message, and also update our cache in line.
1064	for i, filterHeaderBytes := range filterHeaders {
1065		if len(filterHeaderBytes) == 0 {
1066			peerLog.Warnf("Could not obtain CF header for %v",
1067				blockHashPtrs[i])
1068			return
1069		}
1070
1071		filterHeader, err := chainhash.NewHash(filterHeaderBytes)
1072		if err != nil {
1073			peerLog.Warnf("Committed filter header deserialize "+
1074				"failed: %v", err)
1075			return
1076		}
1077
1078		checkptMsg.AddCFHeader(filterHeader)
1079
1080		// If the new main chain is longer than what's in the cache,
1081		// then we'll override it beyond the fork point.
1082		if updateCache {
1083			checkptCache[forkIdx+i] = cfHeaderKV{
1084				blockHash:    blockHashes[forkIdx+i],
1085				filterHeader: *filterHeader,
1086			}
1087		}
1088	}
1089
1090	// Finally, we'll update the cache if we need to, and send the final
1091	// message back to the requesting peer.
1092	if updateCache {
1093		sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
1094	}
1095
1096	sp.QueueMessage(checkptMsg, nil)
1097}
1098
1099// enforceNodeBloomFlag disconnects the peer if the server is not configured to
1100// allow bloom filters.  Additionally, if the peer has negotiated to a protocol
1101// version  that is high enough to observe the bloom filter service support bit,
1102// it will be banned since it is intentionally violating the protocol.
1103func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool {
1104	if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom {
1105		// Ban the peer if the protocol version is high enough that the
1106		// peer is knowingly violating the protocol and banning is
1107		// enabled.
1108		//
1109		// NOTE: Even though the addBanScore function already examines
1110		// whether or not banning is enabled, it is checked here as well
1111		// to ensure the violation is logged and the peer is
1112		// disconnected regardless.
1113		if sp.ProtocolVersion() >= wire.BIP0111Version &&
1114			!cfg.DisableBanning {
1115
1116			// Disconnect the peer regardless of whether it was
1117			// banned.
1118			sp.addBanScore(100, 0, cmd)
1119			sp.Disconnect()
1120			return false
1121		}
1122
1123		// Disconnect the peer regardless of protocol version or banning
1124		// state.
1125		peerLog.Debugf("%s sent an unsupported %s request -- "+
1126			"disconnecting", sp, cmd)
1127		sp.Disconnect()
1128		return false
1129	}
1130
1131	return true
1132}
1133
1134// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and
1135// is used by remote peers to request that no transactions which have a fee rate
1136// lower than provided value are inventoried to them.  The peer will be
1137// disconnected if an invalid fee filter value is provided.
1138func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) {
1139	// Check that the passed minimum fee is a valid amount.
1140	if msg.MinFee < 0 || msg.MinFee > btcutil.MaxSatoshi {
1141		peerLog.Debugf("Peer %v sent an invalid feefilter '%v' -- "+
1142			"disconnecting", sp, btcutil.Amount(msg.MinFee))
1143		sp.Disconnect()
1144		return
1145	}
1146
1147	atomic.StoreInt64(&sp.feeFilter, msg.MinFee)
1148}
1149
1150// OnFilterAdd is invoked when a peer receives a filteradd bitcoin
1151// message and is used by remote peers to add data to an already loaded bloom
1152// filter.  The peer will be disconnected if a filter is not loaded when this
1153// message is received or the server is not configured to allow bloom filters.
1154func (sp *serverPeer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) {
1155	// Disconnect and/or ban depending on the node bloom services flag and
1156	// negotiated protocol version.
1157	if !sp.enforceNodeBloomFlag(msg.Command()) {
1158		return
1159	}
1160
1161	if !sp.filter.IsLoaded() {
1162		peerLog.Debugf("%s sent a filteradd request with no filter "+
1163			"loaded -- disconnecting", sp)
1164		sp.Disconnect()
1165		return
1166	}
1167
1168	sp.filter.Add(msg.Data)
1169}
1170
1171// OnFilterClear is invoked when a peer receives a filterclear bitcoin
1172// message and is used by remote peers to clear an already loaded bloom filter.
1173// The peer will be disconnected if a filter is not loaded when this message is
1174// received  or the server is not configured to allow bloom filters.
1175func (sp *serverPeer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) {
1176	// Disconnect and/or ban depending on the node bloom services flag and
1177	// negotiated protocol version.
1178	if !sp.enforceNodeBloomFlag(msg.Command()) {
1179		return
1180	}
1181
1182	if !sp.filter.IsLoaded() {
1183		peerLog.Debugf("%s sent a filterclear request with no "+
1184			"filter loaded -- disconnecting", sp)
1185		sp.Disconnect()
1186		return
1187	}
1188
1189	sp.filter.Unload()
1190}
1191
1192// OnFilterLoad is invoked when a peer receives a filterload bitcoin
1193// message and it used to load a bloom filter that should be used for
1194// delivering merkle blocks and associated transactions that match the filter.
1195// The peer will be disconnected if the server is not configured to allow bloom
1196// filters.
1197func (sp *serverPeer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) {
1198	// Disconnect and/or ban depending on the node bloom services flag and
1199	// negotiated protocol version.
1200	if !sp.enforceNodeBloomFlag(msg.Command()) {
1201		return
1202	}
1203
1204	sp.setDisableRelayTx(false)
1205
1206	sp.filter.Reload(msg)
1207}
1208
1209// OnGetAddr is invoked when a peer receives a getaddr bitcoin message
1210// and is used to provide the peer with known addresses from the address
1211// manager.
1212func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) {
1213	// Don't return any addresses when running on the simulation test
1214	// network.  This helps prevent the network from becoming another
1215	// public test network since it will not be able to learn about other
1216	// peers that have not specifically been provided.
1217	if cfg.SimNet {
1218		return
1219	}
1220
1221	// Do not accept getaddr requests from outbound peers.  This reduces
1222	// fingerprinting attacks.
1223	if !sp.Inbound() {
1224		peerLog.Debugf("Ignoring getaddr request from outbound peer ",
1225			"%v", sp)
1226		return
1227	}
1228
1229	// Only allow one getaddr request per connection to discourage
1230	// address stamping of inv announcements.
1231	if sp.sentAddrs {
1232		peerLog.Debugf("Ignoring repeated getaddr request from peer ",
1233			"%v", sp)
1234		return
1235	}
1236	sp.sentAddrs = true
1237
1238	// Get the current known addresses from the address manager.
1239	addrCache := sp.server.addrManager.AddressCache()
1240
1241	// Push the addresses.
1242	sp.pushAddrMsg(addrCache)
1243}
1244
1245// OnAddr is invoked when a peer receives an addr bitcoin message and is
1246// used to notify the server about advertised addresses.
1247func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) {
1248	// Ignore addresses when running on the simulation test network.  This
1249	// helps prevent the network from becoming another public test network
1250	// since it will not be able to learn about other peers that have not
1251	// specifically been provided.
1252	if cfg.SimNet {
1253		return
1254	}
1255
1256	// Ignore old style addresses which don't include a timestamp.
1257	if sp.ProtocolVersion() < wire.NetAddressTimeVersion {
1258		return
1259	}
1260
1261	// A message that has no addresses is invalid.
1262	if len(msg.AddrList) == 0 {
1263		peerLog.Errorf("Command [%s] from %s does not contain any addresses",
1264			msg.Command(), sp.Peer)
1265		sp.Disconnect()
1266		return
1267	}
1268
1269	for _, na := range msg.AddrList {
1270		// Don't add more address if we're disconnecting.
1271		if !sp.Connected() {
1272			return
1273		}
1274
1275		// Set the timestamp to 5 days ago if it's more than 24 hours
1276		// in the future so this address is one of the first to be
1277		// removed when space is needed.
1278		now := time.Now()
1279		if na.Timestamp.After(now.Add(time.Minute * 10)) {
1280			na.Timestamp = now.Add(-1 * time.Hour * 24 * 5)
1281		}
1282
1283		// Add address to known addresses for this peer.
1284		sp.addKnownAddresses([]*wire.NetAddress{na})
1285	}
1286
1287	// Add addresses to server address manager.  The address manager handles
1288	// the details of things such as preventing duplicate addresses, max
1289	// addresses, and last seen updates.
1290	// XXX bitcoind gives a 2 hour time penalty here, do we want to do the
1291	// same?
1292	sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA())
1293}
1294
1295// OnRead is invoked when a peer receives a message and it is used to update
1296// the bytes received by the server.
1297func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) {
1298	sp.server.AddBytesReceived(uint64(bytesRead))
1299}
1300
1301// OnWrite is invoked when a peer sends a message and it is used to update
1302// the bytes sent by the server.
1303func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err error) {
1304	sp.server.AddBytesSent(uint64(bytesWritten))
1305}
1306
1307// randomUint16Number returns a random uint16 in a specified input range.  Note
1308// that the range is in zeroth ordering; if you pass it 1800, you will get
1309// values from 0 to 1800.
1310func randomUint16Number(max uint16) uint16 {
1311	// In order to avoid modulo bias and ensure every possible outcome in
1312	// [0, max) has equal probability, the random number must be sampled
1313	// from a random source that has a range limited to a multiple of the
1314	// modulus.
1315	var randomNumber uint16
1316	var limitRange = (math.MaxUint16 / max) * max
1317	for {
1318		binary.Read(rand.Reader, binary.LittleEndian, &randomNumber)
1319		if randomNumber < limitRange {
1320			return (randomNumber % max)
1321		}
1322	}
1323}
1324
1325// AddRebroadcastInventory adds 'iv' to the list of inventories to be
1326// rebroadcasted at random intervals until they show up in a block.
1327func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) {
1328	// Ignore if shutting down.
1329	if atomic.LoadInt32(&s.shutdown) != 0 {
1330		return
1331	}
1332
1333	s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data}
1334}
1335
1336// RemoveRebroadcastInventory removes 'iv' from the list of items to be
1337// rebroadcasted if present.
1338func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
1339	// Ignore if shutting down.
1340	if atomic.LoadInt32(&s.shutdown) != 0 {
1341		return
1342	}
1343
1344	s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
1345}
1346
1347// relayTransactions generates and relays inventory vectors for all of the
1348// passed transactions to all connected peers.
1349func (s *server) relayTransactions(txns []*mempool.TxDesc) {
1350	for _, txD := range txns {
1351		iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash())
1352		s.RelayInventory(iv, txD)
1353	}
1354}
1355
1356// AnnounceNewTransactions generates and relays inventory vectors and notifies
1357// both websocket and getblocktemplate long poll clients of the passed
1358// transactions.  This function should be called whenever new transactions
1359// are added to the mempool.
1360func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) {
1361	// Generate and relay inventory vectors for all newly accepted
1362	// transactions.
1363	s.relayTransactions(txns)
1364
1365	// Notify both websocket and getblocktemplate long poll clients of all
1366	// newly accepted transactions.
1367	if s.rpcServer != nil {
1368		s.rpcServer.NotifyNewTransactions(txns)
1369	}
1370}
1371
1372// Transaction has one confirmation on the main chain. Now we can mark it as no
1373// longer needing rebroadcasting.
1374func (s *server) TransactionConfirmed(tx *btcutil.Tx) {
1375	// Rebroadcasting is only necessary when the RPC server is active.
1376	if s.rpcServer == nil {
1377		return
1378	}
1379
1380	iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
1381	s.RemoveRebroadcastInventory(iv)
1382}
1383
1384// pushTxMsg sends a tx message for the provided transaction hash to the
1385// connected peer.  An error is returned if the transaction hash is not known.
1386func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
1387	waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
1388
1389	// Attempt to fetch the requested transaction from the pool.  A
1390	// call could be made to check for existence first, but simply trying
1391	// to fetch a missing transaction results in the same behavior.
1392	tx, err := s.txMemPool.FetchTransaction(hash)
1393	if err != nil {
1394		peerLog.Tracef("Unable to fetch tx %v from transaction "+
1395			"pool: %v", hash, err)
1396
1397		if doneChan != nil {
1398			doneChan <- struct{}{}
1399		}
1400		return err
1401	}
1402
1403	// Once we have fetched data wait for any previous operation to finish.
1404	if waitChan != nil {
1405		<-waitChan
1406	}
1407
1408	sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)
1409
1410	return nil
1411}
1412
1413// pushBlockMsg sends a block message for the provided block hash to the
1414// connected peer.  An error is returned if the block hash is not known.
1415func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
1416	waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
1417
1418	// Fetch the raw block bytes from the database.
1419	var blockBytes []byte
1420	err := sp.server.db.View(func(dbTx database.Tx) error {
1421		var err error
1422		blockBytes, err = dbTx.FetchBlock(hash)
1423		return err
1424	})
1425	if err != nil {
1426		peerLog.Tracef("Unable to fetch requested block hash %v: %v",
1427			hash, err)
1428
1429		if doneChan != nil {
1430			doneChan <- struct{}{}
1431		}
1432		return err
1433	}
1434
1435	// Deserialize the block.
1436	var msgBlock wire.MsgBlock
1437	err = msgBlock.Deserialize(bytes.NewReader(blockBytes))
1438	if err != nil {
1439		peerLog.Tracef("Unable to deserialize requested block hash "+
1440			"%v: %v", hash, err)
1441
1442		if doneChan != nil {
1443			doneChan <- struct{}{}
1444		}
1445		return err
1446	}
1447
1448	// Once we have fetched data wait for any previous operation to finish.
1449	if waitChan != nil {
1450		<-waitChan
1451	}
1452
1453	// We only send the channel for this message if we aren't sending
1454	// an inv straight after.
1455	var dc chan<- struct{}
1456	continueHash := sp.continueHash
1457	sendInv := continueHash != nil && continueHash.IsEqual(hash)
1458	if !sendInv {
1459		dc = doneChan
1460	}
1461	sp.QueueMessageWithEncoding(&msgBlock, dc, encoding)
1462
1463	// When the peer requests the final block that was advertised in
1464	// response to a getblocks message which requested more blocks than
1465	// would fit into a single message, send it a new inventory message
1466	// to trigger it to issue another getblocks message for the next
1467	// batch of inventory.
1468	if sendInv {
1469		best := sp.server.chain.BestSnapshot()
1470		invMsg := wire.NewMsgInvSizeHint(1)
1471		iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash)
1472		invMsg.AddInvVect(iv)
1473		sp.QueueMessage(invMsg, doneChan)
1474		sp.continueHash = nil
1475	}
1476	return nil
1477}
1478
1479// pushMerkleBlockMsg sends a merkleblock message for the provided block hash to
1480// the connected peer.  Since a merkle block requires the peer to have a filter
1481// loaded, this call will simply be ignored if there is no filter loaded.  An
1482// error is returned if the block hash is not known.
1483func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
1484	doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
1485
1486	// Do not send a response if the peer doesn't have a filter loaded.
1487	if !sp.filter.IsLoaded() {
1488		if doneChan != nil {
1489			doneChan <- struct{}{}
1490		}
1491		return nil
1492	}
1493
1494	// Fetch the raw block bytes from the database.
1495	blk, err := sp.server.chain.BlockByHash(hash)
1496	if err != nil {
1497		peerLog.Tracef("Unable to fetch requested block hash %v: %v",
1498			hash, err)
1499
1500		if doneChan != nil {
1501			doneChan <- struct{}{}
1502		}
1503		return err
1504	}
1505
1506	// Generate a merkle block by filtering the requested block according
1507	// to the filter for the peer.
1508	merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter)
1509
1510	// Once we have fetched data wait for any previous operation to finish.
1511	if waitChan != nil {
1512		<-waitChan
1513	}
1514
1515	// Send the merkleblock.  Only send the done channel with this message
1516	// if no transactions will be sent afterwards.
1517	var dc chan<- struct{}
1518	if len(matchedTxIndices) == 0 {
1519		dc = doneChan
1520	}
1521	sp.QueueMessage(merkle, dc)
1522
1523	// Finally, send any matched transactions.
1524	blkTransactions := blk.MsgBlock().Transactions
1525	for i, txIndex := range matchedTxIndices {
1526		// Only send the done channel on the final transaction.
1527		var dc chan<- struct{}
1528		if i == len(matchedTxIndices)-1 {
1529			dc = doneChan
1530		}
1531		if txIndex < uint32(len(blkTransactions)) {
1532			sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc,
1533				encoding)
1534		}
1535	}
1536
1537	return nil
1538}
1539
1540// handleUpdatePeerHeight updates the heights of all peers who were known to
1541// announce a block we recently accepted.
1542func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) {
1543	state.forAllPeers(func(sp *serverPeer) {
1544		// The origin peer should already have the updated height.
1545		if sp.Peer == umsg.originPeer {
1546			return
1547		}
1548
1549		// This is a pointer to the underlying memory which doesn't
1550		// change.
1551		latestBlkHash := sp.LastAnnouncedBlock()
1552
1553		// Skip this peer if it hasn't recently announced any new blocks.
1554		if latestBlkHash == nil {
1555			return
1556		}
1557
1558		// If the peer has recently announced a block, and this block
1559		// matches our newly accepted block, then update their block
1560		// height.
1561		if *latestBlkHash == *umsg.newHash {
1562			sp.UpdateLastBlockHeight(umsg.newHeight)
1563			sp.UpdateLastAnnouncedBlock(nil)
1564		}
1565	})
1566}
1567
1568// handleAddPeerMsg deals with adding new peers.  It is invoked from the
1569// peerHandler goroutine.
1570func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool {
1571	if sp == nil || !sp.Connected() {
1572		return false
1573	}
1574
1575	// Disconnect peers with unwanted user agents.
1576	if sp.HasUndesiredUserAgent(s.agentBlacklist, s.agentWhitelist) {
1577		sp.Disconnect()
1578		return false
1579	}
1580
1581	// Ignore new peers if we're shutting down.
1582	if atomic.LoadInt32(&s.shutdown) != 0 {
1583		srvrLog.Infof("New peer %s ignored - server is shutting down", sp)
1584		sp.Disconnect()
1585		return false
1586	}
1587
1588	// Disconnect banned peers.
1589	host, _, err := net.SplitHostPort(sp.Addr())
1590	if err != nil {
1591		srvrLog.Debugf("can't split hostport %v", err)
1592		sp.Disconnect()
1593		return false
1594	}
1595	if banEnd, ok := state.banned[host]; ok {
1596		if time.Now().Before(banEnd) {
1597			srvrLog.Debugf("Peer %s is banned for another %v - disconnecting",
1598				host, time.Until(banEnd))
1599			sp.Disconnect()
1600			return false
1601		}
1602
1603		srvrLog.Infof("Peer %s is no longer banned", host)
1604		delete(state.banned, host)
1605	}
1606
1607	// TODO: Check for max peers from a single IP.
1608
1609	// Limit max number of total peers.
1610	if state.Count() >= cfg.MaxPeers {
1611		srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s",
1612			cfg.MaxPeers, sp)
1613		sp.Disconnect()
1614		// TODO: how to handle permanent peers here?
1615		// they should be rescheduled.
1616		return false
1617	}
1618
1619	// Add the new peer and start it.
1620	srvrLog.Debugf("New peer %s", sp)
1621	if sp.Inbound() {
1622		state.inboundPeers[sp.ID()] = sp
1623	} else {
1624		state.outboundGroups[addrmgr.GroupKey(sp.NA())]++
1625		if sp.persistent {
1626			state.persistentPeers[sp.ID()] = sp
1627		} else {
1628			state.outboundPeers[sp.ID()] = sp
1629		}
1630	}
1631
1632	// Update the address' last seen time if the peer has acknowledged
1633	// our version and has sent us its version as well.
1634	if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil {
1635		s.addrManager.Connected(sp.NA())
1636	}
1637
1638	// Signal the sync manager this peer is a new sync candidate.
1639	s.syncManager.NewPeer(sp.Peer)
1640
1641	// Update the address manager and request known addresses from the
1642	// remote peer for outbound connections. This is skipped when running on
1643	// the simulation test network since it is only intended to connect to
1644	// specified peers and actively avoids advertising and connecting to
1645	// discovered peers.
1646	if !cfg.SimNet && !sp.Inbound() {
1647		// Advertise the local address when the server accepts incoming
1648		// connections and it believes itself to be close to the best
1649		// known tip.
1650		if !cfg.DisableListen && s.syncManager.IsCurrent() {
1651			// Get address that best matches.
1652			lna := s.addrManager.GetBestLocalAddress(sp.NA())
1653			if addrmgr.IsRoutable(lna) {
1654				// Filter addresses the peer already knows about.
1655				addresses := []*wire.NetAddress{lna}
1656				sp.pushAddrMsg(addresses)
1657			}
1658		}
1659
1660		// Request known addresses if the server address manager needs
1661		// more and the peer has a protocol version new enough to
1662		// include a timestamp with addresses.
1663		hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion
1664		if s.addrManager.NeedMoreAddresses() && hasTimestamp {
1665			sp.QueueMessage(wire.NewMsgGetAddr(), nil)
1666		}
1667
1668		// Mark the address as a known good address.
1669		s.addrManager.Good(sp.NA())
1670	}
1671
1672	return true
1673}
1674
1675// handleDonePeerMsg deals with peers that have signalled they are done.  It is
1676// invoked from the peerHandler goroutine.
1677func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) {
1678	var list map[int32]*serverPeer
1679	if sp.persistent {
1680		list = state.persistentPeers
1681	} else if sp.Inbound() {
1682		list = state.inboundPeers
1683	} else {
1684		list = state.outboundPeers
1685	}
1686
1687	// Regardless of whether the peer was found in our list, we'll inform
1688	// our connection manager about the disconnection. This can happen if we
1689	// process a peer's `done` message before its `add`.
1690	if !sp.Inbound() {
1691		if sp.persistent {
1692			s.connManager.Disconnect(sp.connReq.ID())
1693		} else {
1694			s.connManager.Remove(sp.connReq.ID())
1695			go s.connManager.NewConnReq()
1696		}
1697	}
1698
1699	if _, ok := list[sp.ID()]; ok {
1700		if !sp.Inbound() && sp.VersionKnown() {
1701			state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
1702		}
1703		delete(list, sp.ID())
1704		srvrLog.Debugf("Removed peer %s", sp)
1705		return
1706	}
1707}
1708
1709// handleBanPeerMsg deals with banning peers.  It is invoked from the
1710// peerHandler goroutine.
1711func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) {
1712	host, _, err := net.SplitHostPort(sp.Addr())
1713	if err != nil {
1714		srvrLog.Debugf("can't split ban peer %s %v", sp.Addr(), err)
1715		return
1716	}
1717	direction := directionString(sp.Inbound())
1718	srvrLog.Infof("Banned peer %s (%s) for %v", host, direction,
1719		cfg.BanDuration)
1720	state.banned[host] = time.Now().Add(cfg.BanDuration)
1721}
1722
1723// handleRelayInvMsg deals with relaying inventory to peers that are not already
1724// known to have it.  It is invoked from the peerHandler goroutine.
1725func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
1726	state.forAllPeers(func(sp *serverPeer) {
1727		if !sp.Connected() {
1728			return
1729		}
1730
1731		// If the inventory is a block and the peer prefers headers,
1732		// generate and send a headers message instead of an inventory
1733		// message.
1734		if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() {
1735			blockHeader, ok := msg.data.(wire.BlockHeader)
1736			if !ok {
1737				peerLog.Warnf("Underlying data for headers" +
1738					" is not a block header")
1739				return
1740			}
1741			msgHeaders := wire.NewMsgHeaders()
1742			if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil {
1743				peerLog.Errorf("Failed to add block"+
1744					" header: %v", err)
1745				return
1746			}
1747			sp.QueueMessage(msgHeaders, nil)
1748			return
1749		}
1750
1751		if msg.invVect.Type == wire.InvTypeTx {
1752			// Don't relay the transaction to the peer when it has
1753			// transaction relaying disabled.
1754			if sp.relayTxDisabled() {
1755				return
1756			}
1757
1758			txD, ok := msg.data.(*mempool.TxDesc)
1759			if !ok {
1760				peerLog.Warnf("Underlying data for tx inv "+
1761					"relay is not a *mempool.TxDesc: %T",
1762					msg.data)
1763				return
1764			}
1765
1766			// Don't relay the transaction if the transaction fee-per-kb
1767			// is less than the peer's feefilter.
1768			feeFilter := atomic.LoadInt64(&sp.feeFilter)
1769			if feeFilter > 0 && txD.FeePerKB < feeFilter {
1770				return
1771			}
1772
1773			// Don't relay the transaction if there is a bloom
1774			// filter loaded and the transaction doesn't match it.
1775			if sp.filter.IsLoaded() {
1776				if !sp.filter.MatchTxAndUpdate(txD.Tx) {
1777					return
1778				}
1779			}
1780		}
1781
1782		// Queue the inventory to be relayed with the next batch.
1783		// It will be ignored if the peer is already known to
1784		// have the inventory.
1785		sp.QueueInventory(msg.invVect)
1786	})
1787}
1788
1789// handleBroadcastMsg deals with broadcasting messages to peers.  It is invoked
1790// from the peerHandler goroutine.
1791func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) {
1792	state.forAllPeers(func(sp *serverPeer) {
1793		if !sp.Connected() {
1794			return
1795		}
1796
1797		for _, ep := range bmsg.excludePeers {
1798			if sp == ep {
1799				return
1800			}
1801		}
1802
1803		sp.QueueMessage(bmsg.message, nil)
1804	})
1805}
1806
1807type getConnCountMsg struct {
1808	reply chan int32
1809}
1810
1811type getPeersMsg struct {
1812	reply chan []*serverPeer
1813}
1814
1815type getOutboundGroup struct {
1816	key   string
1817	reply chan int
1818}
1819
1820type getAddedNodesMsg struct {
1821	reply chan []*serverPeer
1822}
1823
1824type disconnectNodeMsg struct {
1825	cmp   func(*serverPeer) bool
1826	reply chan error
1827}
1828
1829type connectNodeMsg struct {
1830	addr      string
1831	permanent bool
1832	reply     chan error
1833}
1834
1835type removeNodeMsg struct {
1836	cmp   func(*serverPeer) bool
1837	reply chan error
1838}
1839
1840// handleQuery is the central handler for all queries and commands from other
1841// goroutines related to peer state.
1842func (s *server) handleQuery(state *peerState, querymsg interface{}) {
1843	switch msg := querymsg.(type) {
1844	case getConnCountMsg:
1845		nconnected := int32(0)
1846		state.forAllPeers(func(sp *serverPeer) {
1847			if sp.Connected() {
1848				nconnected++
1849			}
1850		})
1851		msg.reply <- nconnected
1852
1853	case getPeersMsg:
1854		peers := make([]*serverPeer, 0, state.Count())
1855		state.forAllPeers(func(sp *serverPeer) {
1856			if !sp.Connected() {
1857				return
1858			}
1859			peers = append(peers, sp)
1860		})
1861		msg.reply <- peers
1862
1863	case connectNodeMsg:
1864		// TODO: duplicate oneshots?
1865		// Limit max number of total peers.
1866		if state.Count() >= cfg.MaxPeers {
1867			msg.reply <- errors.New("max peers reached")
1868			return
1869		}
1870		for _, peer := range state.persistentPeers {
1871			if peer.Addr() == msg.addr {
1872				if msg.permanent {
1873					msg.reply <- errors.New("peer already connected")
1874				} else {
1875					msg.reply <- errors.New("peer exists as a permanent peer")
1876				}
1877				return
1878			}
1879		}
1880
1881		netAddr, err := addrStringToNetAddr(msg.addr)
1882		if err != nil {
1883			msg.reply <- err
1884			return
1885		}
1886
1887		// TODO: if too many, nuke a non-perm peer.
1888		go s.connManager.Connect(&connmgr.ConnReq{
1889			Addr:      netAddr,
1890			Permanent: msg.permanent,
1891		})
1892		msg.reply <- nil
1893	case removeNodeMsg:
1894		found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) {
1895			// Keep group counts ok since we remove from
1896			// the list now.
1897			state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
1898		})
1899
1900		if found {
1901			msg.reply <- nil
1902		} else {
1903			msg.reply <- errors.New("peer not found")
1904		}
1905	case getOutboundGroup:
1906		count, ok := state.outboundGroups[msg.key]
1907		if ok {
1908			msg.reply <- count
1909		} else {
1910			msg.reply <- 0
1911		}
1912	// Request a list of the persistent (added) peers.
1913	case getAddedNodesMsg:
1914		// Respond with a slice of the relevant peers.
1915		peers := make([]*serverPeer, 0, len(state.persistentPeers))
1916		for _, sp := range state.persistentPeers {
1917			peers = append(peers, sp)
1918		}
1919		msg.reply <- peers
1920	case disconnectNodeMsg:
1921		// Check inbound peers. We pass a nil callback since we don't
1922		// require any additional actions on disconnect for inbound peers.
1923		found := disconnectPeer(state.inboundPeers, msg.cmp, nil)
1924		if found {
1925			msg.reply <- nil
1926			return
1927		}
1928
1929		// Check outbound peers.
1930		found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
1931			// Keep group counts ok since we remove from
1932			// the list now.
1933			state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
1934		})
1935		if found {
1936			// If there are multiple outbound connections to the same
1937			// ip:port, continue disconnecting them all until no such
1938			// peers are found.
1939			for found {
1940				found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) {
1941					state.outboundGroups[addrmgr.GroupKey(sp.NA())]--
1942				})
1943			}
1944			msg.reply <- nil
1945			return
1946		}
1947
1948		msg.reply <- errors.New("peer not found")
1949	}
1950}
1951
1952// disconnectPeer attempts to drop the connection of a targeted peer in the
1953// passed peer list. Targets are identified via usage of the passed
1954// `compareFunc`, which should return `true` if the passed peer is the target
1955// peer. This function returns true on success and false if the peer is unable
1956// to be located. If the peer is found, and the passed callback: `whenFound'
1957// isn't nil, we call it with the peer as the argument before it is removed
1958// from the peerList, and is disconnected from the server.
1959func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool {
1960	for addr, peer := range peerList {
1961		if compareFunc(peer) {
1962			if whenFound != nil {
1963				whenFound(peer)
1964			}
1965
1966			// This is ok because we are not continuing
1967			// to iterate so won't corrupt the loop.
1968			delete(peerList, addr)
1969			peer.Disconnect()
1970			return true
1971		}
1972	}
1973	return false
1974}
1975
1976// newPeerConfig returns the configuration for the given serverPeer.
1977func newPeerConfig(sp *serverPeer) *peer.Config {
1978	return &peer.Config{
1979		Listeners: peer.MessageListeners{
1980			OnVersion:      sp.OnVersion,
1981			OnVerAck:       sp.OnVerAck,
1982			OnMemPool:      sp.OnMemPool,
1983			OnTx:           sp.OnTx,
1984			OnBlock:        sp.OnBlock,
1985			OnInv:          sp.OnInv,
1986			OnHeaders:      sp.OnHeaders,
1987			OnGetData:      sp.OnGetData,
1988			OnGetBlocks:    sp.OnGetBlocks,
1989			OnGetHeaders:   sp.OnGetHeaders,
1990			OnGetCFilters:  sp.OnGetCFilters,
1991			OnGetCFHeaders: sp.OnGetCFHeaders,
1992			OnGetCFCheckpt: sp.OnGetCFCheckpt,
1993			OnFeeFilter:    sp.OnFeeFilter,
1994			OnFilterAdd:    sp.OnFilterAdd,
1995			OnFilterClear:  sp.OnFilterClear,
1996			OnFilterLoad:   sp.OnFilterLoad,
1997			OnGetAddr:      sp.OnGetAddr,
1998			OnAddr:         sp.OnAddr,
1999			OnRead:         sp.OnRead,
2000			OnWrite:        sp.OnWrite,
2001
2002			// Note: The reference client currently bans peers that send alerts
2003			// not signed with its key.  We could verify against their key, but
2004			// since the reference client is currently unwilling to support
2005			// other implementations' alert messages, we will not relay theirs.
2006			OnAlert: nil,
2007		},
2008		NewestBlock:       sp.newestBlock,
2009		HostToNetAddress:  sp.server.addrManager.HostToNetAddress,
2010		Proxy:             cfg.Proxy,
2011		UserAgentName:     userAgentName,
2012		UserAgentVersion:  userAgentVersion,
2013		UserAgentComments: cfg.UserAgentComments,
2014		ChainParams:       sp.server.chainParams,
2015		Services:          sp.server.services,
2016		DisableRelayTx:    cfg.BlocksOnly,
2017		ProtocolVersion:   peer.MaxProtocolVersion,
2018		TrickleInterval:   cfg.TrickleInterval,
2019	}
2020}
2021
2022// inboundPeerConnected is invoked by the connection manager when a new inbound
2023// connection is established.  It initializes a new inbound server peer
2024// instance, associates it with the connection, and starts a goroutine to wait
2025// for disconnection.
2026func (s *server) inboundPeerConnected(conn net.Conn) {
2027	sp := newServerPeer(s, false)
2028	sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
2029	sp.Peer = peer.NewInboundPeer(newPeerConfig(sp))
2030	sp.AssociateConnection(conn)
2031	go s.peerDoneHandler(sp)
2032}
2033
2034// outboundPeerConnected is invoked by the connection manager when a new
2035// outbound connection is established.  It initializes a new outbound server
2036// peer instance, associates it with the relevant state such as the connection
2037// request instance and the connection itself, and finally notifies the address
2038// manager of the attempt.
2039func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
2040	sp := newServerPeer(s, c.Permanent)
2041	p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
2042	if err != nil {
2043		srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
2044		if c.Permanent {
2045			s.connManager.Disconnect(c.ID())
2046		} else {
2047			s.connManager.Remove(c.ID())
2048			go s.connManager.NewConnReq()
2049		}
2050		return
2051	}
2052	sp.Peer = p
2053	sp.connReq = c
2054	sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
2055	sp.AssociateConnection(conn)
2056	go s.peerDoneHandler(sp)
2057}
2058
2059// peerDoneHandler handles peer disconnects by notifiying the server that it's
2060// done along with other performing other desirable cleanup.
2061func (s *server) peerDoneHandler(sp *serverPeer) {
2062	sp.WaitForDisconnect()
2063	s.donePeers <- sp
2064
2065	// Only tell sync manager we are gone if we ever told it we existed.
2066	if sp.VerAckReceived() {
2067		s.syncManager.DonePeer(sp.Peer)
2068
2069		// Evict any remaining orphans that were sent by the peer.
2070		numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID()))
2071		if numEvicted > 0 {
2072			txmpLog.Debugf("Evicted %d %s from peer %v (id %d)",
2073				numEvicted, pickNoun(numEvicted, "orphan",
2074					"orphans"), sp, sp.ID())
2075		}
2076	}
2077	close(sp.quit)
2078}
2079
2080// peerHandler is used to handle peer operations such as adding and removing
2081// peers to and from the server, banning peers, and broadcasting messages to
2082// peers.  It must be run in a goroutine.
2083func (s *server) peerHandler() {
2084	// Start the address manager and sync manager, both of which are needed
2085	// by peers.  This is done here since their lifecycle is closely tied
2086	// to this handler and rather than adding more channels to sychronize
2087	// things, it's easier and slightly faster to simply start and stop them
2088	// in this handler.
2089	s.addrManager.Start()
2090	s.syncManager.Start()
2091
2092	srvrLog.Tracef("Starting peer handler")
2093
2094	state := &peerState{
2095		inboundPeers:    make(map[int32]*serverPeer),
2096		persistentPeers: make(map[int32]*serverPeer),
2097		outboundPeers:   make(map[int32]*serverPeer),
2098		banned:          make(map[string]time.Time),
2099		outboundGroups:  make(map[string]int),
2100	}
2101
2102	if !cfg.DisableDNSSeed {
2103		// Add peers discovered through DNS to the address manager.
2104		connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices,
2105			btcdLookup, func(addrs []*wire.NetAddress) {
2106				// Bitcoind uses a lookup of the dns seeder here. This
2107				// is rather strange since the values looked up by the
2108				// DNS seed lookups will vary quite a lot.
2109				// to replicate this behaviour we put all addresses as
2110				// having come from the first one.
2111				s.addrManager.AddAddresses(addrs, addrs[0])
2112			})
2113	}
2114	go s.connManager.Start()
2115
2116out:
2117	for {
2118		select {
2119		// New peers connected to the server.
2120		case p := <-s.newPeers:
2121			s.handleAddPeerMsg(state, p)
2122
2123		// Disconnected peers.
2124		case p := <-s.donePeers:
2125			s.handleDonePeerMsg(state, p)
2126
2127		// Block accepted in mainchain or orphan, update peer height.
2128		case umsg := <-s.peerHeightsUpdate:
2129			s.handleUpdatePeerHeights(state, umsg)
2130
2131		// Peer to ban.
2132		case p := <-s.banPeers:
2133			s.handleBanPeerMsg(state, p)
2134
2135		// New inventory to potentially be relayed to other peers.
2136		case invMsg := <-s.relayInv:
2137			s.handleRelayInvMsg(state, invMsg)
2138
2139		// Message to broadcast to all connected peers except those
2140		// which are excluded by the message.
2141		case bmsg := <-s.broadcast:
2142			s.handleBroadcastMsg(state, &bmsg)
2143
2144		case qmsg := <-s.query:
2145			s.handleQuery(state, qmsg)
2146
2147		case <-s.quit:
2148			// Disconnect all peers on server shutdown.
2149			state.forAllPeers(func(sp *serverPeer) {
2150				srvrLog.Tracef("Shutdown peer %s", sp)
2151				sp.Disconnect()
2152			})
2153			break out
2154		}
2155	}
2156
2157	s.connManager.Stop()
2158	s.syncManager.Stop()
2159	s.addrManager.Stop()
2160
2161	// Drain channels before exiting so nothing is left waiting around
2162	// to send.
2163cleanup:
2164	for {
2165		select {
2166		case <-s.newPeers:
2167		case <-s.donePeers:
2168		case <-s.peerHeightsUpdate:
2169		case <-s.relayInv:
2170		case <-s.broadcast:
2171		case <-s.query:
2172		default:
2173			break cleanup
2174		}
2175	}
2176	s.wg.Done()
2177	srvrLog.Tracef("Peer handler done")
2178}
2179
2180// AddPeer adds a new peer that has already been connected to the server.
2181func (s *server) AddPeer(sp *serverPeer) {
2182	s.newPeers <- sp
2183}
2184
2185// BanPeer bans a peer that has already been connected to the server by ip.
2186func (s *server) BanPeer(sp *serverPeer) {
2187	s.banPeers <- sp
2188}
2189
2190// RelayInventory relays the passed inventory vector to all connected peers
2191// that are not already known to have it.
2192func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
2193	s.relayInv <- relayMsg{invVect: invVect, data: data}
2194}
2195
2196// BroadcastMessage sends msg to all peers currently connected to the server
2197// except those in the passed peers to exclude.
2198func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) {
2199	// XXX: Need to determine if this is an alert that has already been
2200	// broadcast and refrain from broadcasting again.
2201	bmsg := broadcastMsg{message: msg, excludePeers: exclPeers}
2202	s.broadcast <- bmsg
2203}
2204
2205// ConnectedCount returns the number of currently connected peers.
2206func (s *server) ConnectedCount() int32 {
2207	replyChan := make(chan int32)
2208
2209	s.query <- getConnCountMsg{reply: replyChan}
2210
2211	return <-replyChan
2212}
2213
2214// OutboundGroupCount returns the number of peers connected to the given
2215// outbound group key.
2216func (s *server) OutboundGroupCount(key string) int {
2217	replyChan := make(chan int)
2218	s.query <- getOutboundGroup{key: key, reply: replyChan}
2219	return <-replyChan
2220}
2221
2222// AddBytesSent adds the passed number of bytes to the total bytes sent counter
2223// for the server.  It is safe for concurrent access.
2224func (s *server) AddBytesSent(bytesSent uint64) {
2225	atomic.AddUint64(&s.bytesSent, bytesSent)
2226}
2227
2228// AddBytesReceived adds the passed number of bytes to the total bytes received
2229// counter for the server.  It is safe for concurrent access.
2230func (s *server) AddBytesReceived(bytesReceived uint64) {
2231	atomic.AddUint64(&s.bytesReceived, bytesReceived)
2232}
2233
2234// NetTotals returns the sum of all bytes received and sent across the network
2235// for all peers.  It is safe for concurrent access.
2236func (s *server) NetTotals() (uint64, uint64) {
2237	return atomic.LoadUint64(&s.bytesReceived),
2238		atomic.LoadUint64(&s.bytesSent)
2239}
2240
2241// UpdatePeerHeights updates the heights of all peers who have have announced
2242// the latest connected main chain block, or a recognized orphan. These height
2243// updates allow us to dynamically refresh peer heights, ensuring sync peer
2244// selection has access to the latest block heights for each peer.
2245func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) {
2246	s.peerHeightsUpdate <- updatePeerHeightsMsg{
2247		newHash:    latestBlkHash,
2248		newHeight:  latestHeight,
2249		originPeer: updateSource,
2250	}
2251}
2252
2253// rebroadcastHandler keeps track of user submitted inventories that we have
2254// sent out but have not yet made it into a block. We periodically rebroadcast
2255// them in case our peers restarted or otherwise lost track of them.
2256func (s *server) rebroadcastHandler() {
2257	// Wait 5 min before first tx rebroadcast.
2258	timer := time.NewTimer(5 * time.Minute)
2259	pendingInvs := make(map[wire.InvVect]interface{})
2260
2261out:
2262	for {
2263		select {
2264		case riv := <-s.modifyRebroadcastInv:
2265			switch msg := riv.(type) {
2266			// Incoming InvVects are added to our map of RPC txs.
2267			case broadcastInventoryAdd:
2268				pendingInvs[*msg.invVect] = msg.data
2269
2270			// When an InvVect has been added to a block, we can
2271			// now remove it, if it was present.
2272			case broadcastInventoryDel:
2273				if _, ok := pendingInvs[*msg]; ok {
2274					delete(pendingInvs, *msg)
2275				}
2276			}
2277
2278		case <-timer.C:
2279			// Any inventory we have has not made it into a block
2280			// yet. We periodically resubmit them until they have.
2281			for iv, data := range pendingInvs {
2282				ivCopy := iv
2283				s.RelayInventory(&ivCopy, data)
2284			}
2285
2286			// Process at a random time up to 30mins (in seconds)
2287			// in the future.
2288			timer.Reset(time.Second *
2289				time.Duration(randomUint16Number(1800)))
2290
2291		case <-s.quit:
2292			break out
2293		}
2294	}
2295
2296	timer.Stop()
2297
2298	// Drain channels before exiting so nothing is left waiting around
2299	// to send.
2300cleanup:
2301	for {
2302		select {
2303		case <-s.modifyRebroadcastInv:
2304		default:
2305			break cleanup
2306		}
2307	}
2308	s.wg.Done()
2309}
2310
2311// Start begins accepting connections from peers.
2312func (s *server) Start() {
2313	// Already started?
2314	if atomic.AddInt32(&s.started, 1) != 1 {
2315		return
2316	}
2317
2318	srvrLog.Trace("Starting server")
2319
2320	// Server startup time. Used for the uptime command for uptime calculation.
2321	s.startupTime = time.Now().Unix()
2322
2323	// Start the peer handler which in turn starts the address and block
2324	// managers.
2325	s.wg.Add(1)
2326	go s.peerHandler()
2327
2328	if s.nat != nil {
2329		s.wg.Add(1)
2330		go s.upnpUpdateThread()
2331	}
2332
2333	if !cfg.DisableRPC {
2334		s.wg.Add(1)
2335
2336		// Start the rebroadcastHandler, which ensures user tx received by
2337		// the RPC server are rebroadcast until being included in a block.
2338		go s.rebroadcastHandler()
2339
2340		s.rpcServer.Start()
2341	}
2342
2343	// Start the CPU miner if generation is enabled.
2344	if cfg.Generate {
2345		s.cpuMiner.Start()
2346	}
2347}
2348
2349// Stop gracefully shuts down the server by stopping and disconnecting all
2350// peers and the main listener.
2351func (s *server) Stop() error {
2352	// Make sure this only happens once.
2353	if atomic.AddInt32(&s.shutdown, 1) != 1 {
2354		srvrLog.Infof("Server is already in the process of shutting down")
2355		return nil
2356	}
2357
2358	srvrLog.Warnf("Server shutting down")
2359
2360	// Stop the CPU miner if needed
2361	s.cpuMiner.Stop()
2362
2363	// Shutdown the RPC server if it's not disabled.
2364	if !cfg.DisableRPC {
2365		s.rpcServer.Stop()
2366	}
2367
2368	// Save fee estimator state in the database.
2369	s.db.Update(func(tx database.Tx) error {
2370		metadata := tx.Metadata()
2371		metadata.Put(mempool.EstimateFeeDatabaseKey, s.feeEstimator.Save())
2372
2373		return nil
2374	})
2375
2376	// Signal the remaining goroutines to quit.
2377	close(s.quit)
2378	return nil
2379}
2380
2381// WaitForShutdown blocks until the main listener and peer handlers are stopped.
2382func (s *server) WaitForShutdown() {
2383	s.wg.Wait()
2384}
2385
2386// ScheduleShutdown schedules a server shutdown after the specified duration.
2387// It also dynamically adjusts how often to warn the server is going down based
2388// on remaining duration.
2389func (s *server) ScheduleShutdown(duration time.Duration) {
2390	// Don't schedule shutdown more than once.
2391	if atomic.AddInt32(&s.shutdownSched, 1) != 1 {
2392		return
2393	}
2394	srvrLog.Warnf("Server shutdown in %v", duration)
2395	go func() {
2396		remaining := duration
2397		tickDuration := dynamicTickDuration(remaining)
2398		done := time.After(remaining)
2399		ticker := time.NewTicker(tickDuration)
2400	out:
2401		for {
2402			select {
2403			case <-done:
2404				ticker.Stop()
2405				s.Stop()
2406				break out
2407			case <-ticker.C:
2408				remaining = remaining - tickDuration
2409				if remaining < time.Second {
2410					continue
2411				}
2412
2413				// Change tick duration dynamically based on remaining time.
2414				newDuration := dynamicTickDuration(remaining)
2415				if tickDuration != newDuration {
2416					tickDuration = newDuration
2417					ticker.Stop()
2418					ticker = time.NewTicker(tickDuration)
2419				}
2420				srvrLog.Warnf("Server shutdown in %v", remaining)
2421			}
2422		}
2423	}()
2424}
2425
2426// parseListeners determines whether each listen address is IPv4 and IPv6 and
2427// returns a slice of appropriate net.Addrs to listen on with TCP. It also
2428// properly detects addresses which apply to "all interfaces" and adds the
2429// address as both IPv4 and IPv6.
2430func parseListeners(addrs []string) ([]net.Addr, error) {
2431	netAddrs := make([]net.Addr, 0, len(addrs)*2)
2432	for _, addr := range addrs {
2433		host, _, err := net.SplitHostPort(addr)
2434		if err != nil {
2435			// Shouldn't happen due to already being normalized.
2436			return nil, err
2437		}
2438
2439		// Empty host or host of * on plan9 is both IPv4 and IPv6.
2440		if host == "" || (host == "*" && runtime.GOOS == "plan9") {
2441			netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
2442			netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
2443			continue
2444		}
2445
2446		// Strip IPv6 zone id if present since net.ParseIP does not
2447		// handle it.
2448		zoneIndex := strings.LastIndex(host, "%")
2449		if zoneIndex > 0 {
2450			host = host[:zoneIndex]
2451		}
2452
2453		// Parse the IP.
2454		ip := net.ParseIP(host)
2455		if ip == nil {
2456			return nil, fmt.Errorf("'%s' is not a valid IP address", host)
2457		}
2458
2459		// To4 returns nil when the IP is not an IPv4 address, so use
2460		// this determine the address type.
2461		if ip.To4() == nil {
2462			netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr})
2463		} else {
2464			netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr})
2465		}
2466	}
2467	return netAddrs, nil
2468}
2469
2470func (s *server) upnpUpdateThread() {
2471	// Go off immediately to prevent code duplication, thereafter we renew
2472	// lease every 15 minutes.
2473	timer := time.NewTimer(0 * time.Second)
2474	lport, _ := strconv.ParseInt(activeNetParams.DefaultPort, 10, 16)
2475	first := true
2476out:
2477	for {
2478		select {
2479		case <-timer.C:
2480			// TODO: pick external port  more cleverly
2481			// TODO: know which ports we are listening to on an external net.
2482			// TODO: if specific listen port doesn't work then ask for wildcard
2483			// listen port?
2484			// XXX this assumes timeout is in seconds.
2485			listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport),
2486				"btcd listen port", 20*60)
2487			if err != nil {
2488				srvrLog.Warnf("can't add UPnP port mapping: %v", err)
2489			}
2490			if first && err == nil {
2491				// TODO: look this up periodically to see if upnp domain changed
2492				// and so did ip.
2493				externalip, err := s.nat.GetExternalAddress()
2494				if err != nil {
2495					srvrLog.Warnf("UPnP can't get external address: %v", err)
2496					continue out
2497				}
2498				na := wire.NewNetAddressIPPort(externalip, uint16(listenPort),
2499					s.services)
2500				err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio)
2501				if err != nil {
2502					// XXX DeletePortMapping?
2503				}
2504				srvrLog.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na))
2505				first = false
2506			}
2507			timer.Reset(time.Minute * 15)
2508		case <-s.quit:
2509			break out
2510		}
2511	}
2512
2513	timer.Stop()
2514
2515	if err := s.nat.DeletePortMapping("tcp", int(lport), int(lport)); err != nil {
2516		srvrLog.Warnf("unable to remove UPnP port mapping: %v", err)
2517	} else {
2518		srvrLog.Debugf("successfully disestablished UPnP port mapping")
2519	}
2520
2521	s.wg.Done()
2522}
2523
2524// setupRPCListeners returns a slice of listeners that are configured for use
2525// with the RPC server depending on the configuration settings for listen
2526// addresses and TLS.
2527func setupRPCListeners() ([]net.Listener, error) {
2528	// Setup TLS if not disabled.
2529	listenFunc := net.Listen
2530	if !cfg.DisableTLS {
2531		// Generate the TLS cert and key file if both don't already
2532		// exist.
2533		if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) {
2534			err := genCertPair(cfg.RPCCert, cfg.RPCKey)
2535			if err != nil {
2536				return nil, err
2537			}
2538		}
2539		keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey)
2540		if err != nil {
2541			return nil, err
2542		}
2543
2544		tlsConfig := tls.Config{
2545			Certificates: []tls.Certificate{keypair},
2546			MinVersion:   tls.VersionTLS12,
2547		}
2548
2549		// Change the standard net.Listen function to the tls one.
2550		listenFunc = func(net string, laddr string) (net.Listener, error) {
2551			return tls.Listen(net, laddr, &tlsConfig)
2552		}
2553	}
2554
2555	netAddrs, err := parseListeners(cfg.RPCListeners)
2556	if err != nil {
2557		return nil, err
2558	}
2559
2560	listeners := make([]net.Listener, 0, len(netAddrs))
2561	for _, addr := range netAddrs {
2562		listener, err := listenFunc(addr.Network(), addr.String())
2563		if err != nil {
2564			rpcsLog.Warnf("Can't listen on %s: %v", addr, err)
2565			continue
2566		}
2567		listeners = append(listeners, listener)
2568	}
2569
2570	return listeners, nil
2571}
2572
2573// newServer returns a new btcd server configured to listen on addr for the
2574// bitcoin network type specified by chainParams.  Use start to begin accepting
2575// connections from peers.
2576func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
2577	db database.DB, chainParams *chaincfg.Params,
2578	interrupt <-chan struct{}) (*server, error) {
2579
2580	services := defaultServices
2581	if cfg.NoPeerBloomFilters {
2582		services &^= wire.SFNodeBloom
2583	}
2584	if cfg.NoCFilters {
2585		services &^= wire.SFNodeCF
2586	}
2587
2588	amgr := addrmgr.New(cfg.DataDir, btcdLookup)
2589
2590	var listeners []net.Listener
2591	var nat NAT
2592	if !cfg.DisableListen {
2593		var err error
2594		listeners, nat, err = initListeners(amgr, listenAddrs, services)
2595		if err != nil {
2596			return nil, err
2597		}
2598		if len(listeners) == 0 {
2599			return nil, errors.New("no valid listen address")
2600		}
2601	}
2602
2603	if len(agentBlacklist) > 0 {
2604		srvrLog.Infof("User-agent blacklist %s", agentBlacklist)
2605	}
2606	if len(agentWhitelist) > 0 {
2607		srvrLog.Infof("User-agent whitelist %s", agentWhitelist)
2608	}
2609
2610	s := server{
2611		chainParams:          chainParams,
2612		addrManager:          amgr,
2613		newPeers:             make(chan *serverPeer, cfg.MaxPeers),
2614		donePeers:            make(chan *serverPeer, cfg.MaxPeers),
2615		banPeers:             make(chan *serverPeer, cfg.MaxPeers),
2616		query:                make(chan interface{}),
2617		relayInv:             make(chan relayMsg, cfg.MaxPeers),
2618		broadcast:            make(chan broadcastMsg, cfg.MaxPeers),
2619		quit:                 make(chan struct{}),
2620		modifyRebroadcastInv: make(chan interface{}),
2621		peerHeightsUpdate:    make(chan updatePeerHeightsMsg),
2622		nat:                  nat,
2623		db:                   db,
2624		timeSource:           blockchain.NewMedianTime(),
2625		services:             services,
2626		sigCache:             txscript.NewSigCache(cfg.SigCacheMaxSize),
2627		hashCache:            txscript.NewHashCache(cfg.SigCacheMaxSize),
2628		cfCheckptCaches:      make(map[wire.FilterType][]cfHeaderKV),
2629		agentBlacklist:       agentBlacklist,
2630		agentWhitelist:       agentWhitelist,
2631	}
2632
2633	// Create the transaction and address indexes if needed.
2634	//
2635	// CAUTION: the txindex needs to be first in the indexes array because
2636	// the addrindex uses data from the txindex during catchup.  If the
2637	// addrindex is run first, it may not have the transactions from the
2638	// current block indexed.
2639	var indexes []indexers.Indexer
2640	if cfg.TxIndex || cfg.AddrIndex {
2641		// Enable transaction index if address index is enabled since it
2642		// requires it.
2643		if !cfg.TxIndex {
2644			indxLog.Infof("Transaction index enabled because it " +
2645				"is required by the address index")
2646			cfg.TxIndex = true
2647		} else {
2648			indxLog.Info("Transaction index is enabled")
2649		}
2650
2651		s.txIndex = indexers.NewTxIndex(db)
2652		indexes = append(indexes, s.txIndex)
2653	}
2654	if cfg.AddrIndex {
2655		indxLog.Info("Address index is enabled")
2656		s.addrIndex = indexers.NewAddrIndex(db, chainParams)
2657		indexes = append(indexes, s.addrIndex)
2658	}
2659	if !cfg.NoCFilters {
2660		indxLog.Info("Committed filter index is enabled")
2661		s.cfIndex = indexers.NewCfIndex(db, chainParams)
2662		indexes = append(indexes, s.cfIndex)
2663	}
2664
2665	// Create an index manager if any of the optional indexes are enabled.
2666	var indexManager blockchain.IndexManager
2667	if len(indexes) > 0 {
2668		indexManager = indexers.NewManager(db, indexes)
2669	}
2670
2671	// Merge given checkpoints with the default ones unless they are disabled.
2672	var checkpoints []chaincfg.Checkpoint
2673	if !cfg.DisableCheckpoints {
2674		checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints)
2675	}
2676
2677	// Create a new block chain instance with the appropriate configuration.
2678	var err error
2679	s.chain, err = blockchain.New(&blockchain.Config{
2680		DB:           s.db,
2681		Interrupt:    interrupt,
2682		ChainParams:  s.chainParams,
2683		Checkpoints:  checkpoints,
2684		TimeSource:   s.timeSource,
2685		SigCache:     s.sigCache,
2686		IndexManager: indexManager,
2687		HashCache:    s.hashCache,
2688	})
2689	if err != nil {
2690		return nil, err
2691	}
2692
2693	// Search for a FeeEstimator state in the database. If none can be found
2694	// or if it cannot be loaded, create a new one.
2695	db.Update(func(tx database.Tx) error {
2696		metadata := tx.Metadata()
2697		feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey)
2698		if feeEstimationData != nil {
2699			// delete it from the database so that we don't try to restore the
2700			// same thing again somehow.
2701			metadata.Delete(mempool.EstimateFeeDatabaseKey)
2702
2703			// If there is an error, log it and make a new fee estimator.
2704			var err error
2705			s.feeEstimator, err = mempool.RestoreFeeEstimator(feeEstimationData)
2706
2707			if err != nil {
2708				peerLog.Errorf("Failed to restore fee estimator %v", err)
2709			}
2710		}
2711
2712		return nil
2713	})
2714
2715	// If no feeEstimator has been found, or if the one that has been found
2716	// is behind somehow, create a new one and start over.
2717	if s.feeEstimator == nil || s.feeEstimator.LastKnownHeight() != s.chain.BestSnapshot().Height {
2718		s.feeEstimator = mempool.NewFeeEstimator(
2719			mempool.DefaultEstimateFeeMaxRollback,
2720			mempool.DefaultEstimateFeeMinRegisteredBlocks)
2721	}
2722
2723	txC := mempool.Config{
2724		Policy: mempool.Policy{
2725			DisableRelayPriority: cfg.NoRelayPriority,
2726			AcceptNonStd:         cfg.RelayNonStd,
2727			FreeTxRelayLimit:     cfg.FreeTxRelayLimit,
2728			MaxOrphanTxs:         cfg.MaxOrphanTxs,
2729			MaxOrphanTxSize:      defaultMaxOrphanTxSize,
2730			MaxSigOpCostPerTx:    blockchain.MaxBlockSigOpsCost / 4,
2731			MinRelayTxFee:        cfg.minRelayTxFee,
2732			MaxTxVersion:         2,
2733			RejectReplacement:    cfg.RejectReplacement,
2734		},
2735		ChainParams:    chainParams,
2736		FetchUtxoView:  s.chain.FetchUtxoView,
2737		BestHeight:     func() int32 { return s.chain.BestSnapshot().Height },
2738		MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime },
2739		CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) {
2740			return s.chain.CalcSequenceLock(tx, view, true)
2741		},
2742		IsDeploymentActive: s.chain.IsDeploymentActive,
2743		SigCache:           s.sigCache,
2744		HashCache:          s.hashCache,
2745		AddrIndex:          s.addrIndex,
2746		FeeEstimator:       s.feeEstimator,
2747	}
2748	s.txMemPool = mempool.New(&txC)
2749
2750	s.syncManager, err = netsync.New(&netsync.Config{
2751		PeerNotifier:       &s,
2752		Chain:              s.chain,
2753		TxMemPool:          s.txMemPool,
2754		ChainParams:        s.chainParams,
2755		DisableCheckpoints: cfg.DisableCheckpoints,
2756		MaxPeers:           cfg.MaxPeers,
2757		FeeEstimator:       s.feeEstimator,
2758	})
2759	if err != nil {
2760		return nil, err
2761	}
2762
2763	// Create the mining policy and block template generator based on the
2764	// configuration options.
2765	//
2766	// NOTE: The CPU miner relies on the mempool, so the mempool has to be
2767	// created before calling the function to create the CPU miner.
2768	policy := mining.Policy{
2769		BlockMinWeight:    cfg.BlockMinWeight,
2770		BlockMaxWeight:    cfg.BlockMaxWeight,
2771		BlockMinSize:      cfg.BlockMinSize,
2772		BlockMaxSize:      cfg.BlockMaxSize,
2773		BlockPrioritySize: cfg.BlockPrioritySize,
2774		TxMinFreeFee:      cfg.minRelayTxFee,
2775	}
2776	blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy,
2777		s.chainParams, s.txMemPool, s.chain, s.timeSource,
2778		s.sigCache, s.hashCache)
2779	s.cpuMiner = cpuminer.New(&cpuminer.Config{
2780		ChainParams:            chainParams,
2781		BlockTemplateGenerator: blockTemplateGenerator,
2782		MiningAddrs:            cfg.miningAddrs,
2783		ProcessBlock:           s.syncManager.ProcessBlock,
2784		ConnectedCount:         s.ConnectedCount,
2785		IsCurrent:              s.syncManager.IsCurrent,
2786	})
2787
2788	// Only setup a function to return new addresses to connect to when
2789	// not running in connect-only mode.  The simulation network is always
2790	// in connect-only mode since it is only intended to connect to
2791	// specified peers and actively avoid advertising and connecting to
2792	// discovered peers in order to prevent it from becoming a public test
2793	// network.
2794	var newAddressFunc func() (net.Addr, error)
2795	if !cfg.SimNet && len(cfg.ConnectPeers) == 0 {
2796		newAddressFunc = func() (net.Addr, error) {
2797			for tries := 0; tries < 100; tries++ {
2798				addr := s.addrManager.GetAddress()
2799				if addr == nil {
2800					break
2801				}
2802
2803				// Address will not be invalid, local or unroutable
2804				// because addrmanager rejects those on addition.
2805				// Just check that we don't already have an address
2806				// in the same group so that we are not connecting
2807				// to the same network segment at the expense of
2808				// others.
2809				key := addrmgr.GroupKey(addr.NetAddress())
2810				if s.OutboundGroupCount(key) != 0 {
2811					continue
2812				}
2813
2814				// only allow recent nodes (10mins) after we failed 30
2815				// times
2816				if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute {
2817					continue
2818				}
2819
2820				// allow nondefault ports after 50 failed tries.
2821				if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) !=
2822					activeNetParams.DefaultPort {
2823					continue
2824				}
2825
2826				// Mark an attempt for the valid address.
2827				s.addrManager.Attempt(addr.NetAddress())
2828
2829				addrString := addrmgr.NetAddressKey(addr.NetAddress())
2830				return addrStringToNetAddr(addrString)
2831			}
2832
2833			return nil, errors.New("no valid connect address")
2834		}
2835	}
2836
2837	// Create a connection manager.
2838	targetOutbound := defaultTargetOutbound
2839	if cfg.MaxPeers < targetOutbound {
2840		targetOutbound = cfg.MaxPeers
2841	}
2842	cmgr, err := connmgr.New(&connmgr.Config{
2843		Listeners:      listeners,
2844		OnAccept:       s.inboundPeerConnected,
2845		RetryDuration:  connectionRetryInterval,
2846		TargetOutbound: uint32(targetOutbound),
2847		Dial:           btcdDial,
2848		OnConnection:   s.outboundPeerConnected,
2849		GetNewAddress:  newAddressFunc,
2850	})
2851	if err != nil {
2852		return nil, err
2853	}
2854	s.connManager = cmgr
2855
2856	// Start up persistent peers.
2857	permanentPeers := cfg.ConnectPeers
2858	if len(permanentPeers) == 0 {
2859		permanentPeers = cfg.AddPeers
2860	}
2861	for _, addr := range permanentPeers {
2862		netAddr, err := addrStringToNetAddr(addr)
2863		if err != nil {
2864			return nil, err
2865		}
2866
2867		go s.connManager.Connect(&connmgr.ConnReq{
2868			Addr:      netAddr,
2869			Permanent: true,
2870		})
2871	}
2872
2873	if !cfg.DisableRPC {
2874		// Setup listeners for the configured RPC listen addresses and
2875		// TLS settings.
2876		rpcListeners, err := setupRPCListeners()
2877		if err != nil {
2878			return nil, err
2879		}
2880		if len(rpcListeners) == 0 {
2881			return nil, errors.New("RPCS: No valid listen address")
2882		}
2883
2884		s.rpcServer, err = newRPCServer(&rpcserverConfig{
2885			Listeners:    rpcListeners,
2886			StartupTime:  s.startupTime,
2887			ConnMgr:      &rpcConnManager{&s},
2888			SyncMgr:      &rpcSyncMgr{&s, s.syncManager},
2889			TimeSource:   s.timeSource,
2890			Chain:        s.chain,
2891			ChainParams:  chainParams,
2892			DB:           db,
2893			TxMemPool:    s.txMemPool,
2894			Generator:    blockTemplateGenerator,
2895			CPUMiner:     s.cpuMiner,
2896			TxIndex:      s.txIndex,
2897			AddrIndex:    s.addrIndex,
2898			CfIndex:      s.cfIndex,
2899			FeeEstimator: s.feeEstimator,
2900		})
2901		if err != nil {
2902			return nil, err
2903		}
2904
2905		// Signal process shutdown when the RPC server requests it.
2906		go func() {
2907			<-s.rpcServer.RequestedProcessShutdown()
2908			shutdownRequestChannel <- struct{}{}
2909		}()
2910	}
2911
2912	return &s, nil
2913}
2914
2915// initListeners initializes the configured net listeners and adds any bound
2916// addresses to the address manager. Returns the listeners and a NAT interface,
2917// which is non-nil if UPnP is in use.
2918func initListeners(amgr *addrmgr.AddrManager, listenAddrs []string, services wire.ServiceFlag) ([]net.Listener, NAT, error) {
2919	// Listen for TCP connections at the configured addresses
2920	netAddrs, err := parseListeners(listenAddrs)
2921	if err != nil {
2922		return nil, nil, err
2923	}
2924
2925	listeners := make([]net.Listener, 0, len(netAddrs))
2926	for _, addr := range netAddrs {
2927		listener, err := net.Listen(addr.Network(), addr.String())
2928		if err != nil {
2929			srvrLog.Warnf("Can't listen on %s: %v", addr, err)
2930			continue
2931		}
2932		listeners = append(listeners, listener)
2933	}
2934
2935	var nat NAT
2936	if len(cfg.ExternalIPs) != 0 {
2937		defaultPort, err := strconv.ParseUint(activeNetParams.DefaultPort, 10, 16)
2938		if err != nil {
2939			srvrLog.Errorf("Can not parse default port %s for active chain: %v",
2940				activeNetParams.DefaultPort, err)
2941			return nil, nil, err
2942		}
2943
2944		for _, sip := range cfg.ExternalIPs {
2945			eport := uint16(defaultPort)
2946			host, portstr, err := net.SplitHostPort(sip)
2947			if err != nil {
2948				// no port, use default.
2949				host = sip
2950			} else {
2951				port, err := strconv.ParseUint(portstr, 10, 16)
2952				if err != nil {
2953					srvrLog.Warnf("Can not parse port from %s for "+
2954						"externalip: %v", sip, err)
2955					continue
2956				}
2957				eport = uint16(port)
2958			}
2959			na, err := amgr.HostToNetAddress(host, eport, services)
2960			if err != nil {
2961				srvrLog.Warnf("Not adding %s as externalip: %v", sip, err)
2962				continue
2963			}
2964
2965			err = amgr.AddLocalAddress(na, addrmgr.ManualPrio)
2966			if err != nil {
2967				amgrLog.Warnf("Skipping specified external IP: %v", err)
2968			}
2969		}
2970	} else {
2971		if cfg.Upnp {
2972			var err error
2973			nat, err = Discover()
2974			if err != nil {
2975				srvrLog.Warnf("Can't discover upnp: %v", err)
2976			}
2977			// nil nat here is fine, just means no upnp on network.
2978		}
2979
2980		// Add bound addresses to address manager to be advertised to peers.
2981		for _, listener := range listeners {
2982			addr := listener.Addr().String()
2983			err := addLocalAddress(amgr, addr, services)
2984			if err != nil {
2985				amgrLog.Warnf("Skipping bound address %s: %v", addr, err)
2986			}
2987		}
2988	}
2989
2990	return listeners, nat, nil
2991}
2992
2993// addrStringToNetAddr takes an address in the form of 'host:port' and returns
2994// a net.Addr which maps to the original address with any host names resolved
2995// to IP addresses.  It also handles tor addresses properly by returning a
2996// net.Addr that encapsulates the address.
2997func addrStringToNetAddr(addr string) (net.Addr, error) {
2998	host, strPort, err := net.SplitHostPort(addr)
2999	if err != nil {
3000		return nil, err
3001	}
3002
3003	port, err := strconv.Atoi(strPort)
3004	if err != nil {
3005		return nil, err
3006	}
3007
3008	// Skip if host is already an IP address.
3009	if ip := net.ParseIP(host); ip != nil {
3010		return &net.TCPAddr{
3011			IP:   ip,
3012			Port: port,
3013		}, nil
3014	}
3015
3016	// Tor addresses cannot be resolved to an IP, so just return an onion
3017	// address instead.
3018	if strings.HasSuffix(host, ".onion") {
3019		if cfg.NoOnion {
3020			return nil, errors.New("tor has been disabled")
3021		}
3022
3023		return &onionAddr{addr: addr}, nil
3024	}
3025
3026	// Attempt to look up an IP address associated with the parsed host.
3027	ips, err := btcdLookup(host)
3028	if err != nil {
3029		return nil, err
3030	}
3031	if len(ips) == 0 {
3032		return nil, fmt.Errorf("no addresses found for %s", host)
3033	}
3034
3035	return &net.TCPAddr{
3036		IP:   ips[0],
3037		Port: port,
3038	}, nil
3039}
3040
3041// addLocalAddress adds an address that this node is listening on to the
3042// address manager so that it may be relayed to peers.
3043func addLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services wire.ServiceFlag) error {
3044	host, portStr, err := net.SplitHostPort(addr)
3045	if err != nil {
3046		return err
3047	}
3048	port, err := strconv.ParseUint(portStr, 10, 16)
3049	if err != nil {
3050		return err
3051	}
3052
3053	if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
3054		// If bound to unspecified address, advertise all local interfaces
3055		addrs, err := net.InterfaceAddrs()
3056		if err != nil {
3057			return err
3058		}
3059
3060		for _, addr := range addrs {
3061			ifaceIP, _, err := net.ParseCIDR(addr.String())
3062			if err != nil {
3063				continue
3064			}
3065
3066			// If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to
3067			// ::, do not add IPv4 interfaces.
3068			if (ip.To4() == nil) != (ifaceIP.To4() == nil) {
3069				continue
3070			}
3071
3072			netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services)
3073			addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
3074		}
3075	} else {
3076		netAddr, err := addrMgr.HostToNetAddress(host, uint16(port), services)
3077		if err != nil {
3078			return err
3079		}
3080
3081		addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio)
3082	}
3083
3084	return nil
3085}
3086
3087// dynamicTickDuration is a convenience function used to dynamically choose a
3088// tick duration based on remaining time.  It is primarily used during
3089// server shutdown to make shutdown warnings more frequent as the shutdown time
3090// approaches.
3091func dynamicTickDuration(remaining time.Duration) time.Duration {
3092	switch {
3093	case remaining <= time.Second*5:
3094		return time.Second
3095	case remaining <= time.Second*15:
3096		return time.Second * 5
3097	case remaining <= time.Minute:
3098		return time.Second * 15
3099	case remaining <= time.Minute*5:
3100		return time.Minute
3101	case remaining <= time.Minute*15:
3102		return time.Minute * 5
3103	case remaining <= time.Hour:
3104		return time.Minute * 15
3105	}
3106	return time.Hour
3107}
3108
3109// isWhitelisted returns whether the IP address is included in the whitelisted
3110// networks and IPs.
3111func isWhitelisted(addr net.Addr) bool {
3112	if len(cfg.whitelists) == 0 {
3113		return false
3114	}
3115
3116	host, _, err := net.SplitHostPort(addr.String())
3117	if err != nil {
3118		srvrLog.Warnf("Unable to SplitHostPort on '%s': %v", addr, err)
3119		return false
3120	}
3121	ip := net.ParseIP(host)
3122	if ip == nil {
3123		srvrLog.Warnf("Unable to parse IP '%s'", addr)
3124		return false
3125	}
3126
3127	for _, ipnet := range cfg.whitelists {
3128		if ipnet.Contains(ip) {
3129			return true
3130		}
3131	}
3132	return false
3133}
3134
3135// checkpointSorter implements sort.Interface to allow a slice of checkpoints to
3136// be sorted.
3137type checkpointSorter []chaincfg.Checkpoint
3138
3139// Len returns the number of checkpoints in the slice.  It is part of the
3140// sort.Interface implementation.
3141func (s checkpointSorter) Len() int {
3142	return len(s)
3143}
3144
3145// Swap swaps the checkpoints at the passed indices.  It is part of the
3146// sort.Interface implementation.
3147func (s checkpointSorter) Swap(i, j int) {
3148	s[i], s[j] = s[j], s[i]
3149}
3150
3151// Less returns whether the checkpoint with index i should sort before the
3152// checkpoint with index j.  It is part of the sort.Interface implementation.
3153func (s checkpointSorter) Less(i, j int) bool {
3154	return s[i].Height < s[j].Height
3155}
3156
3157// mergeCheckpoints returns two slices of checkpoints merged into one slice
3158// such that the checkpoints are sorted by height.  In the case the additional
3159// checkpoints contain a checkpoint with the same height as a checkpoint in the
3160// default checkpoints, the additional checkpoint will take precedence and
3161// overwrite the default one.
3162func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint {
3163	// Create a map of the additional checkpoints to remove duplicates while
3164	// leaving the most recently-specified checkpoint.
3165	extra := make(map[int32]chaincfg.Checkpoint)
3166	for _, checkpoint := range additional {
3167		extra[checkpoint.Height] = checkpoint
3168	}
3169
3170	// Add all default checkpoints that do not have an override in the
3171	// additional checkpoints.
3172	numDefault := len(defaultCheckpoints)
3173	checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra))
3174	for _, checkpoint := range defaultCheckpoints {
3175		if _, exists := extra[checkpoint.Height]; !exists {
3176			checkpoints = append(checkpoints, checkpoint)
3177		}
3178	}
3179
3180	// Append the additional checkpoints and return the sorted results.
3181	for _, checkpoint := range extra {
3182		checkpoints = append(checkpoints, checkpoint)
3183	}
3184	sort.Sort(checkpointSorter(checkpoints))
3185	return checkpoints
3186}
3187
3188// HasUndesiredUserAgent determines whether the server should continue to pursue
3189// a connection with this peer based on its advertised user agent. It performs
3190// the following steps:
3191// 1) Reject the peer if it contains a blacklisted agent.
3192// 2) If no whitelist is provided, accept all user agents.
3193// 3) Accept the peer if it contains a whitelisted agent.
3194// 4) Reject all other peers.
3195func (sp *serverPeer) HasUndesiredUserAgent(blacklistedAgents,
3196	whitelistedAgents []string) bool {
3197
3198	agent := sp.UserAgent()
3199
3200	// First, if peer's user agent contains any blacklisted substring, we
3201	// will ignore the connection request.
3202	for _, blacklistedAgent := range blacklistedAgents {
3203		if strings.Contains(agent, blacklistedAgent) {
3204			srvrLog.Debugf("Ignoring peer %s, user agent "+
3205				"contains blacklisted user agent: %s", sp,
3206				agent)
3207			return true
3208		}
3209	}
3210
3211	// If no whitelist is provided, we will accept all user agents.
3212	if len(whitelistedAgents) == 0 {
3213		return false
3214	}
3215
3216	// Peer's user agent passed blacklist. Now check to see if it contains
3217	// one of our whitelisted user agents, if so accept.
3218	for _, whitelistedAgent := range whitelistedAgents {
3219		if strings.Contains(agent, whitelistedAgent) {
3220			return false
3221		}
3222	}
3223
3224	// Otherwise, the peer's user agent was not included in our whitelist.
3225	// Ignore just in case it could stall the initial block download.
3226	srvrLog.Debugf("Ignoring peer %s, user agent: %s not found in "+
3227		"whitelist", sp, agent)
3228
3229	return true
3230}
3231