1// Copyright (c) 2013-2018 The btcsuite developers
2// Copyright (c) 2016-2018 The Decred developers
3// Use of this source code is governed by an ISC
4// license that can be found in the LICENSE file.
5
6package peer
7
8import (
9	"bytes"
10	"container/list"
11	"errors"
12	"fmt"
13	"io"
14	"math/rand"
15	"net"
16	"strconv"
17	"sync"
18	"sync/atomic"
19	"time"
20
21	"github.com/btcsuite/btcd/blockchain"
22	"github.com/btcsuite/btcd/chaincfg"
23	"github.com/btcsuite/btcd/chaincfg/chainhash"
24	"github.com/btcsuite/btcd/wire"
25	"github.com/btcsuite/go-socks/socks"
26	"github.com/davecgh/go-spew/spew"
27)
28
const (
	// MaxProtocolVersion is the max protocol version the peer supports.
	MaxProtocolVersion = wire.FeeFilterVersion

	// DefaultTrickleInterval is the min time between attempts to send an
	// inv message to a peer.
	DefaultTrickleInterval = 10 * time.Second

	// MinAcceptableProtocolVersion is the lowest protocol version that a
	// connected peer may support.
	MinAcceptableProtocolVersion = wire.MultipleAddressVersion

	// outputBufferSize is the number of elements the output channels use.
	outputBufferSize = 50

	// maxInvTrickleSize is the maximum amount of inventory to send in a
	// single message when trickling inventory to remote peers.
	maxInvTrickleSize = 1000

	// maxKnownInventory is the maximum number of items to keep in the known
	// inventory cache.
	maxKnownInventory = 1000

	// pingInterval is the interval of time to wait in between sending ping
	// messages.
	pingInterval = 2 * time.Minute

	// negotiateTimeout is the duration of inactivity before we timeout a
	// peer that hasn't completed the initial version negotiation.
	negotiateTimeout = 30 * time.Second

	// idleTimeout is the duration of inactivity before we time out a peer.
	idleTimeout = 5 * time.Minute

	// stallTickInterval is the interval of time between each check for
	// stalled peers.
	stallTickInterval = 15 * time.Second

	// stallResponseTimeout is the base maximum amount of time messages that
	// expect a response will wait before disconnecting the peer for
	// stalling.  The deadlines are adjusted for callback running times and
	// only checked on each stall tick interval.
	stallResponseTimeout = 30 * time.Second
)
73
var (
	// nodeCount is the total number of peer connections made since startup
	// and is used to assign an id to a peer.
	nodeCount int32

	// zeroHash is the zero value hash (all zeros).  It is defined as a
	// convenience, e.g. as a placeholder when a reject message for a tx or
	// block is sent without a hash.
	zeroHash chainhash.Hash

	// sentNonces houses the unique nonces that are generated when pushing
	// version messages that are used to detect self connections.  The map
	// is created with a limit of 50.
	sentNonces = newMruNonceMap(50)

	// allowSelfConns is only used to allow the tests to bypass the self
	// connection detecting and disconnect logic since they intentionally
	// do so for testing purposes.  It defaults to false (the zero value).
	allowSelfConns bool
)
92
// MessageListeners defines callback function pointers to invoke with message
// listeners for a peer. Any listener which is not set to a concrete callback
// during peer initialization is ignored. Execution of multiple message
// listeners occurs serially, so one callback blocks the execution of the next.
//
// NOTE: Unless otherwise documented, these listeners must NOT directly call any
// blocking calls (such as WaitForShutdown) on the peer instance since the input
// handler goroutine blocks until the callback has completed.  Doing so will
// result in a deadlock.
type MessageListeners struct {
	// OnGetAddr is invoked when a peer receives a getaddr bitcoin message.
	OnGetAddr func(p *Peer, msg *wire.MsgGetAddr)

	// OnAddr is invoked when a peer receives an addr bitcoin message.
	OnAddr func(p *Peer, msg *wire.MsgAddr)

	// OnPing is invoked when a peer receives a ping bitcoin message.
	OnPing func(p *Peer, msg *wire.MsgPing)

	// OnPong is invoked when a peer receives a pong bitcoin message.
	OnPong func(p *Peer, msg *wire.MsgPong)

	// OnAlert is invoked when a peer receives an alert bitcoin message.
	OnAlert func(p *Peer, msg *wire.MsgAlert)

	// OnMemPool is invoked when a peer receives a mempool bitcoin message.
	OnMemPool func(p *Peer, msg *wire.MsgMemPool)

	// OnTx is invoked when a peer receives a tx bitcoin message.
	OnTx func(p *Peer, msg *wire.MsgTx)

	// OnBlock is invoked when a peer receives a block bitcoin message.
	// NOTE(review): buf is presumably the raw serialized block bytes;
	// confirm against the input handler.
	OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte)

	// OnCFilter is invoked when a peer receives a cfilter bitcoin message.
	OnCFilter func(p *Peer, msg *wire.MsgCFilter)

	// OnCFHeaders is invoked when a peer receives a cfheaders bitcoin
	// message.
	OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders)

	// OnCFCheckpt is invoked when a peer receives a cfcheckpt bitcoin
	// message.
	OnCFCheckpt func(p *Peer, msg *wire.MsgCFCheckpt)

	// OnInv is invoked when a peer receives an inv bitcoin message.
	OnInv func(p *Peer, msg *wire.MsgInv)

	// OnHeaders is invoked when a peer receives a headers bitcoin message.
	OnHeaders func(p *Peer, msg *wire.MsgHeaders)

	// OnNotFound is invoked when a peer receives a notfound bitcoin
	// message.
	OnNotFound func(p *Peer, msg *wire.MsgNotFound)

	// OnGetData is invoked when a peer receives a getdata bitcoin message.
	OnGetData func(p *Peer, msg *wire.MsgGetData)

	// OnGetBlocks is invoked when a peer receives a getblocks bitcoin
	// message.
	OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks)

	// OnGetHeaders is invoked when a peer receives a getheaders bitcoin
	// message.
	OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders)

	// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin
	// message.
	OnGetCFilters func(p *Peer, msg *wire.MsgGetCFilters)

	// OnGetCFHeaders is invoked when a peer receives a getcfheaders
	// bitcoin message.
	OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders)

	// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt
	// bitcoin message.
	OnGetCFCheckpt func(p *Peer, msg *wire.MsgGetCFCheckpt)

	// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message.
	OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter)

	// OnFilterAdd is invoked when a peer receives a filteradd bitcoin message.
	OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd)

	// OnFilterClear is invoked when a peer receives a filterclear bitcoin
	// message.
	OnFilterClear func(p *Peer, msg *wire.MsgFilterClear)

	// OnFilterLoad is invoked when a peer receives a filterload bitcoin
	// message.
	OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad)

	// OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin
	// message.
	OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock)

	// OnVersion is invoked when a peer receives a version bitcoin message.
	// The caller may return a reject message in which case the message will
	// be sent to the peer and the peer will be disconnected.
	OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject

	// OnVerAck is invoked when a peer receives a verack bitcoin message.
	OnVerAck func(p *Peer, msg *wire.MsgVerAck)

	// OnReject is invoked when a peer receives a reject bitcoin message.
	OnReject func(p *Peer, msg *wire.MsgReject)

	// OnSendHeaders is invoked when a peer receives a sendheaders bitcoin
	// message.
	OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders)

	// OnRead is invoked when a peer receives a bitcoin message.  It
	// consists of the number of bytes read, the message, and whether or not
	// an error in the read occurred.  Typically, callers will opt to use
	// the callbacks for the specific message types, however this can be
	// useful for circumstances such as keeping track of server-wide byte
	// counts or working with custom message types for which the peer does
	// not directly provide a callback.
	OnRead func(p *Peer, bytesRead int, msg wire.Message, err error)

	// OnWrite is invoked when we write a bitcoin message to a peer.  It
	// consists of the number of bytes written, the message, and whether or
	// not an error in the write occurred.  This can be useful for
	// circumstances such as keeping track of server-wide byte counts.
	OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error)
}
219
// Config is the struct to hold configuration options useful to Peer.
type Config struct {
	// NewestBlock specifies a callback which provides the newest block
	// details to the peer as needed.  This can be nil in which case the
	// peer will report a block height of 0, however it is good practice for
	// peers to specify this so their currently best known block is
	// accurately reported.
	NewestBlock HashFunc

	// HostToNetAddress returns the netaddress for the given host. This can be
	// nil in which case the host will be parsed as an IP address.
	HostToNetAddress HostToNetAddrFunc

	// Proxy indicates a proxy is being used for connections.  The only
	// effect this has is to prevent leaking the tor proxy address, so it
	// only needs to be specified if using a tor proxy.
	Proxy string

	// UserAgentName specifies the user agent name to advertise.  It is
	// highly recommended to specify this value.
	UserAgentName string

	// UserAgentVersion specifies the user agent version to advertise.  It
	// is highly recommended to specify this value and that it follows the
	// form "major.minor.revision" e.g. "2.6.41".
	UserAgentVersion string

	// UserAgentComments specify the user agent comments to advertise.  These
	// values must not contain the illegal characters specified in BIP 14:
	// '/', ':', '(', ')'.
	UserAgentComments []string

	// ChainParams identifies which chain parameters the peer is associated
	// with.  It is highly recommended to specify this field, however it can
	// be omitted in which case the test network will be used.
	ChainParams *chaincfg.Params

	// Services specifies which services to advertise as supported by the
	// local peer.  This field can be omitted in which case it will be 0
	// and therefore advertise no supported services.
	Services wire.ServiceFlag

	// ProtocolVersion specifies the maximum protocol version to use and
	// advertise.  This field can be omitted in which case
	// peer.MaxProtocolVersion will be used.
	ProtocolVersion uint32

	// DisableRelayTx specifies if the remote peer should be informed to
	// not send inv messages for transactions.
	DisableRelayTx bool

	// Listeners houses callback functions to be invoked on receiving peer
	// messages.
	Listeners MessageListeners

	// TrickleInterval is the duration of the ticker which trickles down the
	// inventory to a peer.
	TrickleInterval time.Duration
}
279
// minUint32 returns the smaller of the two provided uint32 values.  Having a
// dedicated helper avoids importing math and converting through floats.
func minUint32(a, b uint32) uint32 {
	smallest := b
	if a < b {
		smallest = a
	}
	return smallest
}
288
289// newNetAddress attempts to extract the IP address and port from the passed
290// net.Addr interface and create a bitcoin NetAddress structure using that
291// information.
292func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) {
293	// addr will be a net.TCPAddr when not using a proxy.
294	if tcpAddr, ok := addr.(*net.TCPAddr); ok {
295		ip := tcpAddr.IP
296		port := uint16(tcpAddr.Port)
297		na := wire.NewNetAddressIPPort(ip, port, services)
298		return na, nil
299	}
300
301	// addr will be a socks.ProxiedAddr when using a proxy.
302	if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok {
303		ip := net.ParseIP(proxiedAddr.Host)
304		if ip == nil {
305			ip = net.ParseIP("0.0.0.0")
306		}
307		port := uint16(proxiedAddr.Port)
308		na := wire.NewNetAddressIPPort(ip, port, services)
309		return na, nil
310	}
311
312	// For the most part, addr should be one of the two above cases, but
313	// to be safe, fall back to trying to parse the information from the
314	// address string as a last resort.
315	host, portStr, err := net.SplitHostPort(addr.String())
316	if err != nil {
317		return nil, err
318	}
319	ip := net.ParseIP(host)
320	port, err := strconv.ParseUint(portStr, 10, 16)
321	if err != nil {
322		return nil, err
323	}
324	na := wire.NewNetAddressIPPort(ip, uint16(port), services)
325	return na, nil
326}
327
// outMsg is used to house a message to be sent along with a channel to signal
// when the message has been sent (or won't be sent due to things such as
// shutdown).
type outMsg struct {
	msg      wire.Message
	doneChan chan<- struct{} // send-only; signalled once msg is sent or dropped
	encoding wire.MessageEncoding
}
336
// stallControlCmd represents the command of a stall control message.
type stallControlCmd uint8

// Constants for the command of a stall control message.
const (
	// sccSendMessage indicates a message is being sent to the remote peer.
	sccSendMessage stallControlCmd = iota

	// sccReceiveMessage indicates a message has been received from the
	// remote peer.
	sccReceiveMessage

	// sccHandlerStart indicates a callback handler is about to be invoked.
	sccHandlerStart

	// sccHandlerDone indicates a callback handler has completed.
	sccHandlerDone
)
355
// stallControlMsg is used to signal the stall handler about specific events
// so it can properly detect and handle stalled remote peers.  It pairs the
// event kind (command) with the wire message the event relates to.
type stallControlMsg struct {
	command stallControlCmd
	message wire.Message
}
362
// StatsSnap is a snapshot of peer stats at a point in time.  Instances are
// produced by Peer.StatsSnapshot, which copies the peer's flags and statistics
// into a new value.  Note that Version carries the protocol version
// advertised by the remote peer rather than the negotiated one.
type StatsSnap struct {
	ID             int32
	Addr           string
	Services       wire.ServiceFlag
	LastSend       time.Time
	LastRecv       time.Time
	BytesSent      uint64
	BytesRecv      uint64
	ConnTime       time.Time
	TimeOffset     int64
	Version        uint32
	UserAgent      string
	Inbound        bool
	StartingHeight int32
	LastBlock      int32
	LastPingNonce  uint64
	LastPingTime   time.Time
	LastPingMicros int64
}
383
// HashFunc is a function which returns a block hash, height and error.
// It is used as a callback to get newest block details (see
// Config.NewestBlock).
type HashFunc func() (hash *chainhash.Hash, height int32, err error)
387
// AddrFunc is a func which takes an address and returns a related address.
type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress
390
// HostToNetAddrFunc is a func which takes a host, port, services and returns
// the netaddress, or an error when one cannot be produced (see
// Config.HostToNetAddress).
type HostToNetAddrFunc func(host string, port uint16,
	services wire.ServiceFlag) (*wire.NetAddress, error)
395
396// NOTE: The overall data flow of a peer is split into 3 goroutines.  Inbound
397// messages are read via the inHandler goroutine and generally dispatched to
398// their own handler.  For inbound data-related messages such as blocks,
399// transactions, and inventory, the data is handled by the corresponding
400// message handlers.  The data flow for outbound messages is split into 2
401// goroutines, queueHandler and outHandler.  The first, queueHandler, is used
402// as a way for external entities to queue messages, by way of the QueueMessage
403// function, quickly regardless of whether the peer is currently sending or not.
404// It acts as the traffic cop between the external world and the actual
405// goroutine which writes to the network socket.
406
// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin
// communications via the peer-to-peer protocol.  It provides full duplex
// reading and writing, automatic handling of the initial handshake process,
// querying of usage statistics and other information about the remote peer such
// as its address, user agent, and protocol version, output message queuing,
// inventory trickling, and the ability to dynamically register and unregister
// callbacks for handling bitcoin protocol messages.
//
// Outbound messages are typically queued via QueueMessage or QueueInventory.
// QueueMessage is intended for all messages, including responses to data such
// as blocks and transactions.  QueueInventory, on the other hand, is only
// intended for relaying inventory as it employs a trickling mechanism to batch
// the inventory together.  However, some helper functions for pushing messages
// of specific types that typically require common special handling are
// provided as a convenience.
type Peer struct {
	// The following variables must only be used atomically.  lastRecv and
	// lastSend hold Unix timestamps in seconds.  A non-zero connected flag
	// is what LocalAddr checks before touching conn.
	bytesReceived uint64
	bytesSent     uint64
	lastRecv      int64
	lastSend      int64
	connected     int32
	disconnect    int32

	conn net.Conn

	// These fields are set at creation time and never modified, so they are
	// safe to read from concurrently without a mutex.
	addr    string
	cfg     Config
	inbound bool

	flagsMtx             sync.Mutex // protects the peer flags below
	na                   *wire.NetAddress
	id                   int32
	userAgent            string
	services             wire.ServiceFlag
	versionKnown         bool
	advertisedProtoVer   uint32 // protocol version advertised by remote
	protocolVersion      uint32 // negotiated protocol version
	sendHeadersPreferred bool   // peer sent a sendheaders message
	verAckReceived       bool
	witnessEnabled       bool

	wireEncoding wire.MessageEncoding

	// knownInventory caches inventory known to the peer.  The prevGet*
	// fields remember the begin/stop hashes of the most recent getblocks
	// and getheaders requests so back-to-back duplicates can be filtered;
	// each pair is guarded by the mutex declared just above it.
	knownInventory     *mruInventoryMap
	prevGetBlocksMtx   sync.Mutex
	prevGetBlocksBegin *chainhash.Hash
	prevGetBlocksStop  *chainhash.Hash
	prevGetHdrsMtx     sync.Mutex
	prevGetHdrsBegin   *chainhash.Hash
	prevGetHdrsStop    *chainhash.Hash

	// These fields keep track of statistics for the peer and are protected
	// by the statsMtx mutex.
	statsMtx           sync.RWMutex
	timeOffset         int64
	timeConnected      time.Time
	startingHeight     int32
	lastBlock          int32
	lastAnnouncedBlock *chainhash.Hash
	lastPingNonce      uint64    // Set to nonce if we have a pending ping.
	lastPingTime       time.Time // Time we sent last ping.
	lastPingMicros     int64     // Time for last ping to return.

	// Channels used by the peer's goroutines.
	// NOTE(review): the exact producer/consumer of each channel is defined
	// by the handler goroutines outside this section; confirm there.
	stallControl  chan stallControlMsg
	outputQueue   chan outMsg
	sendQueue     chan outMsg
	sendDoneQueue chan struct{}
	outputInvChan chan *wire.InvVect
	inQuit        chan struct{}
	queueQuit     chan struct{}
	outQuit       chan struct{}
	quit          chan struct{}
}
483
484// String returns the peer's address and directionality as a human-readable
485// string.
486//
487// This function is safe for concurrent access.
488func (p *Peer) String() string {
489	return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound))
490}
491
492// UpdateLastBlockHeight updates the last known block for the peer.
493//
494// This function is safe for concurrent access.
495func (p *Peer) UpdateLastBlockHeight(newHeight int32) {
496	p.statsMtx.Lock()
497	log.Tracef("Updating last block height of peer %v from %v to %v",
498		p.addr, p.lastBlock, newHeight)
499	p.lastBlock = newHeight
500	p.statsMtx.Unlock()
501}
502
503// UpdateLastAnnouncedBlock updates meta-data about the last block hash this
504// peer is known to have announced.
505//
506// This function is safe for concurrent access.
507func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) {
508	log.Tracef("Updating last blk for peer %v, %v", p.addr, blkHash)
509
510	p.statsMtx.Lock()
511	p.lastAnnouncedBlock = blkHash
512	p.statsMtx.Unlock()
513}
514
// AddKnownInventory adds the passed inventory to the cache of known inventory
// for the peer by delegating to the peer's knownInventory map.
//
// This function is safe for concurrent access.
func (p *Peer) AddKnownInventory(invVect *wire.InvVect) {
	p.knownInventory.Add(invVect)
}
522
523// StatsSnapshot returns a snapshot of the current peer flags and statistics.
524//
525// This function is safe for concurrent access.
526func (p *Peer) StatsSnapshot() *StatsSnap {
527	p.statsMtx.RLock()
528
529	p.flagsMtx.Lock()
530	id := p.id
531	addr := p.addr
532	userAgent := p.userAgent
533	services := p.services
534	protocolVersion := p.advertisedProtoVer
535	p.flagsMtx.Unlock()
536
537	// Get a copy of all relevant flags and stats.
538	statsSnap := &StatsSnap{
539		ID:             id,
540		Addr:           addr,
541		UserAgent:      userAgent,
542		Services:       services,
543		LastSend:       p.LastSend(),
544		LastRecv:       p.LastRecv(),
545		BytesSent:      p.BytesSent(),
546		BytesRecv:      p.BytesReceived(),
547		ConnTime:       p.timeConnected,
548		TimeOffset:     p.timeOffset,
549		Version:        protocolVersion,
550		Inbound:        p.inbound,
551		StartingHeight: p.startingHeight,
552		LastBlock:      p.lastBlock,
553		LastPingNonce:  p.lastPingNonce,
554		LastPingMicros: p.lastPingMicros,
555		LastPingTime:   p.lastPingTime,
556	}
557
558	p.statsMtx.RUnlock()
559	return statsSnap
560}
561
562// ID returns the peer id.
563//
564// This function is safe for concurrent access.
565func (p *Peer) ID() int32 {
566	p.flagsMtx.Lock()
567	id := p.id
568	p.flagsMtx.Unlock()
569
570	return id
571}
572
573// NA returns the peer network address.
574//
575// This function is safe for concurrent access.
576func (p *Peer) NA() *wire.NetAddress {
577	p.flagsMtx.Lock()
578	na := p.na
579	p.flagsMtx.Unlock()
580
581	return na
582}
583
// Addr returns the peer address as a string.
//
// This function is safe for concurrent access.
func (p *Peer) Addr() string {
	// The address doesn't change after initialization, therefore it is not
	// protected by a mutex.
	return p.addr
}
592
// Inbound returns whether the peer is inbound.
//
// This function is safe for concurrent access.
func (p *Peer) Inbound() bool {
	// The inbound flag is set at creation time and never modified, so no
	// mutex is needed to read it.
	return p.inbound
}
599
600// Services returns the services flag of the remote peer.
601//
602// This function is safe for concurrent access.
603func (p *Peer) Services() wire.ServiceFlag {
604	p.flagsMtx.Lock()
605	services := p.services
606	p.flagsMtx.Unlock()
607
608	return services
609}
610
611// UserAgent returns the user agent of the remote peer.
612//
613// This function is safe for concurrent access.
614func (p *Peer) UserAgent() string {
615	p.flagsMtx.Lock()
616	userAgent := p.userAgent
617	p.flagsMtx.Unlock()
618
619	return userAgent
620}
621
622// LastAnnouncedBlock returns the last announced block of the remote peer.
623//
624// This function is safe for concurrent access.
625func (p *Peer) LastAnnouncedBlock() *chainhash.Hash {
626	p.statsMtx.RLock()
627	lastAnnouncedBlock := p.lastAnnouncedBlock
628	p.statsMtx.RUnlock()
629
630	return lastAnnouncedBlock
631}
632
633// LastPingNonce returns the last ping nonce of the remote peer.
634//
635// This function is safe for concurrent access.
636func (p *Peer) LastPingNonce() uint64 {
637	p.statsMtx.RLock()
638	lastPingNonce := p.lastPingNonce
639	p.statsMtx.RUnlock()
640
641	return lastPingNonce
642}
643
644// LastPingTime returns the last ping time of the remote peer.
645//
646// This function is safe for concurrent access.
647func (p *Peer) LastPingTime() time.Time {
648	p.statsMtx.RLock()
649	lastPingTime := p.lastPingTime
650	p.statsMtx.RUnlock()
651
652	return lastPingTime
653}
654
655// LastPingMicros returns the last ping micros of the remote peer.
656//
657// This function is safe for concurrent access.
658func (p *Peer) LastPingMicros() int64 {
659	p.statsMtx.RLock()
660	lastPingMicros := p.lastPingMicros
661	p.statsMtx.RUnlock()
662
663	return lastPingMicros
664}
665
666// VersionKnown returns the whether or not the version of a peer is known
667// locally.
668//
669// This function is safe for concurrent access.
670func (p *Peer) VersionKnown() bool {
671	p.flagsMtx.Lock()
672	versionKnown := p.versionKnown
673	p.flagsMtx.Unlock()
674
675	return versionKnown
676}
677
678// VerAckReceived returns whether or not a verack message was received by the
679// peer.
680//
681// This function is safe for concurrent access.
682func (p *Peer) VerAckReceived() bool {
683	p.flagsMtx.Lock()
684	verAckReceived := p.verAckReceived
685	p.flagsMtx.Unlock()
686
687	return verAckReceived
688}
689
690// ProtocolVersion returns the negotiated peer protocol version.
691//
692// This function is safe for concurrent access.
693func (p *Peer) ProtocolVersion() uint32 {
694	p.flagsMtx.Lock()
695	protocolVersion := p.protocolVersion
696	p.flagsMtx.Unlock()
697
698	return protocolVersion
699}
700
701// LastBlock returns the last block of the peer.
702//
703// This function is safe for concurrent access.
704func (p *Peer) LastBlock() int32 {
705	p.statsMtx.RLock()
706	lastBlock := p.lastBlock
707	p.statsMtx.RUnlock()
708
709	return lastBlock
710}
711
712// LastSend returns the last send time of the peer.
713//
714// This function is safe for concurrent access.
715func (p *Peer) LastSend() time.Time {
716	return time.Unix(atomic.LoadInt64(&p.lastSend), 0)
717}
718
719// LastRecv returns the last recv time of the peer.
720//
721// This function is safe for concurrent access.
722func (p *Peer) LastRecv() time.Time {
723	return time.Unix(atomic.LoadInt64(&p.lastRecv), 0)
724}
725
726// LocalAddr returns the local address of the connection.
727//
728// This function is safe fo concurrent access.
729func (p *Peer) LocalAddr() net.Addr {
730	var localAddr net.Addr
731	if atomic.LoadInt32(&p.connected) != 0 {
732		localAddr = p.conn.LocalAddr()
733	}
734	return localAddr
735}
736
737// BytesSent returns the total number of bytes sent by the peer.
738//
739// This function is safe for concurrent access.
740func (p *Peer) BytesSent() uint64 {
741	return atomic.LoadUint64(&p.bytesSent)
742}
743
744// BytesReceived returns the total number of bytes received by the peer.
745//
746// This function is safe for concurrent access.
747func (p *Peer) BytesReceived() uint64 {
748	return atomic.LoadUint64(&p.bytesReceived)
749}
750
751// TimeConnected returns the time at which the peer connected.
752//
753// This function is safe for concurrent access.
754func (p *Peer) TimeConnected() time.Time {
755	p.statsMtx.RLock()
756	timeConnected := p.timeConnected
757	p.statsMtx.RUnlock()
758
759	return timeConnected
760}
761
762// TimeOffset returns the number of seconds the local time was offset from the
763// time the peer reported during the initial negotiation phase.  Negative values
764// indicate the remote peer's time is before the local time.
765//
766// This function is safe for concurrent access.
767func (p *Peer) TimeOffset() int64 {
768	p.statsMtx.RLock()
769	timeOffset := p.timeOffset
770	p.statsMtx.RUnlock()
771
772	return timeOffset
773}
774
775// StartingHeight returns the last known height the peer reported during the
776// initial negotiation phase.
777//
778// This function is safe for concurrent access.
779func (p *Peer) StartingHeight() int32 {
780	p.statsMtx.RLock()
781	startingHeight := p.startingHeight
782	p.statsMtx.RUnlock()
783
784	return startingHeight
785}
786
787// WantsHeaders returns if the peer wants header messages instead of
788// inventory vectors for blocks.
789//
790// This function is safe for concurrent access.
791func (p *Peer) WantsHeaders() bool {
792	p.flagsMtx.Lock()
793	sendHeadersPreferred := p.sendHeadersPreferred
794	p.flagsMtx.Unlock()
795
796	return sendHeadersPreferred
797}
798
799// IsWitnessEnabled returns true if the peer has signalled that it supports
800// segregated witness.
801//
802// This function is safe for concurrent access.
803func (p *Peer) IsWitnessEnabled() bool {
804	p.flagsMtx.Lock()
805	witnessEnabled := p.witnessEnabled
806	p.flagsMtx.Unlock()
807
808	return witnessEnabled
809}
810
811// PushAddrMsg sends an addr message to the connected peer using the provided
812// addresses.  This function is useful over manually sending the message via
813// QueueMessage since it automatically limits the addresses to the maximum
814// number allowed by the message and randomizes the chosen addresses when there
815// are too many.  It returns the addresses that were actually sent and no
816// message will be sent if there are no entries in the provided addresses slice.
817//
818// This function is safe for concurrent access.
819func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) {
820	addressCount := len(addresses)
821
822	// Nothing to send.
823	if addressCount == 0 {
824		return nil, nil
825	}
826
827	msg := wire.NewMsgAddr()
828	msg.AddrList = make([]*wire.NetAddress, addressCount)
829	copy(msg.AddrList, addresses)
830
831	// Randomize the addresses sent if there are more than the maximum allowed.
832	if addressCount > wire.MaxAddrPerMsg {
833		// Shuffle the address list.
834		for i := 0; i < wire.MaxAddrPerMsg; i++ {
835			j := i + rand.Intn(addressCount-i)
836			msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i]
837		}
838
839		// Truncate it to the maximum size.
840		msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg]
841	}
842
843	p.QueueMessage(msg, nil)
844	return msg.AddrList, nil
845}
846
847// PushGetBlocksMsg sends a getblocks message for the provided block locator
848// and stop hash.  It will ignore back-to-back duplicate requests.
849//
850// This function is safe for concurrent access.
851func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
852	// Extract the begin hash from the block locator, if one was specified,
853	// to use for filtering duplicate getblocks requests.
854	var beginHash *chainhash.Hash
855	if len(locator) > 0 {
856		beginHash = locator[0]
857	}
858
859	// Filter duplicate getblocks requests.
860	p.prevGetBlocksMtx.Lock()
861	isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil &&
862		beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) &&
863		beginHash.IsEqual(p.prevGetBlocksBegin)
864	p.prevGetBlocksMtx.Unlock()
865
866	if isDuplicate {
867		log.Tracef("Filtering duplicate [getblocks] with begin "+
868			"hash %v, stop hash %v", beginHash, stopHash)
869		return nil
870	}
871
872	// Construct the getblocks request and queue it to be sent.
873	msg := wire.NewMsgGetBlocks(stopHash)
874	for _, hash := range locator {
875		err := msg.AddBlockLocatorHash(hash)
876		if err != nil {
877			return err
878		}
879	}
880	p.QueueMessage(msg, nil)
881
882	// Update the previous getblocks request information for filtering
883	// duplicates.
884	p.prevGetBlocksMtx.Lock()
885	p.prevGetBlocksBegin = beginHash
886	p.prevGetBlocksStop = stopHash
887	p.prevGetBlocksMtx.Unlock()
888	return nil
889}
890
891// PushGetHeadersMsg sends a getblocks message for the provided block locator
892// and stop hash.  It will ignore back-to-back duplicate requests.
893//
894// This function is safe for concurrent access.
895func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error {
896	// Extract the begin hash from the block locator, if one was specified,
897	// to use for filtering duplicate getheaders requests.
898	var beginHash *chainhash.Hash
899	if len(locator) > 0 {
900		beginHash = locator[0]
901	}
902
903	// Filter duplicate getheaders requests.
904	p.prevGetHdrsMtx.Lock()
905	isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil &&
906		beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) &&
907		beginHash.IsEqual(p.prevGetHdrsBegin)
908	p.prevGetHdrsMtx.Unlock()
909
910	if isDuplicate {
911		log.Tracef("Filtering duplicate [getheaders] with begin hash %v",
912			beginHash)
913		return nil
914	}
915
916	// Construct the getheaders request and queue it to be sent.
917	msg := wire.NewMsgGetHeaders()
918	msg.HashStop = *stopHash
919	for _, hash := range locator {
920		err := msg.AddBlockLocatorHash(hash)
921		if err != nil {
922			return err
923		}
924	}
925	p.QueueMessage(msg, nil)
926
927	// Update the previous getheaders request information for filtering
928	// duplicates.
929	p.prevGetHdrsMtx.Lock()
930	p.prevGetHdrsBegin = beginHash
931	p.prevGetHdrsStop = stopHash
932	p.prevGetHdrsMtx.Unlock()
933	return nil
934}
935
936// PushRejectMsg sends a reject message for the provided command, reject code,
937// reject reason, and hash.  The hash will only be used when the command is a tx
938// or block and should be nil in other cases.  The wait parameter will cause the
939// function to block until the reject message has actually been sent.
940//
941// This function is safe for concurrent access.
942func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) {
943	// Don't bother sending the reject message if the protocol version
944	// is too low.
945	if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion {
946		return
947	}
948
949	msg := wire.NewMsgReject(command, code, reason)
950	if command == wire.CmdTx || command == wire.CmdBlock {
951		if hash == nil {
952			log.Warnf("Sending a reject message for command "+
953				"type %v which should have specified a hash "+
954				"but does not", command)
955			hash = &zeroHash
956		}
957		msg.Hash = *hash
958	}
959
960	// Send the message without waiting if the caller has not requested it.
961	if !wait {
962		p.QueueMessage(msg, nil)
963		return
964	}
965
966	// Send the message and block until it has been sent before returning.
967	doneChan := make(chan struct{}, 1)
968	p.QueueMessage(msg, doneChan)
969	<-doneChan
970}
971
972// handlePingMsg is invoked when a peer receives a ping bitcoin message.  For
973// recent clients (protocol version > BIP0031Version), it replies with a pong
974// message.  For older clients, it does nothing and anything other than failure
975// is considered a successful ping.
976func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
977	// Only reply with pong if the message is from a new enough client.
978	if p.ProtocolVersion() > wire.BIP0031Version {
979		// Include nonce from ping so pong can be identified.
980		p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil)
981	}
982}
983
984// handlePongMsg is invoked when a peer receives a pong bitcoin message.  It
985// updates the ping statistics as required for recent clients (protocol
986// version > BIP0031Version).  There is no effect for older clients or when a
987// ping was not previously sent.
988func (p *Peer) handlePongMsg(msg *wire.MsgPong) {
989	// Arguably we could use a buffered channel here sending data
990	// in a fifo manner whenever we send a ping, or a list keeping track of
991	// the times of each ping. For now we just make a best effort and
992	// only record stats if it was for the last ping sent. Any preceding
993	// and overlapping pings will be ignored. It is unlikely to occur
994	// without large usage of the ping rpc call since we ping infrequently
995	// enough that if they overlap we would have timed out the peer.
996	if p.ProtocolVersion() > wire.BIP0031Version {
997		p.statsMtx.Lock()
998		if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce {
999			p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds()
1000			p.lastPingMicros /= 1000 // convert to usec.
1001			p.lastPingNonce = 0
1002		}
1003		p.statsMtx.Unlock()
1004	}
1005}
1006
1007// readMessage reads the next bitcoin message from the peer with logging.
1008func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) {
1009	n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn,
1010		p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding)
1011	atomic.AddUint64(&p.bytesReceived, uint64(n))
1012	if p.cfg.Listeners.OnRead != nil {
1013		p.cfg.Listeners.OnRead(p, n, msg, err)
1014	}
1015	if err != nil {
1016		return nil, nil, err
1017	}
1018
1019	// Use closures to log expensive operations so they are only run when
1020	// the logging level requires it.
1021	log.Debugf("%v", newLogClosure(func() string {
1022		// Debug summary of message.
1023		summary := messageSummary(msg)
1024		if len(summary) > 0 {
1025			summary = " (" + summary + ")"
1026		}
1027		return fmt.Sprintf("Received %v%s from %s",
1028			msg.Command(), summary, p)
1029	}))
1030	log.Tracef("%v", newLogClosure(func() string {
1031		return spew.Sdump(msg)
1032	}))
1033	log.Tracef("%v", newLogClosure(func() string {
1034		return spew.Sdump(buf)
1035	}))
1036
1037	return msg, buf, nil
1038}
1039
1040// writeMessage sends a bitcoin message to the peer with logging.
1041func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
1042	// Don't do anything if we're disconnecting.
1043	if atomic.LoadInt32(&p.disconnect) != 0 {
1044		return nil
1045	}
1046
1047	// Use closures to log expensive operations so they are only run when
1048	// the logging level requires it.
1049	log.Debugf("%v", newLogClosure(func() string {
1050		// Debug summary of message.
1051		summary := messageSummary(msg)
1052		if len(summary) > 0 {
1053			summary = " (" + summary + ")"
1054		}
1055		return fmt.Sprintf("Sending %v%s to %s", msg.Command(),
1056			summary, p)
1057	}))
1058	log.Tracef("%v", newLogClosure(func() string {
1059		return spew.Sdump(msg)
1060	}))
1061	log.Tracef("%v", newLogClosure(func() string {
1062		var buf bytes.Buffer
1063		_, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(),
1064			p.cfg.ChainParams.Net, enc)
1065		if err != nil {
1066			return err.Error()
1067		}
1068		return spew.Sdump(buf.Bytes())
1069	}))
1070
1071	// Write the message to the peer.
1072	n, err := wire.WriteMessageWithEncodingN(p.conn, msg,
1073		p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
1074	atomic.AddUint64(&p.bytesSent, uint64(n))
1075	if p.cfg.Listeners.OnWrite != nil {
1076		p.cfg.Listeners.OnWrite(p, n, msg, err)
1077	}
1078	return err
1079}
1080
1081// isAllowedReadError returns whether or not the passed error is allowed without
1082// disconnecting the peer.  In particular, regression tests need to be allowed
1083// to send malformed messages without the peer being disconnected.
1084func (p *Peer) isAllowedReadError(err error) bool {
1085	// Only allow read errors in regression test mode.
1086	if p.cfg.ChainParams.Net != wire.TestNet {
1087		return false
1088	}
1089
1090	// Don't allow the error if it's not specifically a malformed message error.
1091	if _, ok := err.(*wire.MessageError); !ok {
1092		return false
1093	}
1094
1095	// Don't allow the error if it's not coming from localhost or the
1096	// hostname can't be determined for some reason.
1097	host, _, err := net.SplitHostPort(p.addr)
1098	if err != nil {
1099		return false
1100	}
1101
1102	if host != "127.0.0.1" && host != "localhost" {
1103		return false
1104	}
1105
1106	// Allowed if all checks passed.
1107	return true
1108}
1109
1110// shouldHandleReadError returns whether or not the passed error, which is
1111// expected to have come from reading from the remote peer in the inHandler,
1112// should be logged and responded to with a reject message.
1113func (p *Peer) shouldHandleReadError(err error) bool {
1114	// No logging or reject message when the peer is being forcibly
1115	// disconnected.
1116	if atomic.LoadInt32(&p.disconnect) != 0 {
1117		return false
1118	}
1119
1120	// No logging or reject message when the remote peer has been
1121	// disconnected.
1122	if err == io.EOF {
1123		return false
1124	}
1125	if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1126		return false
1127	}
1128
1129	return true
1130}
1131
1132// maybeAddDeadline potentially adds a deadline for the appropriate expected
1133// response for the passed wire protocol command to the pending responses map.
1134func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) {
1135	// Setup a deadline for each message being sent that expects a response.
1136	//
1137	// NOTE: Pings are intentionally ignored here since they are typically
1138	// sent asynchronously and as a result of a long backlock of messages,
1139	// such as is typical in the case of initial block download, the
1140	// response won't be received in time.
1141	deadline := time.Now().Add(stallResponseTimeout)
1142	switch msgCmd {
1143	case wire.CmdVersion:
1144		// Expects a verack message.
1145		pendingResponses[wire.CmdVerAck] = deadline
1146
1147	case wire.CmdMemPool:
1148		// Expects an inv message.
1149		pendingResponses[wire.CmdInv] = deadline
1150
1151	case wire.CmdGetBlocks:
1152		// Expects an inv message.
1153		pendingResponses[wire.CmdInv] = deadline
1154
1155	case wire.CmdGetData:
1156		// Expects a block, merkleblock, tx, or notfound message.
1157		pendingResponses[wire.CmdBlock] = deadline
1158		pendingResponses[wire.CmdMerkleBlock] = deadline
1159		pendingResponses[wire.CmdTx] = deadline
1160		pendingResponses[wire.CmdNotFound] = deadline
1161
1162	case wire.CmdGetHeaders:
1163		// Expects a headers message.  Use a longer deadline since it
1164		// can take a while for the remote peer to load all of the
1165		// headers.
1166		deadline = time.Now().Add(stallResponseTimeout * 3)
1167		pendingResponses[wire.CmdHeaders] = deadline
1168	}
1169}
1170
1171// stallHandler handles stall detection for the peer.  This entails keeping
1172// track of expected responses and assigning them deadlines while accounting for
1173// the time spent in callbacks.  It must be run as a goroutine.
1174func (p *Peer) stallHandler() {
1175	// These variables are used to adjust the deadline times forward by the
1176	// time it takes callbacks to execute.  This is done because new
1177	// messages aren't read until the previous one is finished processing
1178	// (which includes callbacks), so the deadline for receiving a response
1179	// for a given message must account for the processing time as well.
1180	var handlerActive bool
1181	var handlersStartTime time.Time
1182	var deadlineOffset time.Duration
1183
1184	// pendingResponses tracks the expected response deadline times.
1185	pendingResponses := make(map[string]time.Time)
1186
1187	// stallTicker is used to periodically check pending responses that have
1188	// exceeded the expected deadline and disconnect the peer due to
1189	// stalling.
1190	stallTicker := time.NewTicker(stallTickInterval)
1191	defer stallTicker.Stop()
1192
1193	// ioStopped is used to detect when both the input and output handler
1194	// goroutines are done.
1195	var ioStopped bool
1196out:
1197	for {
1198		select {
1199		case msg := <-p.stallControl:
1200			switch msg.command {
1201			case sccSendMessage:
1202				// Add a deadline for the expected response
1203				// message if needed.
1204				p.maybeAddDeadline(pendingResponses,
1205					msg.message.Command())
1206
1207			case sccReceiveMessage:
1208				// Remove received messages from the expected
1209				// response map.  Since certain commands expect
1210				// one of a group of responses, remove
1211				// everything in the expected group accordingly.
1212				switch msgCmd := msg.message.Command(); msgCmd {
1213				case wire.CmdBlock:
1214					fallthrough
1215				case wire.CmdMerkleBlock:
1216					fallthrough
1217				case wire.CmdTx:
1218					fallthrough
1219				case wire.CmdNotFound:
1220					delete(pendingResponses, wire.CmdBlock)
1221					delete(pendingResponses, wire.CmdMerkleBlock)
1222					delete(pendingResponses, wire.CmdTx)
1223					delete(pendingResponses, wire.CmdNotFound)
1224
1225				default:
1226					delete(pendingResponses, msgCmd)
1227				}
1228
1229			case sccHandlerStart:
1230				// Warn on unbalanced callback signalling.
1231				if handlerActive {
1232					log.Warn("Received handler start " +
1233						"control command while a " +
1234						"handler is already active")
1235					continue
1236				}
1237
1238				handlerActive = true
1239				handlersStartTime = time.Now()
1240
1241			case sccHandlerDone:
1242				// Warn on unbalanced callback signalling.
1243				if !handlerActive {
1244					log.Warn("Received handler done " +
1245						"control command when a " +
1246						"handler is not already active")
1247					continue
1248				}
1249
1250				// Extend active deadlines by the time it took
1251				// to execute the callback.
1252				duration := time.Since(handlersStartTime)
1253				deadlineOffset += duration
1254				handlerActive = false
1255
1256			default:
1257				log.Warnf("Unsupported message command %v",
1258					msg.command)
1259			}
1260
1261		case <-stallTicker.C:
1262			// Calculate the offset to apply to the deadline based
1263			// on how long the handlers have taken to execute since
1264			// the last tick.
1265			now := time.Now()
1266			offset := deadlineOffset
1267			if handlerActive {
1268				offset += now.Sub(handlersStartTime)
1269			}
1270
1271			// Disconnect the peer if any of the pending responses
1272			// don't arrive by their adjusted deadline.
1273			for command, deadline := range pendingResponses {
1274				if now.Before(deadline.Add(offset)) {
1275					continue
1276				}
1277
1278				log.Debugf("Peer %s appears to be stalled or "+
1279					"misbehaving, %s timeout -- "+
1280					"disconnecting", p, command)
1281				p.Disconnect()
1282				break
1283			}
1284
1285			// Reset the deadline offset for the next tick.
1286			deadlineOffset = 0
1287
1288		case <-p.inQuit:
1289			// The stall handler can exit once both the input and
1290			// output handler goroutines are done.
1291			if ioStopped {
1292				break out
1293			}
1294			ioStopped = true
1295
1296		case <-p.outQuit:
1297			// The stall handler can exit once both the input and
1298			// output handler goroutines are done.
1299			if ioStopped {
1300				break out
1301			}
1302			ioStopped = true
1303		}
1304	}
1305
1306	// Drain any wait channels before going away so there is nothing left
1307	// waiting on this goroutine.
1308cleanup:
1309	for {
1310		select {
1311		case <-p.stallControl:
1312		default:
1313			break cleanup
1314		}
1315	}
1316	log.Tracef("Peer stall handler done for %s", p)
1317}
1318
1319// inHandler handles all incoming messages for the peer.  It must be run as a
1320// goroutine.
1321func (p *Peer) inHandler() {
1322	// The timer is stopped when a new message is received and reset after it
1323	// is processed.
1324	idleTimer := time.AfterFunc(idleTimeout, func() {
1325		log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
1326		p.Disconnect()
1327	})
1328
1329out:
1330	for atomic.LoadInt32(&p.disconnect) == 0 {
1331		// Read a message and stop the idle timer as soon as the read
1332		// is done.  The timer is reset below for the next iteration if
1333		// needed.
1334		rmsg, buf, err := p.readMessage(p.wireEncoding)
1335		idleTimer.Stop()
1336		if err != nil {
1337			// In order to allow regression tests with malformed messages, don't
1338			// disconnect the peer when we're in regression test mode and the
1339			// error is one of the allowed errors.
1340			if p.isAllowedReadError(err) {
1341				log.Errorf("Allowed test error from %s: %v", p, err)
1342				idleTimer.Reset(idleTimeout)
1343				continue
1344			}
1345
1346			// Only log the error and send reject message if the
1347			// local peer is not forcibly disconnecting and the
1348			// remote peer has not disconnected.
1349			if p.shouldHandleReadError(err) {
1350				errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
1351				if err != io.ErrUnexpectedEOF {
1352					log.Errorf(errMsg)
1353				}
1354
1355				// Push a reject message for the malformed message and wait for
1356				// the message to be sent before disconnecting.
1357				//
1358				// NOTE: Ideally this would include the command in the header if
1359				// at least that much of the message was valid, but that is not
1360				// currently exposed by wire, so just used malformed for the
1361				// command.
1362				p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
1363					true)
1364			}
1365			break out
1366		}
1367		atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
1368		p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}
1369
1370		// Handle each supported message type.
1371		p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
1372		switch msg := rmsg.(type) {
1373		case *wire.MsgVersion:
1374			// Limit to one version message per peer.
1375			p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
1376				"duplicate version message", nil, true)
1377			break out
1378
1379		case *wire.MsgVerAck:
1380			// Limit to one verack message per peer.
1381			p.PushRejectMsg(
1382				msg.Command(), wire.RejectDuplicate,
1383				"duplicate verack message", nil, true,
1384			)
1385			break out
1386
1387		case *wire.MsgGetAddr:
1388			if p.cfg.Listeners.OnGetAddr != nil {
1389				p.cfg.Listeners.OnGetAddr(p, msg)
1390			}
1391
1392		case *wire.MsgAddr:
1393			if p.cfg.Listeners.OnAddr != nil {
1394				p.cfg.Listeners.OnAddr(p, msg)
1395			}
1396
1397		case *wire.MsgPing:
1398			p.handlePingMsg(msg)
1399			if p.cfg.Listeners.OnPing != nil {
1400				p.cfg.Listeners.OnPing(p, msg)
1401			}
1402
1403		case *wire.MsgPong:
1404			p.handlePongMsg(msg)
1405			if p.cfg.Listeners.OnPong != nil {
1406				p.cfg.Listeners.OnPong(p, msg)
1407			}
1408
1409		case *wire.MsgAlert:
1410			if p.cfg.Listeners.OnAlert != nil {
1411				p.cfg.Listeners.OnAlert(p, msg)
1412			}
1413
1414		case *wire.MsgMemPool:
1415			if p.cfg.Listeners.OnMemPool != nil {
1416				p.cfg.Listeners.OnMemPool(p, msg)
1417			}
1418
1419		case *wire.MsgTx:
1420			if p.cfg.Listeners.OnTx != nil {
1421				p.cfg.Listeners.OnTx(p, msg)
1422			}
1423
1424		case *wire.MsgBlock:
1425			if p.cfg.Listeners.OnBlock != nil {
1426				p.cfg.Listeners.OnBlock(p, msg, buf)
1427			}
1428
1429		case *wire.MsgInv:
1430			if p.cfg.Listeners.OnInv != nil {
1431				p.cfg.Listeners.OnInv(p, msg)
1432			}
1433
1434		case *wire.MsgHeaders:
1435			if p.cfg.Listeners.OnHeaders != nil {
1436				p.cfg.Listeners.OnHeaders(p, msg)
1437			}
1438
1439		case *wire.MsgNotFound:
1440			if p.cfg.Listeners.OnNotFound != nil {
1441				p.cfg.Listeners.OnNotFound(p, msg)
1442			}
1443
1444		case *wire.MsgGetData:
1445			if p.cfg.Listeners.OnGetData != nil {
1446				p.cfg.Listeners.OnGetData(p, msg)
1447			}
1448
1449		case *wire.MsgGetBlocks:
1450			if p.cfg.Listeners.OnGetBlocks != nil {
1451				p.cfg.Listeners.OnGetBlocks(p, msg)
1452			}
1453
1454		case *wire.MsgGetHeaders:
1455			if p.cfg.Listeners.OnGetHeaders != nil {
1456				p.cfg.Listeners.OnGetHeaders(p, msg)
1457			}
1458
1459		case *wire.MsgGetCFilters:
1460			if p.cfg.Listeners.OnGetCFilters != nil {
1461				p.cfg.Listeners.OnGetCFilters(p, msg)
1462			}
1463
1464		case *wire.MsgGetCFHeaders:
1465			if p.cfg.Listeners.OnGetCFHeaders != nil {
1466				p.cfg.Listeners.OnGetCFHeaders(p, msg)
1467			}
1468
1469		case *wire.MsgGetCFCheckpt:
1470			if p.cfg.Listeners.OnGetCFCheckpt != nil {
1471				p.cfg.Listeners.OnGetCFCheckpt(p, msg)
1472			}
1473
1474		case *wire.MsgCFilter:
1475			if p.cfg.Listeners.OnCFilter != nil {
1476				p.cfg.Listeners.OnCFilter(p, msg)
1477			}
1478
1479		case *wire.MsgCFHeaders:
1480			if p.cfg.Listeners.OnCFHeaders != nil {
1481				p.cfg.Listeners.OnCFHeaders(p, msg)
1482			}
1483
1484		case *wire.MsgFeeFilter:
1485			if p.cfg.Listeners.OnFeeFilter != nil {
1486				p.cfg.Listeners.OnFeeFilter(p, msg)
1487			}
1488
1489		case *wire.MsgFilterAdd:
1490			if p.cfg.Listeners.OnFilterAdd != nil {
1491				p.cfg.Listeners.OnFilterAdd(p, msg)
1492			}
1493
1494		case *wire.MsgFilterClear:
1495			if p.cfg.Listeners.OnFilterClear != nil {
1496				p.cfg.Listeners.OnFilterClear(p, msg)
1497			}
1498
1499		case *wire.MsgFilterLoad:
1500			if p.cfg.Listeners.OnFilterLoad != nil {
1501				p.cfg.Listeners.OnFilterLoad(p, msg)
1502			}
1503
1504		case *wire.MsgMerkleBlock:
1505			if p.cfg.Listeners.OnMerkleBlock != nil {
1506				p.cfg.Listeners.OnMerkleBlock(p, msg)
1507			}
1508
1509		case *wire.MsgReject:
1510			if p.cfg.Listeners.OnReject != nil {
1511				p.cfg.Listeners.OnReject(p, msg)
1512			}
1513
1514		case *wire.MsgSendHeaders:
1515			p.flagsMtx.Lock()
1516			p.sendHeadersPreferred = true
1517			p.flagsMtx.Unlock()
1518
1519			if p.cfg.Listeners.OnSendHeaders != nil {
1520				p.cfg.Listeners.OnSendHeaders(p, msg)
1521			}
1522
1523		default:
1524			log.Debugf("Received unhandled message of type %v "+
1525				"from %v", rmsg.Command(), p)
1526		}
1527		p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}
1528
1529		// A message was received so reset the idle timer.
1530		idleTimer.Reset(idleTimeout)
1531	}
1532
1533	// Ensure the idle timer is stopped to avoid leaking the resource.
1534	idleTimer.Stop()
1535
1536	// Ensure connection is closed.
1537	p.Disconnect()
1538
1539	close(p.inQuit)
1540	log.Tracef("Peer input handler done for %s", p)
1541}
1542
1543// queueHandler handles the queuing of outgoing data for the peer. This runs as
1544// a muxer for various sources of input so we can ensure that server and peer
1545// handlers will not block on us sending a message.  That data is then passed on
1546// to outHandler to be actually written.
1547func (p *Peer) queueHandler() {
1548	pendingMsgs := list.New()
1549	invSendQueue := list.New()
1550	trickleTicker := time.NewTicker(p.cfg.TrickleInterval)
1551	defer trickleTicker.Stop()
1552
1553	// We keep the waiting flag so that we know if we have a message queued
1554	// to the outHandler or not.  We could use the presence of a head of
1555	// the list for this but then we have rather racy concerns about whether
1556	// it has gotten it at cleanup time - and thus who sends on the
1557	// message's done channel.  To avoid such confusion we keep a different
1558	// flag and pendingMsgs only contains messages that we have not yet
1559	// passed to outHandler.
1560	waiting := false
1561
1562	// To avoid duplication below.
1563	queuePacket := func(msg outMsg, list *list.List, waiting bool) bool {
1564		if !waiting {
1565			p.sendQueue <- msg
1566		} else {
1567			list.PushBack(msg)
1568		}
1569		// we are always waiting now.
1570		return true
1571	}
1572out:
1573	for {
1574		select {
1575		case msg := <-p.outputQueue:
1576			waiting = queuePacket(msg, pendingMsgs, waiting)
1577
1578		// This channel is notified when a message has been sent across
1579		// the network socket.
1580		case <-p.sendDoneQueue:
1581			// No longer waiting if there are no more messages
1582			// in the pending messages queue.
1583			next := pendingMsgs.Front()
1584			if next == nil {
1585				waiting = false
1586				continue
1587			}
1588
1589			// Notify the outHandler about the next item to
1590			// asynchronously send.
1591			val := pendingMsgs.Remove(next)
1592			p.sendQueue <- val.(outMsg)
1593
1594		case iv := <-p.outputInvChan:
1595			// No handshake?  They'll find out soon enough.
1596			if p.VersionKnown() {
1597				// If this is a new block, then we'll blast it
1598				// out immediately, sipping the inv trickle
1599				// queue.
1600				if iv.Type == wire.InvTypeBlock ||
1601					iv.Type == wire.InvTypeWitnessBlock {
1602
1603					invMsg := wire.NewMsgInvSizeHint(1)
1604					invMsg.AddInvVect(iv)
1605					waiting = queuePacket(outMsg{msg: invMsg},
1606						pendingMsgs, waiting)
1607				} else {
1608					invSendQueue.PushBack(iv)
1609				}
1610			}
1611
1612		case <-trickleTicker.C:
1613			// Don't send anything if we're disconnecting or there
1614			// is no queued inventory.
1615			// version is known if send queue has any entries.
1616			if atomic.LoadInt32(&p.disconnect) != 0 ||
1617				invSendQueue.Len() == 0 {
1618				continue
1619			}
1620
1621			// Create and send as many inv messages as needed to
1622			// drain the inventory send queue.
1623			invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1624			for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
1625				iv := invSendQueue.Remove(e).(*wire.InvVect)
1626
1627				// Don't send inventory that became known after
1628				// the initial check.
1629				if p.knownInventory.Exists(iv) {
1630					continue
1631				}
1632
1633				invMsg.AddInvVect(iv)
1634				if len(invMsg.InvList) >= maxInvTrickleSize {
1635					waiting = queuePacket(
1636						outMsg{msg: invMsg},
1637						pendingMsgs, waiting)
1638					invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
1639				}
1640
1641				// Add the inventory that is being relayed to
1642				// the known inventory for the peer.
1643				p.AddKnownInventory(iv)
1644			}
1645			if len(invMsg.InvList) > 0 {
1646				waiting = queuePacket(outMsg{msg: invMsg},
1647					pendingMsgs, waiting)
1648			}
1649
1650		case <-p.quit:
1651			break out
1652		}
1653	}
1654
1655	// Drain any wait channels before we go away so we don't leave something
1656	// waiting for us.
1657	for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() {
1658		val := pendingMsgs.Remove(e)
1659		msg := val.(outMsg)
1660		if msg.doneChan != nil {
1661			msg.doneChan <- struct{}{}
1662		}
1663	}
1664cleanup:
1665	for {
1666		select {
1667		case msg := <-p.outputQueue:
1668			if msg.doneChan != nil {
1669				msg.doneChan <- struct{}{}
1670			}
1671		case <-p.outputInvChan:
1672			// Just drain channel
1673		// sendDoneQueue is buffered so doesn't need draining.
1674		default:
1675			break cleanup
1676		}
1677	}
1678	close(p.queueQuit)
1679	log.Tracef("Peer queue handler done for %s", p)
1680}
1681
1682// shouldLogWriteError returns whether or not the passed error, which is
1683// expected to have come from writing to the remote peer in the outHandler,
1684// should be logged.
1685func (p *Peer) shouldLogWriteError(err error) bool {
1686	// No logging when the peer is being forcibly disconnected.
1687	if atomic.LoadInt32(&p.disconnect) != 0 {
1688		return false
1689	}
1690
1691	// No logging when the remote peer has been disconnected.
1692	if err == io.EOF {
1693		return false
1694	}
1695	if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() {
1696		return false
1697	}
1698
1699	return true
1700}
1701
1702// outHandler handles all outgoing messages for the peer.  It must be run as a
1703// goroutine.  It uses a buffered channel to serialize output messages while
1704// allowing the sender to continue running asynchronously.
1705func (p *Peer) outHandler() {
1706out:
1707	for {
1708		select {
1709		case msg := <-p.sendQueue:
1710			switch m := msg.msg.(type) {
1711			case *wire.MsgPing:
1712				// Only expects a pong message in later protocol
1713				// versions.  Also set up statistics.
1714				if p.ProtocolVersion() > wire.BIP0031Version {
1715					p.statsMtx.Lock()
1716					p.lastPingNonce = m.Nonce
1717					p.lastPingTime = time.Now()
1718					p.statsMtx.Unlock()
1719				}
1720			}
1721
1722			p.stallControl <- stallControlMsg{sccSendMessage, msg.msg}
1723
1724			err := p.writeMessage(msg.msg, msg.encoding)
1725			if err != nil {
1726				p.Disconnect()
1727				if p.shouldLogWriteError(err) {
1728					log.Errorf("Failed to send message to "+
1729						"%s: %v", p, err)
1730				}
1731				if msg.doneChan != nil {
1732					msg.doneChan <- struct{}{}
1733				}
1734				continue
1735			}
1736
1737			// At this point, the message was successfully sent, so
1738			// update the last send time, signal the sender of the
1739			// message that it has been sent (if requested), and
1740			// signal the send queue to the deliver the next queued
1741			// message.
1742			atomic.StoreInt64(&p.lastSend, time.Now().Unix())
1743			if msg.doneChan != nil {
1744				msg.doneChan <- struct{}{}
1745			}
1746			p.sendDoneQueue <- struct{}{}
1747
1748		case <-p.quit:
1749			break out
1750		}
1751	}
1752
1753	<-p.queueQuit
1754
1755	// Drain any wait channels before we go away so we don't leave something
1756	// waiting for us. We have waited on queueQuit and thus we can be sure
1757	// that we will not miss anything sent on sendQueue.
1758cleanup:
1759	for {
1760		select {
1761		case msg := <-p.sendQueue:
1762			if msg.doneChan != nil {
1763				msg.doneChan <- struct{}{}
1764			}
1765			// no need to send on sendDoneQueue since queueHandler
1766			// has been waited on and already exited.
1767		default:
1768			break cleanup
1769		}
1770	}
1771	close(p.outQuit)
1772	log.Tracef("Peer output handler done for %s", p)
1773}
1774
1775// pingHandler periodically pings the peer.  It must be run as a goroutine.
1776func (p *Peer) pingHandler() {
1777	pingTicker := time.NewTicker(pingInterval)
1778	defer pingTicker.Stop()
1779
1780out:
1781	for {
1782		select {
1783		case <-pingTicker.C:
1784			nonce, err := wire.RandomUint64()
1785			if err != nil {
1786				log.Errorf("Not sending ping to %s: %v", p, err)
1787				continue
1788			}
1789			p.QueueMessage(wire.NewMsgPing(nonce), nil)
1790
1791		case <-p.quit:
1792			break out
1793		}
1794	}
1795}
1796
// QueueMessage adds the passed bitcoin message to the peer send queue.  It is
// a thin wrapper that forwards msg and doneChan unchanged to
// QueueMessageWithEncoding using wire.BaseEncoding.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) {
	p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding)
}
1803
1804// QueueMessageWithEncoding adds the passed bitcoin message to the peer send
1805// queue. This function is identical to QueueMessage, however it allows the
1806// caller to specify the wire encoding type that should be used when
1807// encoding/decoding blocks and transactions.
1808//
1809// This function is safe for concurrent access.
1810func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{},
1811	encoding wire.MessageEncoding) {
1812
1813	// Avoid risk of deadlock if goroutine already exited.  The goroutine
1814	// we will be sending to hangs around until it knows for a fact that
1815	// it is marked as disconnected and *then* it drains the channels.
1816	if !p.Connected() {
1817		if doneChan != nil {
1818			go func() {
1819				doneChan <- struct{}{}
1820			}()
1821		}
1822		return
1823	}
1824	p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan}
1825}
1826
1827// QueueInventory adds the passed inventory to the inventory send queue which
1828// might not be sent right away, rather it is trickled to the peer in batches.
1829// Inventory that the peer is already known to have is ignored.
1830//
1831// This function is safe for concurrent access.
1832func (p *Peer) QueueInventory(invVect *wire.InvVect) {
1833	// Don't add the inventory to the send queue if the peer is already
1834	// known to have it.
1835	if p.knownInventory.Exists(invVect) {
1836		return
1837	}
1838
1839	// Avoid risk of deadlock if goroutine already exited.  The goroutine
1840	// we will be sending to hangs around until it knows for a fact that
1841	// it is marked as disconnected and *then* it drains the channels.
1842	if !p.Connected() {
1843		return
1844	}
1845
1846	p.outputInvChan <- invVect
1847}
1848
1849// Connected returns whether or not the peer is currently connected.
1850//
1851// This function is safe for concurrent access.
1852func (p *Peer) Connected() bool {
1853	return atomic.LoadInt32(&p.connected) != 0 &&
1854		atomic.LoadInt32(&p.disconnect) == 0
1855}
1856
1857// Disconnect disconnects the peer by closing the connection.  Calling this
1858// function when the peer is already disconnected or in the process of
1859// disconnecting will have no effect.
1860func (p *Peer) Disconnect() {
1861	if atomic.AddInt32(&p.disconnect, 1) != 1 {
1862		return
1863	}
1864
1865	log.Tracef("Disconnecting %s", p)
1866	if atomic.LoadInt32(&p.connected) != 0 {
1867		p.conn.Close()
1868	}
1869	close(p.quit)
1870}
1871
1872// readRemoteVersionMsg waits for the next message to arrive from the remote
1873// peer.  If the next message is not a version message or the version is not
1874// acceptable then return an error.
1875func (p *Peer) readRemoteVersionMsg() error {
1876	// Read their version message.
1877	remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
1878	if err != nil {
1879		return err
1880	}
1881
1882	// Notify and disconnect clients if the first message is not a version
1883	// message.
1884	msg, ok := remoteMsg.(*wire.MsgVersion)
1885	if !ok {
1886		reason := "a version message must precede all others"
1887		rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed,
1888			reason)
1889		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1890		return errors.New(reason)
1891	}
1892
1893	// Detect self connections.
1894	if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
1895		return errors.New("disconnecting peer connected to self")
1896	}
1897
1898	// Negotiate the protocol version and set the services to what the remote
1899	// peer advertised.
1900	p.flagsMtx.Lock()
1901	p.advertisedProtoVer = uint32(msg.ProtocolVersion)
1902	p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
1903	p.versionKnown = true
1904	p.services = msg.Services
1905	p.flagsMtx.Unlock()
1906	log.Debugf("Negotiated protocol version %d for peer %s",
1907		p.protocolVersion, p)
1908
1909	// Updating a bunch of stats including block based stats, and the
1910	// peer's time offset.
1911	p.statsMtx.Lock()
1912	p.lastBlock = msg.LastBlock
1913	p.startingHeight = msg.LastBlock
1914	p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
1915	p.statsMtx.Unlock()
1916
1917	// Set the peer's ID, user agent, and potentially the flag which
1918	// specifies the witness support is enabled.
1919	p.flagsMtx.Lock()
1920	p.id = atomic.AddInt32(&nodeCount, 1)
1921	p.userAgent = msg.UserAgent
1922
1923	// Determine if the peer would like to receive witness data with
1924	// transactions, or not.
1925	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1926		p.witnessEnabled = true
1927	}
1928	p.flagsMtx.Unlock()
1929
1930	// Once the version message has been exchanged, we're able to determine
1931	// if this peer knows how to encode witness data over the wire
1932	// protocol. If so, then we'll switch to a decoding mode which is
1933	// prepared for the new transaction format introduced as part of
1934	// BIP0144.
1935	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
1936		p.wireEncoding = wire.WitnessEncoding
1937	}
1938
1939	// Invoke the callback if specified.
1940	if p.cfg.Listeners.OnVersion != nil {
1941		rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
1942		if rejectMsg != nil {
1943			_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1944			return errors.New(rejectMsg.Reason)
1945		}
1946	}
1947
1948	// Notify and disconnect clients that have a protocol version that is
1949	// too old.
1950	//
1951	// NOTE: If minAcceptableProtocolVersion is raised to be higher than
1952	// wire.RejectVersion, this should send a reject packet before
1953	// disconnecting.
1954	if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
1955		// Send a reject message indicating the protocol version is
1956		// obsolete and wait for the message to be sent before
1957		// disconnecting.
1958		reason := fmt.Sprintf("protocol version must be %d or greater",
1959			MinAcceptableProtocolVersion)
1960		rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
1961			reason)
1962		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1963		return errors.New(reason)
1964	}
1965
1966	return nil
1967}
1968
1969// readRemoteVerAckMsg waits for the next message to arrive from the remote
1970// peer. If this message is not a verack message, then an error is returned.
1971// This method is to be used as part of the version negotiation upon a new
1972// connection.
1973func (p *Peer) readRemoteVerAckMsg() error {
1974	// Read the next message from the wire.
1975	remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
1976	if err != nil {
1977		return err
1978	}
1979
1980	// It should be a verack message, otherwise send a reject message to the
1981	// peer explaining why.
1982	msg, ok := remoteMsg.(*wire.MsgVerAck)
1983	if !ok {
1984		reason := "a verack message must follow version"
1985		rejectMsg := wire.NewMsgReject(
1986			msg.Command(), wire.RejectMalformed, reason,
1987		)
1988		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
1989		return errors.New(reason)
1990	}
1991
1992	p.flagsMtx.Lock()
1993	p.verAckReceived = true
1994	p.flagsMtx.Unlock()
1995
1996	if p.cfg.Listeners.OnVerAck != nil {
1997		p.cfg.Listeners.OnVerAck(p, msg)
1998	}
1999
2000	return nil
2001}
2002
2003// localVersionMsg creates a version message that can be used to send to the
2004// remote peer.
2005func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) {
2006	var blockNum int32
2007	if p.cfg.NewestBlock != nil {
2008		var err error
2009		_, blockNum, err = p.cfg.NewestBlock()
2010		if err != nil {
2011			return nil, err
2012		}
2013	}
2014
2015	theirNA := p.na
2016
2017	// If we are behind a proxy and the connection comes from the proxy then
2018	// we return an unroutable address as their address. This is to prevent
2019	// leaking the tor proxy address.
2020	if p.cfg.Proxy != "" {
2021		proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy)
2022		// invalid proxy means poorly configured, be on the safe side.
2023		if err != nil || p.na.IP.String() == proxyaddress {
2024			theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0,
2025				theirNA.Services)
2026		}
2027	}
2028
2029	// Create a wire.NetAddress with only the services set to use as the
2030	// "addrme" in the version message.
2031	//
2032	// Older nodes previously added the IP and port information to the
2033	// address manager which proved to be unreliable as an inbound
2034	// connection from a peer didn't necessarily mean the peer itself
2035	// accepted inbound connections.
2036	//
2037	// Also, the timestamp is unused in the version message.
2038	ourNA := &wire.NetAddress{
2039		Services: p.cfg.Services,
2040	}
2041
2042	// Generate a unique nonce for this peer so self connections can be
2043	// detected.  This is accomplished by adding it to a size-limited map of
2044	// recently seen nonces.
2045	nonce := uint64(rand.Int63())
2046	sentNonces.Add(nonce)
2047
2048	// Version message.
2049	msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum)
2050	msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion,
2051		p.cfg.UserAgentComments...)
2052
2053	// Advertise local services.
2054	msg.Services = p.cfg.Services
2055
2056	// Advertise our max supported protocol version.
2057	msg.ProtocolVersion = int32(p.cfg.ProtocolVersion)
2058
2059	// Advertise if inv messages for transactions are desired.
2060	msg.DisableRelayTx = p.cfg.DisableRelayTx
2061
2062	return msg, nil
2063}
2064
2065// writeLocalVersionMsg writes our version message to the remote peer.
2066func (p *Peer) writeLocalVersionMsg() error {
2067	localVerMsg, err := p.localVersionMsg()
2068	if err != nil {
2069		return err
2070	}
2071
2072	return p.writeMessage(localVerMsg, wire.LatestEncoding)
2073}
2074
2075// negotiateInboundProtocol performs the negotiation protocol for an inbound
2076// peer. The events should occur in the following order, otherwise an error is
2077// returned:
2078//
2079//   1. Remote peer sends their version.
2080//   2. We send our version.
2081//   3. We send our verack.
2082//   4. Remote peer sends their verack.
2083func (p *Peer) negotiateInboundProtocol() error {
2084	if err := p.readRemoteVersionMsg(); err != nil {
2085		return err
2086	}
2087
2088	if err := p.writeLocalVersionMsg(); err != nil {
2089		return err
2090	}
2091
2092	err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
2093	if err != nil {
2094		return err
2095	}
2096
2097	return p.readRemoteVerAckMsg()
2098}
2099
2100// negotiateOutoundProtocol performs the negotiation protocol for an outbound
2101// peer. The events should occur in the following order, otherwise an error is
2102// returned:
2103//
2104//   1. We send our version.
2105//   2. Remote peer sends their version.
2106//   3. Remote peer sends their verack.
2107//   4. We send our verack.
2108func (p *Peer) negotiateOutboundProtocol() error {
2109	if err := p.writeLocalVersionMsg(); err != nil {
2110		return err
2111	}
2112
2113	if err := p.readRemoteVersionMsg(); err != nil {
2114		return err
2115	}
2116
2117	if err := p.readRemoteVerAckMsg(); err != nil {
2118		return err
2119	}
2120
2121	return p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding)
2122}
2123
2124// start begins processing input and output messages.
2125func (p *Peer) start() error {
2126	log.Tracef("Starting peer %s", p)
2127
2128	negotiateErr := make(chan error, 1)
2129	go func() {
2130		if p.inbound {
2131			negotiateErr <- p.negotiateInboundProtocol()
2132		} else {
2133			negotiateErr <- p.negotiateOutboundProtocol()
2134		}
2135	}()
2136
2137	// Negotiate the protocol within the specified negotiateTimeout.
2138	select {
2139	case err := <-negotiateErr:
2140		if err != nil {
2141			p.Disconnect()
2142			return err
2143		}
2144	case <-time.After(negotiateTimeout):
2145		p.Disconnect()
2146		return errors.New("protocol negotiation timeout")
2147	}
2148	log.Debugf("Connected to %s", p.Addr())
2149
2150	// The protocol has been negotiated successfully so start processing input
2151	// and output messages.
2152	go p.stallHandler()
2153	go p.inHandler()
2154	go p.queueHandler()
2155	go p.outHandler()
2156	go p.pingHandler()
2157
2158	return nil
2159}
2160
2161// AssociateConnection associates the given conn to the peer.   Calling this
2162// function when the peer is already connected will have no effect.
2163func (p *Peer) AssociateConnection(conn net.Conn) {
2164	// Already connected?
2165	if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
2166		return
2167	}
2168
2169	p.conn = conn
2170	p.timeConnected = time.Now()
2171
2172	if p.inbound {
2173		p.addr = p.conn.RemoteAddr().String()
2174
2175		// Set up a NetAddress for the peer to be used with AddrManager.  We
2176		// only do this inbound because outbound set this up at connection time
2177		// and no point recomputing.
2178		na, err := newNetAddress(p.conn.RemoteAddr(), p.services)
2179		if err != nil {
2180			log.Errorf("Cannot create remote net address: %v", err)
2181			p.Disconnect()
2182			return
2183		}
2184		p.na = na
2185	}
2186
2187	go func() {
2188		if err := p.start(); err != nil {
2189			log.Debugf("Cannot start peer %v: %v", p, err)
2190			p.Disconnect()
2191		}
2192	}()
2193}
2194
2195// WaitForDisconnect waits until the peer has completely disconnected and all
2196// resources are cleaned up.  This will happen if either the local or remote
2197// side has been disconnected or the peer is forcibly disconnected via
2198// Disconnect.
2199func (p *Peer) WaitForDisconnect() {
2200	<-p.quit
2201}
2202
2203// newPeerBase returns a new base bitcoin peer based on the inbound flag.  This
2204// is used by the NewInboundPeer and NewOutboundPeer functions to perform base
2205// setup needed by both types of peers.
2206func newPeerBase(origCfg *Config, inbound bool) *Peer {
2207	// Default to the max supported protocol version if not specified by the
2208	// caller.
2209	cfg := *origCfg // Copy to avoid mutating caller.
2210	if cfg.ProtocolVersion == 0 {
2211		cfg.ProtocolVersion = MaxProtocolVersion
2212	}
2213
2214	// Set the chain parameters to testnet if the caller did not specify any.
2215	if cfg.ChainParams == nil {
2216		cfg.ChainParams = &chaincfg.TestNet3Params
2217	}
2218
2219	// Set the trickle interval if a non-positive value is specified.
2220	if cfg.TrickleInterval <= 0 {
2221		cfg.TrickleInterval = DefaultTrickleInterval
2222	}
2223
2224	p := Peer{
2225		inbound:         inbound,
2226		wireEncoding:    wire.BaseEncoding,
2227		knownInventory:  newMruInventoryMap(maxKnownInventory),
2228		stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
2229		outputQueue:     make(chan outMsg, outputBufferSize),
2230		sendQueue:       make(chan outMsg, 1),   // nonblocking sync
2231		sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
2232		outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
2233		inQuit:          make(chan struct{}),
2234		queueQuit:       make(chan struct{}),
2235		outQuit:         make(chan struct{}),
2236		quit:            make(chan struct{}),
2237		cfg:             cfg, // Copy so caller can't mutate.
2238		services:        cfg.Services,
2239		protocolVersion: cfg.ProtocolVersion,
2240	}
2241	return &p
2242}
2243
2244// NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
2245// processing incoming and outgoing messages.
2246func NewInboundPeer(cfg *Config) *Peer {
2247	return newPeerBase(cfg, true)
2248}
2249
2250// NewOutboundPeer returns a new outbound bitcoin peer.
2251func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
2252	p := newPeerBase(cfg, false)
2253	p.addr = addr
2254
2255	host, portStr, err := net.SplitHostPort(addr)
2256	if err != nil {
2257		return nil, err
2258	}
2259
2260	port, err := strconv.ParseUint(portStr, 10, 16)
2261	if err != nil {
2262		return nil, err
2263	}
2264
2265	if cfg.HostToNetAddress != nil {
2266		na, err := cfg.HostToNetAddress(host, uint16(port), 0)
2267		if err != nil {
2268			return nil, err
2269		}
2270		p.na = na
2271	} else {
2272		p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0)
2273	}
2274
2275	return p, nil
2276}
2277
// init seeds the shared math/rand source from the current wall-clock time.
// That source is what produces the nonces placed in outgoing version
// messages.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
2281