1// Copyright (c) 2013-2018 The btcsuite developers 2// Copyright (c) 2016-2018 The Decred developers 3// Use of this source code is governed by an ISC 4// license that can be found in the LICENSE file. 5 6package peer 7 8import ( 9 "bytes" 10 "container/list" 11 "errors" 12 "fmt" 13 "io" 14 "math/rand" 15 "net" 16 "strconv" 17 "sync" 18 "sync/atomic" 19 "time" 20 21 "github.com/btcsuite/btcd/blockchain" 22 "github.com/btcsuite/btcd/chaincfg" 23 "github.com/btcsuite/btcd/chaincfg/chainhash" 24 "github.com/btcsuite/btcd/wire" 25 "github.com/btcsuite/go-socks/socks" 26 "github.com/davecgh/go-spew/spew" 27) 28 29const ( 30 // MaxProtocolVersion is the max protocol version the peer supports. 31 MaxProtocolVersion = wire.FeeFilterVersion 32 33 // DefaultTrickleInterval is the min time between attempts to send an 34 // inv message to a peer. 35 DefaultTrickleInterval = 10 * time.Second 36 37 // MinAcceptableProtocolVersion is the lowest protocol version that a 38 // connected peer may support. 39 MinAcceptableProtocolVersion = wire.MultipleAddressVersion 40 41 // outputBufferSize is the number of elements the output channels use. 42 outputBufferSize = 50 43 44 // invTrickleSize is the maximum amount of inventory to send in a single 45 // message when trickling inventory to remote peers. 46 maxInvTrickleSize = 1000 47 48 // maxKnownInventory is the maximum number of items to keep in the known 49 // inventory cache. 50 maxKnownInventory = 1000 51 52 // pingInterval is the interval of time to wait in between sending ping 53 // messages. 54 pingInterval = 2 * time.Minute 55 56 // negotiateTimeout is the duration of inactivity before we timeout a 57 // peer that hasn't completed the initial version negotiation. 58 negotiateTimeout = 30 * time.Second 59 60 // idleTimeout is the duration of inactivity before we time out a peer. 61 idleTimeout = 5 * time.Minute 62 63 // stallTickInterval is the interval of time between each check for 64 // stalled peers. 65 stallTickInterval = 15 * time.Second 66 67 // stallResponseTimeout is the base maximum amount of time messages that 68 // expect a response will wait before disconnecting the peer for 69 // stalling. The deadlines are adjusted for callback running times and 70 // only checked on each stall tick interval. 71 stallResponseTimeout = 30 * time.Second 72) 73 74var ( 75 // nodeCount is the total number of peer connections made since startup 76 // and is used to assign an id to a peer. 77 nodeCount int32 78 79 // zeroHash is the zero value hash (all zeros). It is defined as a 80 // convenience. 81 zeroHash chainhash.Hash 82 83 // sentNonces houses the unique nonces that are generated when pushing 84 // version messages that are used to detect self connections. 85 sentNonces = newMruNonceMap(50) 86 87 // allowSelfConns is only used to allow the tests to bypass the self 88 // connection detecting and disconnect logic since they intentionally 89 // do so for testing purposes. 90 allowSelfConns bool 91) 92 93// MessageListeners defines callback function pointers to invoke with message 94// listeners for a peer. Any listener which is not set to a concrete callback 95// during peer initialization is ignored. Execution of multiple message 96// listeners occurs serially, so one callback blocks the execution of the next. 97// 98// NOTE: Unless otherwise documented, these listeners must NOT directly call any 99// blocking calls (such as WaitForShutdown) on the peer instance since the input 100// handler goroutine blocks until the callback has completed. Doing so will 101// result in a deadlock. 102type MessageListeners struct { 103 // OnGetAddr is invoked when a peer receives a getaddr bitcoin message. 104 OnGetAddr func(p *Peer, msg *wire.MsgGetAddr) 105 106 // OnAddr is invoked when a peer receives an addr bitcoin message. 107 OnAddr func(p *Peer, msg *wire.MsgAddr) 108 109 // OnPing is invoked when a peer receives a ping bitcoin message. 110 OnPing func(p *Peer, msg *wire.MsgPing) 111 112 // OnPong is invoked when a peer receives a pong bitcoin message. 113 OnPong func(p *Peer, msg *wire.MsgPong) 114 115 // OnAlert is invoked when a peer receives an alert bitcoin message. 116 OnAlert func(p *Peer, msg *wire.MsgAlert) 117 118 // OnMemPool is invoked when a peer receives a mempool bitcoin message. 119 OnMemPool func(p *Peer, msg *wire.MsgMemPool) 120 121 // OnTx is invoked when a peer receives a tx bitcoin message. 122 OnTx func(p *Peer, msg *wire.MsgTx) 123 124 // OnBlock is invoked when a peer receives a block bitcoin message. 125 OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte) 126 127 // OnCFilter is invoked when a peer receives a cfilter bitcoin message. 128 OnCFilter func(p *Peer, msg *wire.MsgCFilter) 129 130 // OnCFHeaders is invoked when a peer receives a cfheaders bitcoin 131 // message. 132 OnCFHeaders func(p *Peer, msg *wire.MsgCFHeaders) 133 134 // OnCFCheckpt is invoked when a peer receives a cfcheckpt bitcoin 135 // message. 136 OnCFCheckpt func(p *Peer, msg *wire.MsgCFCheckpt) 137 138 // OnInv is invoked when a peer receives an inv bitcoin message. 139 OnInv func(p *Peer, msg *wire.MsgInv) 140 141 // OnHeaders is invoked when a peer receives a headers bitcoin message. 142 OnHeaders func(p *Peer, msg *wire.MsgHeaders) 143 144 // OnNotFound is invoked when a peer receives a notfound bitcoin 145 // message. 146 OnNotFound func(p *Peer, msg *wire.MsgNotFound) 147 148 // OnGetData is invoked when a peer receives a getdata bitcoin message. 149 OnGetData func(p *Peer, msg *wire.MsgGetData) 150 151 // OnGetBlocks is invoked when a peer receives a getblocks bitcoin 152 // message. 153 OnGetBlocks func(p *Peer, msg *wire.MsgGetBlocks) 154 155 // OnGetHeaders is invoked when a peer receives a getheaders bitcoin 156 // message. 157 OnGetHeaders func(p *Peer, msg *wire.MsgGetHeaders) 158 159 // OnGetCFilters is invoked when a peer receives a getcfilters bitcoin 160 // message. 161 OnGetCFilters func(p *Peer, msg *wire.MsgGetCFilters) 162 163 // OnGetCFHeaders is invoked when a peer receives a getcfheaders 164 // bitcoin message. 165 OnGetCFHeaders func(p *Peer, msg *wire.MsgGetCFHeaders) 166 167 // OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt 168 // bitcoin message. 169 OnGetCFCheckpt func(p *Peer, msg *wire.MsgGetCFCheckpt) 170 171 // OnFeeFilter is invoked when a peer receives a feefilter bitcoin message. 172 OnFeeFilter func(p *Peer, msg *wire.MsgFeeFilter) 173 174 // OnFilterAdd is invoked when a peer receives a filteradd bitcoin message. 175 OnFilterAdd func(p *Peer, msg *wire.MsgFilterAdd) 176 177 // OnFilterClear is invoked when a peer receives a filterclear bitcoin 178 // message. 179 OnFilterClear func(p *Peer, msg *wire.MsgFilterClear) 180 181 // OnFilterLoad is invoked when a peer receives a filterload bitcoin 182 // message. 183 OnFilterLoad func(p *Peer, msg *wire.MsgFilterLoad) 184 185 // OnMerkleBlock is invoked when a peer receives a merkleblock bitcoin 186 // message. 187 OnMerkleBlock func(p *Peer, msg *wire.MsgMerkleBlock) 188 189 // OnVersion is invoked when a peer receives a version bitcoin message. 190 // The caller may return a reject message in which case the message will 191 // be sent to the peer and the peer will be disconnected. 192 OnVersion func(p *Peer, msg *wire.MsgVersion) *wire.MsgReject 193 194 // OnVerAck is invoked when a peer receives a verack bitcoin message. 195 OnVerAck func(p *Peer, msg *wire.MsgVerAck) 196 197 // OnReject is invoked when a peer receives a reject bitcoin message. 198 OnReject func(p *Peer, msg *wire.MsgReject) 199 200 // OnSendHeaders is invoked when a peer receives a sendheaders bitcoin 201 // message. 202 OnSendHeaders func(p *Peer, msg *wire.MsgSendHeaders) 203 204 // OnRead is invoked when a peer receives a bitcoin message. It 205 // consists of the number of bytes read, the message, and whether or not 206 // an error in the read occurred. Typically, callers will opt to use 207 // the callbacks for the specific message types, however this can be 208 // useful for circumstances such as keeping track of server-wide byte 209 // counts or working with custom message types for which the peer does 210 // not directly provide a callback. 211 OnRead func(p *Peer, bytesRead int, msg wire.Message, err error) 212 213 // OnWrite is invoked when we write a bitcoin message to a peer. It 214 // consists of the number of bytes written, the message, and whether or 215 // not an error in the write occurred. This can be useful for 216 // circumstances such as keeping track of server-wide byte counts. 217 OnWrite func(p *Peer, bytesWritten int, msg wire.Message, err error) 218} 219 220// Config is the struct to hold configuration options useful to Peer. 221type Config struct { 222 // NewestBlock specifies a callback which provides the newest block 223 // details to the peer as needed. This can be nil in which case the 224 // peer will report a block height of 0, however it is good practice for 225 // peers to specify this so their currently best known is accurately 226 // reported. 227 NewestBlock HashFunc 228 229 // HostToNetAddress returns the netaddress for the given host. This can be 230 // nil in which case the host will be parsed as an IP address. 231 HostToNetAddress HostToNetAddrFunc 232 233 // Proxy indicates a proxy is being used for connections. The only 234 // effect this has is to prevent leaking the tor proxy address, so it 235 // only needs to specified if using a tor proxy. 236 Proxy string 237 238 // UserAgentName specifies the user agent name to advertise. It is 239 // highly recommended to specify this value. 240 UserAgentName string 241 242 // UserAgentVersion specifies the user agent version to advertise. It 243 // is highly recommended to specify this value and that it follows the 244 // form "major.minor.revision" e.g. "2.6.41". 245 UserAgentVersion string 246 247 // UserAgentComments specify the user agent comments to advertise. These 248 // values must not contain the illegal characters specified in BIP 14: 249 // '/', ':', '(', ')'. 250 UserAgentComments []string 251 252 // ChainParams identifies which chain parameters the peer is associated 253 // with. It is highly recommended to specify this field, however it can 254 // be omitted in which case the test network will be used. 255 ChainParams *chaincfg.Params 256 257 // Services specifies which services to advertise as supported by the 258 // local peer. This field can be omitted in which case it will be 0 259 // and therefore advertise no supported services. 260 Services wire.ServiceFlag 261 262 // ProtocolVersion specifies the maximum protocol version to use and 263 // advertise. This field can be omitted in which case 264 // peer.MaxProtocolVersion will be used. 265 ProtocolVersion uint32 266 267 // DisableRelayTx specifies if the remote peer should be informed to 268 // not send inv messages for transactions. 269 DisableRelayTx bool 270 271 // Listeners houses callback functions to be invoked on receiving peer 272 // messages. 273 Listeners MessageListeners 274 275 // TrickleInterval is the duration of the ticker which trickles down the 276 // inventory to a peer. 277 TrickleInterval time.Duration 278} 279 280// minUint32 is a helper function to return the minimum of two uint32s. 281// This avoids a math import and the need to cast to floats. 282func minUint32(a, b uint32) uint32 { 283 if a < b { 284 return a 285 } 286 return b 287} 288 289// newNetAddress attempts to extract the IP address and port from the passed 290// net.Addr interface and create a bitcoin NetAddress structure using that 291// information. 292func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, error) { 293 // addr will be a net.TCPAddr when not using a proxy. 294 if tcpAddr, ok := addr.(*net.TCPAddr); ok { 295 ip := tcpAddr.IP 296 port := uint16(tcpAddr.Port) 297 na := wire.NewNetAddressIPPort(ip, port, services) 298 return na, nil 299 } 300 301 // addr will be a socks.ProxiedAddr when using a proxy. 302 if proxiedAddr, ok := addr.(*socks.ProxiedAddr); ok { 303 ip := net.ParseIP(proxiedAddr.Host) 304 if ip == nil { 305 ip = net.ParseIP("0.0.0.0") 306 } 307 port := uint16(proxiedAddr.Port) 308 na := wire.NewNetAddressIPPort(ip, port, services) 309 return na, nil 310 } 311 312 // For the most part, addr should be one of the two above cases, but 313 // to be safe, fall back to trying to parse the information from the 314 // address string as a last resort. 315 host, portStr, err := net.SplitHostPort(addr.String()) 316 if err != nil { 317 return nil, err 318 } 319 ip := net.ParseIP(host) 320 port, err := strconv.ParseUint(portStr, 10, 16) 321 if err != nil { 322 return nil, err 323 } 324 na := wire.NewNetAddressIPPort(ip, uint16(port), services) 325 return na, nil 326} 327 328// outMsg is used to house a message to be sent along with a channel to signal 329// when the message has been sent (or won't be sent due to things such as 330// shutdown) 331type outMsg struct { 332 msg wire.Message 333 doneChan chan<- struct{} 334 encoding wire.MessageEncoding 335} 336 337// stallControlCmd represents the command of a stall control message. 338type stallControlCmd uint8 339 340// Constants for the command of a stall control message. 341const ( 342 // sccSendMessage indicates a message is being sent to the remote peer. 343 sccSendMessage stallControlCmd = iota 344 345 // sccReceiveMessage indicates a message has been received from the 346 // remote peer. 347 sccReceiveMessage 348 349 // sccHandlerStart indicates a callback handler is about to be invoked. 350 sccHandlerStart 351 352 // sccHandlerStart indicates a callback handler has completed. 353 sccHandlerDone 354) 355 356// stallControlMsg is used to signal the stall handler about specific events 357// so it can properly detect and handle stalled remote peers. 358type stallControlMsg struct { 359 command stallControlCmd 360 message wire.Message 361} 362 363// StatsSnap is a snapshot of peer stats at a point in time. 364type StatsSnap struct { 365 ID int32 366 Addr string 367 Services wire.ServiceFlag 368 LastSend time.Time 369 LastRecv time.Time 370 BytesSent uint64 371 BytesRecv uint64 372 ConnTime time.Time 373 TimeOffset int64 374 Version uint32 375 UserAgent string 376 Inbound bool 377 StartingHeight int32 378 LastBlock int32 379 LastPingNonce uint64 380 LastPingTime time.Time 381 LastPingMicros int64 382} 383 384// HashFunc is a function which returns a block hash, height and error 385// It is used as a callback to get newest block details. 386type HashFunc func() (hash *chainhash.Hash, height int32, err error) 387 388// AddrFunc is a func which takes an address and returns a related address. 389type AddrFunc func(remoteAddr *wire.NetAddress) *wire.NetAddress 390 391// HostToNetAddrFunc is a func which takes a host, port, services and returns 392// the netaddress. 393type HostToNetAddrFunc func(host string, port uint16, 394 services wire.ServiceFlag) (*wire.NetAddress, error) 395 396// NOTE: The overall data flow of a peer is split into 3 goroutines. Inbound 397// messages are read via the inHandler goroutine and generally dispatched to 398// their own handler. For inbound data-related messages such as blocks, 399// transactions, and inventory, the data is handled by the corresponding 400// message handlers. The data flow for outbound messages is split into 2 401// goroutines, queueHandler and outHandler. The first, queueHandler, is used 402// as a way for external entities to queue messages, by way of the QueueMessage 403// function, quickly regardless of whether the peer is currently sending or not. 404// It acts as the traffic cop between the external world and the actual 405// goroutine which writes to the network socket. 406 407// Peer provides a basic concurrent safe bitcoin peer for handling bitcoin 408// communications via the peer-to-peer protocol. It provides full duplex 409// reading and writing, automatic handling of the initial handshake process, 410// querying of usage statistics and other information about the remote peer such 411// as its address, user agent, and protocol version, output message queuing, 412// inventory trickling, and the ability to dynamically register and unregister 413// callbacks for handling bitcoin protocol messages. 414// 415// Outbound messages are typically queued via QueueMessage or QueueInventory. 416// QueueMessage is intended for all messages, including responses to data such 417// as blocks and transactions. QueueInventory, on the other hand, is only 418// intended for relaying inventory as it employs a trickling mechanism to batch 419// the inventory together. However, some helper functions for pushing messages 420// of specific types that typically require common special handling are 421// provided as a convenience. 422type Peer struct { 423 // The following variables must only be used atomically. 424 bytesReceived uint64 425 bytesSent uint64 426 lastRecv int64 427 lastSend int64 428 connected int32 429 disconnect int32 430 431 conn net.Conn 432 433 // These fields are set at creation time and never modified, so they are 434 // safe to read from concurrently without a mutex. 435 addr string 436 cfg Config 437 inbound bool 438 439 flagsMtx sync.Mutex // protects the peer flags below 440 na *wire.NetAddress 441 id int32 442 userAgent string 443 services wire.ServiceFlag 444 versionKnown bool 445 advertisedProtoVer uint32 // protocol version advertised by remote 446 protocolVersion uint32 // negotiated protocol version 447 sendHeadersPreferred bool // peer sent a sendheaders message 448 verAckReceived bool 449 witnessEnabled bool 450 451 wireEncoding wire.MessageEncoding 452 453 knownInventory *mruInventoryMap 454 prevGetBlocksMtx sync.Mutex 455 prevGetBlocksBegin *chainhash.Hash 456 prevGetBlocksStop *chainhash.Hash 457 prevGetHdrsMtx sync.Mutex 458 prevGetHdrsBegin *chainhash.Hash 459 prevGetHdrsStop *chainhash.Hash 460 461 // These fields keep track of statistics for the peer and are protected 462 // by the statsMtx mutex. 463 statsMtx sync.RWMutex 464 timeOffset int64 465 timeConnected time.Time 466 startingHeight int32 467 lastBlock int32 468 lastAnnouncedBlock *chainhash.Hash 469 lastPingNonce uint64 // Set to nonce if we have a pending ping. 470 lastPingTime time.Time // Time we sent last ping. 471 lastPingMicros int64 // Time for last ping to return. 472 473 stallControl chan stallControlMsg 474 outputQueue chan outMsg 475 sendQueue chan outMsg 476 sendDoneQueue chan struct{} 477 outputInvChan chan *wire.InvVect 478 inQuit chan struct{} 479 queueQuit chan struct{} 480 outQuit chan struct{} 481 quit chan struct{} 482} 483 484// String returns the peer's address and directionality as a human-readable 485// string. 486// 487// This function is safe for concurrent access. 488func (p *Peer) String() string { 489 return fmt.Sprintf("%s (%s)", p.addr, directionString(p.inbound)) 490} 491 492// UpdateLastBlockHeight updates the last known block for the peer. 493// 494// This function is safe for concurrent access. 495func (p *Peer) UpdateLastBlockHeight(newHeight int32) { 496 p.statsMtx.Lock() 497 log.Tracef("Updating last block height of peer %v from %v to %v", 498 p.addr, p.lastBlock, newHeight) 499 p.lastBlock = newHeight 500 p.statsMtx.Unlock() 501} 502 503// UpdateLastAnnouncedBlock updates meta-data about the last block hash this 504// peer is known to have announced. 505// 506// This function is safe for concurrent access. 507func (p *Peer) UpdateLastAnnouncedBlock(blkHash *chainhash.Hash) { 508 log.Tracef("Updating last blk for peer %v, %v", p.addr, blkHash) 509 510 p.statsMtx.Lock() 511 p.lastAnnouncedBlock = blkHash 512 p.statsMtx.Unlock() 513} 514 515// AddKnownInventory adds the passed inventory to the cache of known inventory 516// for the peer. 517// 518// This function is safe for concurrent access. 519func (p *Peer) AddKnownInventory(invVect *wire.InvVect) { 520 p.knownInventory.Add(invVect) 521} 522 523// StatsSnapshot returns a snapshot of the current peer flags and statistics. 524// 525// This function is safe for concurrent access. 526func (p *Peer) StatsSnapshot() *StatsSnap { 527 p.statsMtx.RLock() 528 529 p.flagsMtx.Lock() 530 id := p.id 531 addr := p.addr 532 userAgent := p.userAgent 533 services := p.services 534 protocolVersion := p.advertisedProtoVer 535 p.flagsMtx.Unlock() 536 537 // Get a copy of all relevant flags and stats. 538 statsSnap := &StatsSnap{ 539 ID: id, 540 Addr: addr, 541 UserAgent: userAgent, 542 Services: services, 543 LastSend: p.LastSend(), 544 LastRecv: p.LastRecv(), 545 BytesSent: p.BytesSent(), 546 BytesRecv: p.BytesReceived(), 547 ConnTime: p.timeConnected, 548 TimeOffset: p.timeOffset, 549 Version: protocolVersion, 550 Inbound: p.inbound, 551 StartingHeight: p.startingHeight, 552 LastBlock: p.lastBlock, 553 LastPingNonce: p.lastPingNonce, 554 LastPingMicros: p.lastPingMicros, 555 LastPingTime: p.lastPingTime, 556 } 557 558 p.statsMtx.RUnlock() 559 return statsSnap 560} 561 562// ID returns the peer id. 563// 564// This function is safe for concurrent access. 565func (p *Peer) ID() int32 { 566 p.flagsMtx.Lock() 567 id := p.id 568 p.flagsMtx.Unlock() 569 570 return id 571} 572 573// NA returns the peer network address. 574// 575// This function is safe for concurrent access. 576func (p *Peer) NA() *wire.NetAddress { 577 p.flagsMtx.Lock() 578 na := p.na 579 p.flagsMtx.Unlock() 580 581 return na 582} 583 584// Addr returns the peer address. 585// 586// This function is safe for concurrent access. 587func (p *Peer) Addr() string { 588 // The address doesn't change after initialization, therefore it is not 589 // protected by a mutex. 590 return p.addr 591} 592 593// Inbound returns whether the peer is inbound. 594// 595// This function is safe for concurrent access. 596func (p *Peer) Inbound() bool { 597 return p.inbound 598} 599 600// Services returns the services flag of the remote peer. 601// 602// This function is safe for concurrent access. 603func (p *Peer) Services() wire.ServiceFlag { 604 p.flagsMtx.Lock() 605 services := p.services 606 p.flagsMtx.Unlock() 607 608 return services 609} 610 611// UserAgent returns the user agent of the remote peer. 612// 613// This function is safe for concurrent access. 614func (p *Peer) UserAgent() string { 615 p.flagsMtx.Lock() 616 userAgent := p.userAgent 617 p.flagsMtx.Unlock() 618 619 return userAgent 620} 621 622// LastAnnouncedBlock returns the last announced block of the remote peer. 623// 624// This function is safe for concurrent access. 625func (p *Peer) LastAnnouncedBlock() *chainhash.Hash { 626 p.statsMtx.RLock() 627 lastAnnouncedBlock := p.lastAnnouncedBlock 628 p.statsMtx.RUnlock() 629 630 return lastAnnouncedBlock 631} 632 633// LastPingNonce returns the last ping nonce of the remote peer. 634// 635// This function is safe for concurrent access. 636func (p *Peer) LastPingNonce() uint64 { 637 p.statsMtx.RLock() 638 lastPingNonce := p.lastPingNonce 639 p.statsMtx.RUnlock() 640 641 return lastPingNonce 642} 643 644// LastPingTime returns the last ping time of the remote peer. 645// 646// This function is safe for concurrent access. 647func (p *Peer) LastPingTime() time.Time { 648 p.statsMtx.RLock() 649 lastPingTime := p.lastPingTime 650 p.statsMtx.RUnlock() 651 652 return lastPingTime 653} 654 655// LastPingMicros returns the last ping micros of the remote peer. 656// 657// This function is safe for concurrent access. 658func (p *Peer) LastPingMicros() int64 { 659 p.statsMtx.RLock() 660 lastPingMicros := p.lastPingMicros 661 p.statsMtx.RUnlock() 662 663 return lastPingMicros 664} 665 666// VersionKnown returns the whether or not the version of a peer is known 667// locally. 668// 669// This function is safe for concurrent access. 670func (p *Peer) VersionKnown() bool { 671 p.flagsMtx.Lock() 672 versionKnown := p.versionKnown 673 p.flagsMtx.Unlock() 674 675 return versionKnown 676} 677 678// VerAckReceived returns whether or not a verack message was received by the 679// peer. 680// 681// This function is safe for concurrent access. 682func (p *Peer) VerAckReceived() bool { 683 p.flagsMtx.Lock() 684 verAckReceived := p.verAckReceived 685 p.flagsMtx.Unlock() 686 687 return verAckReceived 688} 689 690// ProtocolVersion returns the negotiated peer protocol version. 691// 692// This function is safe for concurrent access. 693func (p *Peer) ProtocolVersion() uint32 { 694 p.flagsMtx.Lock() 695 protocolVersion := p.protocolVersion 696 p.flagsMtx.Unlock() 697 698 return protocolVersion 699} 700 701// LastBlock returns the last block of the peer. 702// 703// This function is safe for concurrent access. 704func (p *Peer) LastBlock() int32 { 705 p.statsMtx.RLock() 706 lastBlock := p.lastBlock 707 p.statsMtx.RUnlock() 708 709 return lastBlock 710} 711 712// LastSend returns the last send time of the peer. 713// 714// This function is safe for concurrent access. 715func (p *Peer) LastSend() time.Time { 716 return time.Unix(atomic.LoadInt64(&p.lastSend), 0) 717} 718 719// LastRecv returns the last recv time of the peer. 720// 721// This function is safe for concurrent access. 722func (p *Peer) LastRecv() time.Time { 723 return time.Unix(atomic.LoadInt64(&p.lastRecv), 0) 724} 725 726// LocalAddr returns the local address of the connection. 727// 728// This function is safe fo concurrent access. 729func (p *Peer) LocalAddr() net.Addr { 730 var localAddr net.Addr 731 if atomic.LoadInt32(&p.connected) != 0 { 732 localAddr = p.conn.LocalAddr() 733 } 734 return localAddr 735} 736 737// BytesSent returns the total number of bytes sent by the peer. 738// 739// This function is safe for concurrent access. 740func (p *Peer) BytesSent() uint64 { 741 return atomic.LoadUint64(&p.bytesSent) 742} 743 744// BytesReceived returns the total number of bytes received by the peer. 745// 746// This function is safe for concurrent access. 747func (p *Peer) BytesReceived() uint64 { 748 return atomic.LoadUint64(&p.bytesReceived) 749} 750 751// TimeConnected returns the time at which the peer connected. 752// 753// This function is safe for concurrent access. 754func (p *Peer) TimeConnected() time.Time { 755 p.statsMtx.RLock() 756 timeConnected := p.timeConnected 757 p.statsMtx.RUnlock() 758 759 return timeConnected 760} 761 762// TimeOffset returns the number of seconds the local time was offset from the 763// time the peer reported during the initial negotiation phase. Negative values 764// indicate the remote peer's time is before the local time. 765// 766// This function is safe for concurrent access. 767func (p *Peer) TimeOffset() int64 { 768 p.statsMtx.RLock() 769 timeOffset := p.timeOffset 770 p.statsMtx.RUnlock() 771 772 return timeOffset 773} 774 775// StartingHeight returns the last known height the peer reported during the 776// initial negotiation phase. 777// 778// This function is safe for concurrent access. 779func (p *Peer) StartingHeight() int32 { 780 p.statsMtx.RLock() 781 startingHeight := p.startingHeight 782 p.statsMtx.RUnlock() 783 784 return startingHeight 785} 786 787// WantsHeaders returns if the peer wants header messages instead of 788// inventory vectors for blocks. 789// 790// This function is safe for concurrent access. 791func (p *Peer) WantsHeaders() bool { 792 p.flagsMtx.Lock() 793 sendHeadersPreferred := p.sendHeadersPreferred 794 p.flagsMtx.Unlock() 795 796 return sendHeadersPreferred 797} 798 799// IsWitnessEnabled returns true if the peer has signalled that it supports 800// segregated witness. 801// 802// This function is safe for concurrent access. 803func (p *Peer) IsWitnessEnabled() bool { 804 p.flagsMtx.Lock() 805 witnessEnabled := p.witnessEnabled 806 p.flagsMtx.Unlock() 807 808 return witnessEnabled 809} 810 811// PushAddrMsg sends an addr message to the connected peer using the provided 812// addresses. This function is useful over manually sending the message via 813// QueueMessage since it automatically limits the addresses to the maximum 814// number allowed by the message and randomizes the chosen addresses when there 815// are too many. It returns the addresses that were actually sent and no 816// message will be sent if there are no entries in the provided addresses slice. 817// 818// This function is safe for concurrent access. 819func (p *Peer) PushAddrMsg(addresses []*wire.NetAddress) ([]*wire.NetAddress, error) { 820 addressCount := len(addresses) 821 822 // Nothing to send. 823 if addressCount == 0 { 824 return nil, nil 825 } 826 827 msg := wire.NewMsgAddr() 828 msg.AddrList = make([]*wire.NetAddress, addressCount) 829 copy(msg.AddrList, addresses) 830 831 // Randomize the addresses sent if there are more than the maximum allowed. 832 if addressCount > wire.MaxAddrPerMsg { 833 // Shuffle the address list. 834 for i := 0; i < wire.MaxAddrPerMsg; i++ { 835 j := i + rand.Intn(addressCount-i) 836 msg.AddrList[i], msg.AddrList[j] = msg.AddrList[j], msg.AddrList[i] 837 } 838 839 // Truncate it to the maximum size. 840 msg.AddrList = msg.AddrList[:wire.MaxAddrPerMsg] 841 } 842 843 p.QueueMessage(msg, nil) 844 return msg.AddrList, nil 845} 846 847// PushGetBlocksMsg sends a getblocks message for the provided block locator 848// and stop hash. It will ignore back-to-back duplicate requests. 849// 850// This function is safe for concurrent access. 851func (p *Peer) PushGetBlocksMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error { 852 // Extract the begin hash from the block locator, if one was specified, 853 // to use for filtering duplicate getblocks requests. 854 var beginHash *chainhash.Hash 855 if len(locator) > 0 { 856 beginHash = locator[0] 857 } 858 859 // Filter duplicate getblocks requests. 860 p.prevGetBlocksMtx.Lock() 861 isDuplicate := p.prevGetBlocksStop != nil && p.prevGetBlocksBegin != nil && 862 beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) && 863 beginHash.IsEqual(p.prevGetBlocksBegin) 864 p.prevGetBlocksMtx.Unlock() 865 866 if isDuplicate { 867 log.Tracef("Filtering duplicate [getblocks] with begin "+ 868 "hash %v, stop hash %v", beginHash, stopHash) 869 return nil 870 } 871 872 // Construct the getblocks request and queue it to be sent. 873 msg := wire.NewMsgGetBlocks(stopHash) 874 for _, hash := range locator { 875 err := msg.AddBlockLocatorHash(hash) 876 if err != nil { 877 return err 878 } 879 } 880 p.QueueMessage(msg, nil) 881 882 // Update the previous getblocks request information for filtering 883 // duplicates. 884 p.prevGetBlocksMtx.Lock() 885 p.prevGetBlocksBegin = beginHash 886 p.prevGetBlocksStop = stopHash 887 p.prevGetBlocksMtx.Unlock() 888 return nil 889} 890 891// PushGetHeadersMsg sends a getblocks message for the provided block locator 892// and stop hash. It will ignore back-to-back duplicate requests. 893// 894// This function is safe for concurrent access. 895func (p *Peer) PushGetHeadersMsg(locator blockchain.BlockLocator, stopHash *chainhash.Hash) error { 896 // Extract the begin hash from the block locator, if one was specified, 897 // to use for filtering duplicate getheaders requests. 898 var beginHash *chainhash.Hash 899 if len(locator) > 0 { 900 beginHash = locator[0] 901 } 902 903 // Filter duplicate getheaders requests. 904 p.prevGetHdrsMtx.Lock() 905 isDuplicate := p.prevGetHdrsStop != nil && p.prevGetHdrsBegin != nil && 906 beginHash != nil && stopHash.IsEqual(p.prevGetHdrsStop) && 907 beginHash.IsEqual(p.prevGetHdrsBegin) 908 p.prevGetHdrsMtx.Unlock() 909 910 if isDuplicate { 911 log.Tracef("Filtering duplicate [getheaders] with begin hash %v", 912 beginHash) 913 return nil 914 } 915 916 // Construct the getheaders request and queue it to be sent. 917 msg := wire.NewMsgGetHeaders() 918 msg.HashStop = *stopHash 919 for _, hash := range locator { 920 err := msg.AddBlockLocatorHash(hash) 921 if err != nil { 922 return err 923 } 924 } 925 p.QueueMessage(msg, nil) 926 927 // Update the previous getheaders request information for filtering 928 // duplicates. 929 p.prevGetHdrsMtx.Lock() 930 p.prevGetHdrsBegin = beginHash 931 p.prevGetHdrsStop = stopHash 932 p.prevGetHdrsMtx.Unlock() 933 return nil 934} 935 936// PushRejectMsg sends a reject message for the provided command, reject code, 937// reject reason, and hash. The hash will only be used when the command is a tx 938// or block and should be nil in other cases. The wait parameter will cause the 939// function to block until the reject message has actually been sent. 940// 941// This function is safe for concurrent access. 942func (p *Peer) PushRejectMsg(command string, code wire.RejectCode, reason string, hash *chainhash.Hash, wait bool) { 943 // Don't bother sending the reject message if the protocol version 944 // is too low. 945 if p.VersionKnown() && p.ProtocolVersion() < wire.RejectVersion { 946 return 947 } 948 949 msg := wire.NewMsgReject(command, code, reason) 950 if command == wire.CmdTx || command == wire.CmdBlock { 951 if hash == nil { 952 log.Warnf("Sending a reject message for command "+ 953 "type %v which should have specified a hash "+ 954 "but does not", command) 955 hash = &zeroHash 956 } 957 msg.Hash = *hash 958 } 959 960 // Send the message without waiting if the caller has not requested it. 961 if !wait { 962 p.QueueMessage(msg, nil) 963 return 964 } 965 966 // Send the message and block until it has been sent before returning. 967 doneChan := make(chan struct{}, 1) 968 p.QueueMessage(msg, doneChan) 969 <-doneChan 970} 971 972// handlePingMsg is invoked when a peer receives a ping bitcoin message. For 973// recent clients (protocol version > BIP0031Version), it replies with a pong 974// message. For older clients, it does nothing and anything other than failure 975// is considered a successful ping. 976func (p *Peer) handlePingMsg(msg *wire.MsgPing) { 977 // Only reply with pong if the message is from a new enough client. 978 if p.ProtocolVersion() > wire.BIP0031Version { 979 // Include nonce from ping so pong can be identified. 980 p.QueueMessage(wire.NewMsgPong(msg.Nonce), nil) 981 } 982} 983 984// handlePongMsg is invoked when a peer receives a pong bitcoin message. It 985// updates the ping statistics as required for recent clients (protocol 986// version > BIP0031Version). There is no effect for older clients or when a 987// ping was not previously sent. 988func (p *Peer) handlePongMsg(msg *wire.MsgPong) { 989 // Arguably we could use a buffered channel here sending data 990 // in a fifo manner whenever we send a ping, or a list keeping track of 991 // the times of each ping. For now we just make a best effort and 992 // only record stats if it was for the last ping sent. Any preceding 993 // and overlapping pings will be ignored. It is unlikely to occur 994 // without large usage of the ping rpc call since we ping infrequently 995 // enough that if they overlap we would have timed out the peer. 996 if p.ProtocolVersion() > wire.BIP0031Version { 997 p.statsMtx.Lock() 998 if p.lastPingNonce != 0 && msg.Nonce == p.lastPingNonce { 999 p.lastPingMicros = time.Since(p.lastPingTime).Nanoseconds() 1000 p.lastPingMicros /= 1000 // convert to usec. 1001 p.lastPingNonce = 0 1002 } 1003 p.statsMtx.Unlock() 1004 } 1005} 1006 1007// readMessage reads the next bitcoin message from the peer with logging. 1008func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) { 1009 n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn, 1010 p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding) 1011 atomic.AddUint64(&p.bytesReceived, uint64(n)) 1012 if p.cfg.Listeners.OnRead != nil { 1013 p.cfg.Listeners.OnRead(p, n, msg, err) 1014 } 1015 if err != nil { 1016 return nil, nil, err 1017 } 1018 1019 // Use closures to log expensive operations so they are only run when 1020 // the logging level requires it. 1021 log.Debugf("%v", newLogClosure(func() string { 1022 // Debug summary of message. 1023 summary := messageSummary(msg) 1024 if len(summary) > 0 { 1025 summary = " (" + summary + ")" 1026 } 1027 return fmt.Sprintf("Received %v%s from %s", 1028 msg.Command(), summary, p) 1029 })) 1030 log.Tracef("%v", newLogClosure(func() string { 1031 return spew.Sdump(msg) 1032 })) 1033 log.Tracef("%v", newLogClosure(func() string { 1034 return spew.Sdump(buf) 1035 })) 1036 1037 return msg, buf, nil 1038} 1039 1040// writeMessage sends a bitcoin message to the peer with logging. 1041func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { 1042 // Don't do anything if we're disconnecting. 1043 if atomic.LoadInt32(&p.disconnect) != 0 { 1044 return nil 1045 } 1046 1047 // Use closures to log expensive operations so they are only run when 1048 // the logging level requires it. 1049 log.Debugf("%v", newLogClosure(func() string { 1050 // Debug summary of message. 1051 summary := messageSummary(msg) 1052 if len(summary) > 0 { 1053 summary = " (" + summary + ")" 1054 } 1055 return fmt.Sprintf("Sending %v%s to %s", msg.Command(), 1056 summary, p) 1057 })) 1058 log.Tracef("%v", newLogClosure(func() string { 1059 return spew.Sdump(msg) 1060 })) 1061 log.Tracef("%v", newLogClosure(func() string { 1062 var buf bytes.Buffer 1063 _, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(), 1064 p.cfg.ChainParams.Net, enc) 1065 if err != nil { 1066 return err.Error() 1067 } 1068 return spew.Sdump(buf.Bytes()) 1069 })) 1070 1071 // Write the message to the peer. 1072 n, err := wire.WriteMessageWithEncodingN(p.conn, msg, 1073 p.ProtocolVersion(), p.cfg.ChainParams.Net, enc) 1074 atomic.AddUint64(&p.bytesSent, uint64(n)) 1075 if p.cfg.Listeners.OnWrite != nil { 1076 p.cfg.Listeners.OnWrite(p, n, msg, err) 1077 } 1078 return err 1079} 1080 1081// isAllowedReadError returns whether or not the passed error is allowed without 1082// disconnecting the peer. In particular, regression tests need to be allowed 1083// to send malformed messages without the peer being disconnected. 1084func (p *Peer) isAllowedReadError(err error) bool { 1085 // Only allow read errors in regression test mode. 1086 if p.cfg.ChainParams.Net != wire.TestNet { 1087 return false 1088 } 1089 1090 // Don't allow the error if it's not specifically a malformed message error. 1091 if _, ok := err.(*wire.MessageError); !ok { 1092 return false 1093 } 1094 1095 // Don't allow the error if it's not coming from localhost or the 1096 // hostname can't be determined for some reason. 1097 host, _, err := net.SplitHostPort(p.addr) 1098 if err != nil { 1099 return false 1100 } 1101 1102 if host != "127.0.0.1" && host != "localhost" { 1103 return false 1104 } 1105 1106 // Allowed if all checks passed. 1107 return true 1108} 1109 1110// shouldHandleReadError returns whether or not the passed error, which is 1111// expected to have come from reading from the remote peer in the inHandler, 1112// should be logged and responded to with a reject message. 1113func (p *Peer) shouldHandleReadError(err error) bool { 1114 // No logging or reject message when the peer is being forcibly 1115 // disconnected. 1116 if atomic.LoadInt32(&p.disconnect) != 0 { 1117 return false 1118 } 1119 1120 // No logging or reject message when the remote peer has been 1121 // disconnected. 1122 if err == io.EOF { 1123 return false 1124 } 1125 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { 1126 return false 1127 } 1128 1129 return true 1130} 1131 1132// maybeAddDeadline potentially adds a deadline for the appropriate expected 1133// response for the passed wire protocol command to the pending responses map. 1134func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd string) { 1135 // Setup a deadline for each message being sent that expects a response. 1136 // 1137 // NOTE: Pings are intentionally ignored here since they are typically 1138 // sent asynchronously and as a result of a long backlock of messages, 1139 // such as is typical in the case of initial block download, the 1140 // response won't be received in time. 1141 deadline := time.Now().Add(stallResponseTimeout) 1142 switch msgCmd { 1143 case wire.CmdVersion: 1144 // Expects a verack message. 1145 pendingResponses[wire.CmdVerAck] = deadline 1146 1147 case wire.CmdMemPool: 1148 // Expects an inv message. 1149 pendingResponses[wire.CmdInv] = deadline 1150 1151 case wire.CmdGetBlocks: 1152 // Expects an inv message. 1153 pendingResponses[wire.CmdInv] = deadline 1154 1155 case wire.CmdGetData: 1156 // Expects a block, merkleblock, tx, or notfound message. 1157 pendingResponses[wire.CmdBlock] = deadline 1158 pendingResponses[wire.CmdMerkleBlock] = deadline 1159 pendingResponses[wire.CmdTx] = deadline 1160 pendingResponses[wire.CmdNotFound] = deadline 1161 1162 case wire.CmdGetHeaders: 1163 // Expects a headers message. Use a longer deadline since it 1164 // can take a while for the remote peer to load all of the 1165 // headers. 1166 deadline = time.Now().Add(stallResponseTimeout * 3) 1167 pendingResponses[wire.CmdHeaders] = deadline 1168 } 1169} 1170 1171// stallHandler handles stall detection for the peer. This entails keeping 1172// track of expected responses and assigning them deadlines while accounting for 1173// the time spent in callbacks. It must be run as a goroutine. 1174func (p *Peer) stallHandler() { 1175 // These variables are used to adjust the deadline times forward by the 1176 // time it takes callbacks to execute. This is done because new 1177 // messages aren't read until the previous one is finished processing 1178 // (which includes callbacks), so the deadline for receiving a response 1179 // for a given message must account for the processing time as well. 1180 var handlerActive bool 1181 var handlersStartTime time.Time 1182 var deadlineOffset time.Duration 1183 1184 // pendingResponses tracks the expected response deadline times. 1185 pendingResponses := make(map[string]time.Time) 1186 1187 // stallTicker is used to periodically check pending responses that have 1188 // exceeded the expected deadline and disconnect the peer due to 1189 // stalling. 1190 stallTicker := time.NewTicker(stallTickInterval) 1191 defer stallTicker.Stop() 1192 1193 // ioStopped is used to detect when both the input and output handler 1194 // goroutines are done. 1195 var ioStopped bool 1196out: 1197 for { 1198 select { 1199 case msg := <-p.stallControl: 1200 switch msg.command { 1201 case sccSendMessage: 1202 // Add a deadline for the expected response 1203 // message if needed. 1204 p.maybeAddDeadline(pendingResponses, 1205 msg.message.Command()) 1206 1207 case sccReceiveMessage: 1208 // Remove received messages from the expected 1209 // response map. Since certain commands expect 1210 // one of a group of responses, remove 1211 // everything in the expected group accordingly. 1212 switch msgCmd := msg.message.Command(); msgCmd { 1213 case wire.CmdBlock: 1214 fallthrough 1215 case wire.CmdMerkleBlock: 1216 fallthrough 1217 case wire.CmdTx: 1218 fallthrough 1219 case wire.CmdNotFound: 1220 delete(pendingResponses, wire.CmdBlock) 1221 delete(pendingResponses, wire.CmdMerkleBlock) 1222 delete(pendingResponses, wire.CmdTx) 1223 delete(pendingResponses, wire.CmdNotFound) 1224 1225 default: 1226 delete(pendingResponses, msgCmd) 1227 } 1228 1229 case sccHandlerStart: 1230 // Warn on unbalanced callback signalling. 1231 if handlerActive { 1232 log.Warn("Received handler start " + 1233 "control command while a " + 1234 "handler is already active") 1235 continue 1236 } 1237 1238 handlerActive = true 1239 handlersStartTime = time.Now() 1240 1241 case sccHandlerDone: 1242 // Warn on unbalanced callback signalling. 1243 if !handlerActive { 1244 log.Warn("Received handler done " + 1245 "control command when a " + 1246 "handler is not already active") 1247 continue 1248 } 1249 1250 // Extend active deadlines by the time it took 1251 // to execute the callback. 1252 duration := time.Since(handlersStartTime) 1253 deadlineOffset += duration 1254 handlerActive = false 1255 1256 default: 1257 log.Warnf("Unsupported message command %v", 1258 msg.command) 1259 } 1260 1261 case <-stallTicker.C: 1262 // Calculate the offset to apply to the deadline based 1263 // on how long the handlers have taken to execute since 1264 // the last tick. 1265 now := time.Now() 1266 offset := deadlineOffset 1267 if handlerActive { 1268 offset += now.Sub(handlersStartTime) 1269 } 1270 1271 // Disconnect the peer if any of the pending responses 1272 // don't arrive by their adjusted deadline. 1273 for command, deadline := range pendingResponses { 1274 if now.Before(deadline.Add(offset)) { 1275 continue 1276 } 1277 1278 log.Debugf("Peer %s appears to be stalled or "+ 1279 "misbehaving, %s timeout -- "+ 1280 "disconnecting", p, command) 1281 p.Disconnect() 1282 break 1283 } 1284 1285 // Reset the deadline offset for the next tick. 1286 deadlineOffset = 0 1287 1288 case <-p.inQuit: 1289 // The stall handler can exit once both the input and 1290 // output handler goroutines are done. 1291 if ioStopped { 1292 break out 1293 } 1294 ioStopped = true 1295 1296 case <-p.outQuit: 1297 // The stall handler can exit once both the input and 1298 // output handler goroutines are done. 1299 if ioStopped { 1300 break out 1301 } 1302 ioStopped = true 1303 } 1304 } 1305 1306 // Drain any wait channels before going away so there is nothing left 1307 // waiting on this goroutine. 1308cleanup: 1309 for { 1310 select { 1311 case <-p.stallControl: 1312 default: 1313 break cleanup 1314 } 1315 } 1316 log.Tracef("Peer stall handler done for %s", p) 1317} 1318 1319// inHandler handles all incoming messages for the peer. It must be run as a 1320// goroutine. 1321func (p *Peer) inHandler() { 1322 // The timer is stopped when a new message is received and reset after it 1323 // is processed. 1324 idleTimer := time.AfterFunc(idleTimeout, func() { 1325 log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout) 1326 p.Disconnect() 1327 }) 1328 1329out: 1330 for atomic.LoadInt32(&p.disconnect) == 0 { 1331 // Read a message and stop the idle timer as soon as the read 1332 // is done. The timer is reset below for the next iteration if 1333 // needed. 1334 rmsg, buf, err := p.readMessage(p.wireEncoding) 1335 idleTimer.Stop() 1336 if err != nil { 1337 // In order to allow regression tests with malformed messages, don't 1338 // disconnect the peer when we're in regression test mode and the 1339 // error is one of the allowed errors. 1340 if p.isAllowedReadError(err) { 1341 log.Errorf("Allowed test error from %s: %v", p, err) 1342 idleTimer.Reset(idleTimeout) 1343 continue 1344 } 1345 1346 // Only log the error and send reject message if the 1347 // local peer is not forcibly disconnecting and the 1348 // remote peer has not disconnected. 1349 if p.shouldHandleReadError(err) { 1350 errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err) 1351 if err != io.ErrUnexpectedEOF { 1352 log.Errorf(errMsg) 1353 } 1354 1355 // Push a reject message for the malformed message and wait for 1356 // the message to be sent before disconnecting. 1357 // 1358 // NOTE: Ideally this would include the command in the header if 1359 // at least that much of the message was valid, but that is not 1360 // currently exposed by wire, so just used malformed for the 1361 // command. 1362 p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil, 1363 true) 1364 } 1365 break out 1366 } 1367 atomic.StoreInt64(&p.lastRecv, time.Now().Unix()) 1368 p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg} 1369 1370 // Handle each supported message type. 1371 p.stallControl <- stallControlMsg{sccHandlerStart, rmsg} 1372 switch msg := rmsg.(type) { 1373 case *wire.MsgVersion: 1374 // Limit to one version message per peer. 1375 p.PushRejectMsg(msg.Command(), wire.RejectDuplicate, 1376 "duplicate version message", nil, true) 1377 break out 1378 1379 case *wire.MsgVerAck: 1380 // Limit to one verack message per peer. 1381 p.PushRejectMsg( 1382 msg.Command(), wire.RejectDuplicate, 1383 "duplicate verack message", nil, true, 1384 ) 1385 break out 1386 1387 case *wire.MsgGetAddr: 1388 if p.cfg.Listeners.OnGetAddr != nil { 1389 p.cfg.Listeners.OnGetAddr(p, msg) 1390 } 1391 1392 case *wire.MsgAddr: 1393 if p.cfg.Listeners.OnAddr != nil { 1394 p.cfg.Listeners.OnAddr(p, msg) 1395 } 1396 1397 case *wire.MsgPing: 1398 p.handlePingMsg(msg) 1399 if p.cfg.Listeners.OnPing != nil { 1400 p.cfg.Listeners.OnPing(p, msg) 1401 } 1402 1403 case *wire.MsgPong: 1404 p.handlePongMsg(msg) 1405 if p.cfg.Listeners.OnPong != nil { 1406 p.cfg.Listeners.OnPong(p, msg) 1407 } 1408 1409 case *wire.MsgAlert: 1410 if p.cfg.Listeners.OnAlert != nil { 1411 p.cfg.Listeners.OnAlert(p, msg) 1412 } 1413 1414 case *wire.MsgMemPool: 1415 if p.cfg.Listeners.OnMemPool != nil { 1416 p.cfg.Listeners.OnMemPool(p, msg) 1417 } 1418 1419 case *wire.MsgTx: 1420 if p.cfg.Listeners.OnTx != nil { 1421 p.cfg.Listeners.OnTx(p, msg) 1422 } 1423 1424 case *wire.MsgBlock: 1425 if p.cfg.Listeners.OnBlock != nil { 1426 p.cfg.Listeners.OnBlock(p, msg, buf) 1427 } 1428 1429 case *wire.MsgInv: 1430 if p.cfg.Listeners.OnInv != nil { 1431 p.cfg.Listeners.OnInv(p, msg) 1432 } 1433 1434 case *wire.MsgHeaders: 1435 if p.cfg.Listeners.OnHeaders != nil { 1436 p.cfg.Listeners.OnHeaders(p, msg) 1437 } 1438 1439 case *wire.MsgNotFound: 1440 if p.cfg.Listeners.OnNotFound != nil { 1441 p.cfg.Listeners.OnNotFound(p, msg) 1442 } 1443 1444 case *wire.MsgGetData: 1445 if p.cfg.Listeners.OnGetData != nil { 1446 p.cfg.Listeners.OnGetData(p, msg) 1447 } 1448 1449 case *wire.MsgGetBlocks: 1450 if p.cfg.Listeners.OnGetBlocks != nil { 1451 p.cfg.Listeners.OnGetBlocks(p, msg) 1452 } 1453 1454 case *wire.MsgGetHeaders: 1455 if p.cfg.Listeners.OnGetHeaders != nil { 1456 p.cfg.Listeners.OnGetHeaders(p, msg) 1457 } 1458 1459 case *wire.MsgGetCFilters: 1460 if p.cfg.Listeners.OnGetCFilters != nil { 1461 p.cfg.Listeners.OnGetCFilters(p, msg) 1462 } 1463 1464 case *wire.MsgGetCFHeaders: 1465 if p.cfg.Listeners.OnGetCFHeaders != nil { 1466 p.cfg.Listeners.OnGetCFHeaders(p, msg) 1467 } 1468 1469 case *wire.MsgGetCFCheckpt: 1470 if p.cfg.Listeners.OnGetCFCheckpt != nil { 1471 p.cfg.Listeners.OnGetCFCheckpt(p, msg) 1472 } 1473 1474 case *wire.MsgCFilter: 1475 if p.cfg.Listeners.OnCFilter != nil { 1476 p.cfg.Listeners.OnCFilter(p, msg) 1477 } 1478 1479 case *wire.MsgCFHeaders: 1480 if p.cfg.Listeners.OnCFHeaders != nil { 1481 p.cfg.Listeners.OnCFHeaders(p, msg) 1482 } 1483 1484 case *wire.MsgFeeFilter: 1485 if p.cfg.Listeners.OnFeeFilter != nil { 1486 p.cfg.Listeners.OnFeeFilter(p, msg) 1487 } 1488 1489 case *wire.MsgFilterAdd: 1490 if p.cfg.Listeners.OnFilterAdd != nil { 1491 p.cfg.Listeners.OnFilterAdd(p, msg) 1492 } 1493 1494 case *wire.MsgFilterClear: 1495 if p.cfg.Listeners.OnFilterClear != nil { 1496 p.cfg.Listeners.OnFilterClear(p, msg) 1497 } 1498 1499 case *wire.MsgFilterLoad: 1500 if p.cfg.Listeners.OnFilterLoad != nil { 1501 p.cfg.Listeners.OnFilterLoad(p, msg) 1502 } 1503 1504 case *wire.MsgMerkleBlock: 1505 if p.cfg.Listeners.OnMerkleBlock != nil { 1506 p.cfg.Listeners.OnMerkleBlock(p, msg) 1507 } 1508 1509 case *wire.MsgReject: 1510 if p.cfg.Listeners.OnReject != nil { 1511 p.cfg.Listeners.OnReject(p, msg) 1512 } 1513 1514 case *wire.MsgSendHeaders: 1515 p.flagsMtx.Lock() 1516 p.sendHeadersPreferred = true 1517 p.flagsMtx.Unlock() 1518 1519 if p.cfg.Listeners.OnSendHeaders != nil { 1520 p.cfg.Listeners.OnSendHeaders(p, msg) 1521 } 1522 1523 default: 1524 log.Debugf("Received unhandled message of type %v "+ 1525 "from %v", rmsg.Command(), p) 1526 } 1527 p.stallControl <- stallControlMsg{sccHandlerDone, rmsg} 1528 1529 // A message was received so reset the idle timer. 1530 idleTimer.Reset(idleTimeout) 1531 } 1532 1533 // Ensure the idle timer is stopped to avoid leaking the resource. 1534 idleTimer.Stop() 1535 1536 // Ensure connection is closed. 1537 p.Disconnect() 1538 1539 close(p.inQuit) 1540 log.Tracef("Peer input handler done for %s", p) 1541} 1542 1543// queueHandler handles the queuing of outgoing data for the peer. This runs as 1544// a muxer for various sources of input so we can ensure that server and peer 1545// handlers will not block on us sending a message. That data is then passed on 1546// to outHandler to be actually written. 1547func (p *Peer) queueHandler() { 1548 pendingMsgs := list.New() 1549 invSendQueue := list.New() 1550 trickleTicker := time.NewTicker(p.cfg.TrickleInterval) 1551 defer trickleTicker.Stop() 1552 1553 // We keep the waiting flag so that we know if we have a message queued 1554 // to the outHandler or not. We could use the presence of a head of 1555 // the list for this but then we have rather racy concerns about whether 1556 // it has gotten it at cleanup time - and thus who sends on the 1557 // message's done channel. To avoid such confusion we keep a different 1558 // flag and pendingMsgs only contains messages that we have not yet 1559 // passed to outHandler. 1560 waiting := false 1561 1562 // To avoid duplication below. 1563 queuePacket := func(msg outMsg, list *list.List, waiting bool) bool { 1564 if !waiting { 1565 p.sendQueue <- msg 1566 } else { 1567 list.PushBack(msg) 1568 } 1569 // we are always waiting now. 1570 return true 1571 } 1572out: 1573 for { 1574 select { 1575 case msg := <-p.outputQueue: 1576 waiting = queuePacket(msg, pendingMsgs, waiting) 1577 1578 // This channel is notified when a message has been sent across 1579 // the network socket. 1580 case <-p.sendDoneQueue: 1581 // No longer waiting if there are no more messages 1582 // in the pending messages queue. 1583 next := pendingMsgs.Front() 1584 if next == nil { 1585 waiting = false 1586 continue 1587 } 1588 1589 // Notify the outHandler about the next item to 1590 // asynchronously send. 1591 val := pendingMsgs.Remove(next) 1592 p.sendQueue <- val.(outMsg) 1593 1594 case iv := <-p.outputInvChan: 1595 // No handshake? They'll find out soon enough. 1596 if p.VersionKnown() { 1597 // If this is a new block, then we'll blast it 1598 // out immediately, sipping the inv trickle 1599 // queue. 1600 if iv.Type == wire.InvTypeBlock || 1601 iv.Type == wire.InvTypeWitnessBlock { 1602 1603 invMsg := wire.NewMsgInvSizeHint(1) 1604 invMsg.AddInvVect(iv) 1605 waiting = queuePacket(outMsg{msg: invMsg}, 1606 pendingMsgs, waiting) 1607 } else { 1608 invSendQueue.PushBack(iv) 1609 } 1610 } 1611 1612 case <-trickleTicker.C: 1613 // Don't send anything if we're disconnecting or there 1614 // is no queued inventory. 1615 // version is known if send queue has any entries. 1616 if atomic.LoadInt32(&p.disconnect) != 0 || 1617 invSendQueue.Len() == 0 { 1618 continue 1619 } 1620 1621 // Create and send as many inv messages as needed to 1622 // drain the inventory send queue. 1623 invMsg := wire.NewMsgInvSizeHint(uint(invSendQueue.Len())) 1624 for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() { 1625 iv := invSendQueue.Remove(e).(*wire.InvVect) 1626 1627 // Don't send inventory that became known after 1628 // the initial check. 1629 if p.knownInventory.Exists(iv) { 1630 continue 1631 } 1632 1633 invMsg.AddInvVect(iv) 1634 if len(invMsg.InvList) >= maxInvTrickleSize { 1635 waiting = queuePacket( 1636 outMsg{msg: invMsg}, 1637 pendingMsgs, waiting) 1638 invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len())) 1639 } 1640 1641 // Add the inventory that is being relayed to 1642 // the known inventory for the peer. 1643 p.AddKnownInventory(iv) 1644 } 1645 if len(invMsg.InvList) > 0 { 1646 waiting = queuePacket(outMsg{msg: invMsg}, 1647 pendingMsgs, waiting) 1648 } 1649 1650 case <-p.quit: 1651 break out 1652 } 1653 } 1654 1655 // Drain any wait channels before we go away so we don't leave something 1656 // waiting for us. 1657 for e := pendingMsgs.Front(); e != nil; e = pendingMsgs.Front() { 1658 val := pendingMsgs.Remove(e) 1659 msg := val.(outMsg) 1660 if msg.doneChan != nil { 1661 msg.doneChan <- struct{}{} 1662 } 1663 } 1664cleanup: 1665 for { 1666 select { 1667 case msg := <-p.outputQueue: 1668 if msg.doneChan != nil { 1669 msg.doneChan <- struct{}{} 1670 } 1671 case <-p.outputInvChan: 1672 // Just drain channel 1673 // sendDoneQueue is buffered so doesn't need draining. 1674 default: 1675 break cleanup 1676 } 1677 } 1678 close(p.queueQuit) 1679 log.Tracef("Peer queue handler done for %s", p) 1680} 1681 1682// shouldLogWriteError returns whether or not the passed error, which is 1683// expected to have come from writing to the remote peer in the outHandler, 1684// should be logged. 1685func (p *Peer) shouldLogWriteError(err error) bool { 1686 // No logging when the peer is being forcibly disconnected. 1687 if atomic.LoadInt32(&p.disconnect) != 0 { 1688 return false 1689 } 1690 1691 // No logging when the remote peer has been disconnected. 1692 if err == io.EOF { 1693 return false 1694 } 1695 if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { 1696 return false 1697 } 1698 1699 return true 1700} 1701 1702// outHandler handles all outgoing messages for the peer. It must be run as a 1703// goroutine. It uses a buffered channel to serialize output messages while 1704// allowing the sender to continue running asynchronously. 1705func (p *Peer) outHandler() { 1706out: 1707 for { 1708 select { 1709 case msg := <-p.sendQueue: 1710 switch m := msg.msg.(type) { 1711 case *wire.MsgPing: 1712 // Only expects a pong message in later protocol 1713 // versions. Also set up statistics. 1714 if p.ProtocolVersion() > wire.BIP0031Version { 1715 p.statsMtx.Lock() 1716 p.lastPingNonce = m.Nonce 1717 p.lastPingTime = time.Now() 1718 p.statsMtx.Unlock() 1719 } 1720 } 1721 1722 p.stallControl <- stallControlMsg{sccSendMessage, msg.msg} 1723 1724 err := p.writeMessage(msg.msg, msg.encoding) 1725 if err != nil { 1726 p.Disconnect() 1727 if p.shouldLogWriteError(err) { 1728 log.Errorf("Failed to send message to "+ 1729 "%s: %v", p, err) 1730 } 1731 if msg.doneChan != nil { 1732 msg.doneChan <- struct{}{} 1733 } 1734 continue 1735 } 1736 1737 // At this point, the message was successfully sent, so 1738 // update the last send time, signal the sender of the 1739 // message that it has been sent (if requested), and 1740 // signal the send queue to the deliver the next queued 1741 // message. 1742 atomic.StoreInt64(&p.lastSend, time.Now().Unix()) 1743 if msg.doneChan != nil { 1744 msg.doneChan <- struct{}{} 1745 } 1746 p.sendDoneQueue <- struct{}{} 1747 1748 case <-p.quit: 1749 break out 1750 } 1751 } 1752 1753 <-p.queueQuit 1754 1755 // Drain any wait channels before we go away so we don't leave something 1756 // waiting for us. We have waited on queueQuit and thus we can be sure 1757 // that we will not miss anything sent on sendQueue. 1758cleanup: 1759 for { 1760 select { 1761 case msg := <-p.sendQueue: 1762 if msg.doneChan != nil { 1763 msg.doneChan <- struct{}{} 1764 } 1765 // no need to send on sendDoneQueue since queueHandler 1766 // has been waited on and already exited. 1767 default: 1768 break cleanup 1769 } 1770 } 1771 close(p.outQuit) 1772 log.Tracef("Peer output handler done for %s", p) 1773} 1774 1775// pingHandler periodically pings the peer. It must be run as a goroutine. 1776func (p *Peer) pingHandler() { 1777 pingTicker := time.NewTicker(pingInterval) 1778 defer pingTicker.Stop() 1779 1780out: 1781 for { 1782 select { 1783 case <-pingTicker.C: 1784 nonce, err := wire.RandomUint64() 1785 if err != nil { 1786 log.Errorf("Not sending ping to %s: %v", p, err) 1787 continue 1788 } 1789 p.QueueMessage(wire.NewMsgPing(nonce), nil) 1790 1791 case <-p.quit: 1792 break out 1793 } 1794 } 1795} 1796 1797// QueueMessage adds the passed bitcoin message to the peer send queue. 1798// 1799// This function is safe for concurrent access. 1800func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { 1801 p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding) 1802} 1803 1804// QueueMessageWithEncoding adds the passed bitcoin message to the peer send 1805// queue. This function is identical to QueueMessage, however it allows the 1806// caller to specify the wire encoding type that should be used when 1807// encoding/decoding blocks and transactions. 1808// 1809// This function is safe for concurrent access. 1810func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{}, 1811 encoding wire.MessageEncoding) { 1812 1813 // Avoid risk of deadlock if goroutine already exited. The goroutine 1814 // we will be sending to hangs around until it knows for a fact that 1815 // it is marked as disconnected and *then* it drains the channels. 1816 if !p.Connected() { 1817 if doneChan != nil { 1818 go func() { 1819 doneChan <- struct{}{} 1820 }() 1821 } 1822 return 1823 } 1824 p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan} 1825} 1826 1827// QueueInventory adds the passed inventory to the inventory send queue which 1828// might not be sent right away, rather it is trickled to the peer in batches. 1829// Inventory that the peer is already known to have is ignored. 1830// 1831// This function is safe for concurrent access. 1832func (p *Peer) QueueInventory(invVect *wire.InvVect) { 1833 // Don't add the inventory to the send queue if the peer is already 1834 // known to have it. 1835 if p.knownInventory.Exists(invVect) { 1836 return 1837 } 1838 1839 // Avoid risk of deadlock if goroutine already exited. The goroutine 1840 // we will be sending to hangs around until it knows for a fact that 1841 // it is marked as disconnected and *then* it drains the channels. 1842 if !p.Connected() { 1843 return 1844 } 1845 1846 p.outputInvChan <- invVect 1847} 1848 1849// Connected returns whether or not the peer is currently connected. 1850// 1851// This function is safe for concurrent access. 1852func (p *Peer) Connected() bool { 1853 return atomic.LoadInt32(&p.connected) != 0 && 1854 atomic.LoadInt32(&p.disconnect) == 0 1855} 1856 1857// Disconnect disconnects the peer by closing the connection. Calling this 1858// function when the peer is already disconnected or in the process of 1859// disconnecting will have no effect. 1860func (p *Peer) Disconnect() { 1861 if atomic.AddInt32(&p.disconnect, 1) != 1 { 1862 return 1863 } 1864 1865 log.Tracef("Disconnecting %s", p) 1866 if atomic.LoadInt32(&p.connected) != 0 { 1867 p.conn.Close() 1868 } 1869 close(p.quit) 1870} 1871 1872// readRemoteVersionMsg waits for the next message to arrive from the remote 1873// peer. If the next message is not a version message or the version is not 1874// acceptable then return an error. 1875func (p *Peer) readRemoteVersionMsg() error { 1876 // Read their version message. 1877 remoteMsg, _, err := p.readMessage(wire.LatestEncoding) 1878 if err != nil { 1879 return err 1880 } 1881 1882 // Notify and disconnect clients if the first message is not a version 1883 // message. 1884 msg, ok := remoteMsg.(*wire.MsgVersion) 1885 if !ok { 1886 reason := "a version message must precede all others" 1887 rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed, 1888 reason) 1889 _ = p.writeMessage(rejectMsg, wire.LatestEncoding) 1890 return errors.New(reason) 1891 } 1892 1893 // Detect self connections. 1894 if !allowSelfConns && sentNonces.Exists(msg.Nonce) { 1895 return errors.New("disconnecting peer connected to self") 1896 } 1897 1898 // Negotiate the protocol version and set the services to what the remote 1899 // peer advertised. 1900 p.flagsMtx.Lock() 1901 p.advertisedProtoVer = uint32(msg.ProtocolVersion) 1902 p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer) 1903 p.versionKnown = true 1904 p.services = msg.Services 1905 p.flagsMtx.Unlock() 1906 log.Debugf("Negotiated protocol version %d for peer %s", 1907 p.protocolVersion, p) 1908 1909 // Updating a bunch of stats including block based stats, and the 1910 // peer's time offset. 1911 p.statsMtx.Lock() 1912 p.lastBlock = msg.LastBlock 1913 p.startingHeight = msg.LastBlock 1914 p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() 1915 p.statsMtx.Unlock() 1916 1917 // Set the peer's ID, user agent, and potentially the flag which 1918 // specifies the witness support is enabled. 1919 p.flagsMtx.Lock() 1920 p.id = atomic.AddInt32(&nodeCount, 1) 1921 p.userAgent = msg.UserAgent 1922 1923 // Determine if the peer would like to receive witness data with 1924 // transactions, or not. 1925 if p.services&wire.SFNodeWitness == wire.SFNodeWitness { 1926 p.witnessEnabled = true 1927 } 1928 p.flagsMtx.Unlock() 1929 1930 // Once the version message has been exchanged, we're able to determine 1931 // if this peer knows how to encode witness data over the wire 1932 // protocol. If so, then we'll switch to a decoding mode which is 1933 // prepared for the new transaction format introduced as part of 1934 // BIP0144. 1935 if p.services&wire.SFNodeWitness == wire.SFNodeWitness { 1936 p.wireEncoding = wire.WitnessEncoding 1937 } 1938 1939 // Invoke the callback if specified. 1940 if p.cfg.Listeners.OnVersion != nil { 1941 rejectMsg := p.cfg.Listeners.OnVersion(p, msg) 1942 if rejectMsg != nil { 1943 _ = p.writeMessage(rejectMsg, wire.LatestEncoding) 1944 return errors.New(rejectMsg.Reason) 1945 } 1946 } 1947 1948 // Notify and disconnect clients that have a protocol version that is 1949 // too old. 1950 // 1951 // NOTE: If minAcceptableProtocolVersion is raised to be higher than 1952 // wire.RejectVersion, this should send a reject packet before 1953 // disconnecting. 1954 if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion { 1955 // Send a reject message indicating the protocol version is 1956 // obsolete and wait for the message to be sent before 1957 // disconnecting. 1958 reason := fmt.Sprintf("protocol version must be %d or greater", 1959 MinAcceptableProtocolVersion) 1960 rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete, 1961 reason) 1962 _ = p.writeMessage(rejectMsg, wire.LatestEncoding) 1963 return errors.New(reason) 1964 } 1965 1966 return nil 1967} 1968 1969// readRemoteVerAckMsg waits for the next message to arrive from the remote 1970// peer. If this message is not a verack message, then an error is returned. 1971// This method is to be used as part of the version negotiation upon a new 1972// connection. 1973func (p *Peer) readRemoteVerAckMsg() error { 1974 // Read the next message from the wire. 1975 remoteMsg, _, err := p.readMessage(wire.LatestEncoding) 1976 if err != nil { 1977 return err 1978 } 1979 1980 // It should be a verack message, otherwise send a reject message to the 1981 // peer explaining why. 1982 msg, ok := remoteMsg.(*wire.MsgVerAck) 1983 if !ok { 1984 reason := "a verack message must follow version" 1985 rejectMsg := wire.NewMsgReject( 1986 msg.Command(), wire.RejectMalformed, reason, 1987 ) 1988 _ = p.writeMessage(rejectMsg, wire.LatestEncoding) 1989 return errors.New(reason) 1990 } 1991 1992 p.flagsMtx.Lock() 1993 p.verAckReceived = true 1994 p.flagsMtx.Unlock() 1995 1996 if p.cfg.Listeners.OnVerAck != nil { 1997 p.cfg.Listeners.OnVerAck(p, msg) 1998 } 1999 2000 return nil 2001} 2002 2003// localVersionMsg creates a version message that can be used to send to the 2004// remote peer. 2005func (p *Peer) localVersionMsg() (*wire.MsgVersion, error) { 2006 var blockNum int32 2007 if p.cfg.NewestBlock != nil { 2008 var err error 2009 _, blockNum, err = p.cfg.NewestBlock() 2010 if err != nil { 2011 return nil, err 2012 } 2013 } 2014 2015 theirNA := p.na 2016 2017 // If we are behind a proxy and the connection comes from the proxy then 2018 // we return an unroutable address as their address. This is to prevent 2019 // leaking the tor proxy address. 2020 if p.cfg.Proxy != "" { 2021 proxyaddress, _, err := net.SplitHostPort(p.cfg.Proxy) 2022 // invalid proxy means poorly configured, be on the safe side. 2023 if err != nil || p.na.IP.String() == proxyaddress { 2024 theirNA = wire.NewNetAddressIPPort(net.IP([]byte{0, 0, 0, 0}), 0, 2025 theirNA.Services) 2026 } 2027 } 2028 2029 // Create a wire.NetAddress with only the services set to use as the 2030 // "addrme" in the version message. 2031 // 2032 // Older nodes previously added the IP and port information to the 2033 // address manager which proved to be unreliable as an inbound 2034 // connection from a peer didn't necessarily mean the peer itself 2035 // accepted inbound connections. 2036 // 2037 // Also, the timestamp is unused in the version message. 2038 ourNA := &wire.NetAddress{ 2039 Services: p.cfg.Services, 2040 } 2041 2042 // Generate a unique nonce for this peer so self connections can be 2043 // detected. This is accomplished by adding it to a size-limited map of 2044 // recently seen nonces. 2045 nonce := uint64(rand.Int63()) 2046 sentNonces.Add(nonce) 2047 2048 // Version message. 2049 msg := wire.NewMsgVersion(ourNA, theirNA, nonce, blockNum) 2050 msg.AddUserAgent(p.cfg.UserAgentName, p.cfg.UserAgentVersion, 2051 p.cfg.UserAgentComments...) 2052 2053 // Advertise local services. 2054 msg.Services = p.cfg.Services 2055 2056 // Advertise our max supported protocol version. 2057 msg.ProtocolVersion = int32(p.cfg.ProtocolVersion) 2058 2059 // Advertise if inv messages for transactions are desired. 2060 msg.DisableRelayTx = p.cfg.DisableRelayTx 2061 2062 return msg, nil 2063} 2064 2065// writeLocalVersionMsg writes our version message to the remote peer. 2066func (p *Peer) writeLocalVersionMsg() error { 2067 localVerMsg, err := p.localVersionMsg() 2068 if err != nil { 2069 return err 2070 } 2071 2072 return p.writeMessage(localVerMsg, wire.LatestEncoding) 2073} 2074 2075// negotiateInboundProtocol performs the negotiation protocol for an inbound 2076// peer. The events should occur in the following order, otherwise an error is 2077// returned: 2078// 2079// 1. Remote peer sends their version. 2080// 2. We send our version. 2081// 3. We send our verack. 2082// 4. Remote peer sends their verack. 2083func (p *Peer) negotiateInboundProtocol() error { 2084 if err := p.readRemoteVersionMsg(); err != nil { 2085 return err 2086 } 2087 2088 if err := p.writeLocalVersionMsg(); err != nil { 2089 return err 2090 } 2091 2092 err := p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding) 2093 if err != nil { 2094 return err 2095 } 2096 2097 return p.readRemoteVerAckMsg() 2098} 2099 2100// negotiateOutoundProtocol performs the negotiation protocol for an outbound 2101// peer. The events should occur in the following order, otherwise an error is 2102// returned: 2103// 2104// 1. We send our version. 2105// 2. Remote peer sends their version. 2106// 3. Remote peer sends their verack. 2107// 4. We send our verack. 2108func (p *Peer) negotiateOutboundProtocol() error { 2109 if err := p.writeLocalVersionMsg(); err != nil { 2110 return err 2111 } 2112 2113 if err := p.readRemoteVersionMsg(); err != nil { 2114 return err 2115 } 2116 2117 if err := p.readRemoteVerAckMsg(); err != nil { 2118 return err 2119 } 2120 2121 return p.writeMessage(wire.NewMsgVerAck(), wire.LatestEncoding) 2122} 2123 2124// start begins processing input and output messages. 2125func (p *Peer) start() error { 2126 log.Tracef("Starting peer %s", p) 2127 2128 negotiateErr := make(chan error, 1) 2129 go func() { 2130 if p.inbound { 2131 negotiateErr <- p.negotiateInboundProtocol() 2132 } else { 2133 negotiateErr <- p.negotiateOutboundProtocol() 2134 } 2135 }() 2136 2137 // Negotiate the protocol within the specified negotiateTimeout. 2138 select { 2139 case err := <-negotiateErr: 2140 if err != nil { 2141 p.Disconnect() 2142 return err 2143 } 2144 case <-time.After(negotiateTimeout): 2145 p.Disconnect() 2146 return errors.New("protocol negotiation timeout") 2147 } 2148 log.Debugf("Connected to %s", p.Addr()) 2149 2150 // The protocol has been negotiated successfully so start processing input 2151 // and output messages. 2152 go p.stallHandler() 2153 go p.inHandler() 2154 go p.queueHandler() 2155 go p.outHandler() 2156 go p.pingHandler() 2157 2158 return nil 2159} 2160 2161// AssociateConnection associates the given conn to the peer. Calling this 2162// function when the peer is already connected will have no effect. 2163func (p *Peer) AssociateConnection(conn net.Conn) { 2164 // Already connected? 2165 if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) { 2166 return 2167 } 2168 2169 p.conn = conn 2170 p.timeConnected = time.Now() 2171 2172 if p.inbound { 2173 p.addr = p.conn.RemoteAddr().String() 2174 2175 // Set up a NetAddress for the peer to be used with AddrManager. We 2176 // only do this inbound because outbound set this up at connection time 2177 // and no point recomputing. 2178 na, err := newNetAddress(p.conn.RemoteAddr(), p.services) 2179 if err != nil { 2180 log.Errorf("Cannot create remote net address: %v", err) 2181 p.Disconnect() 2182 return 2183 } 2184 p.na = na 2185 } 2186 2187 go func() { 2188 if err := p.start(); err != nil { 2189 log.Debugf("Cannot start peer %v: %v", p, err) 2190 p.Disconnect() 2191 } 2192 }() 2193} 2194 2195// WaitForDisconnect waits until the peer has completely disconnected and all 2196// resources are cleaned up. This will happen if either the local or remote 2197// side has been disconnected or the peer is forcibly disconnected via 2198// Disconnect. 2199func (p *Peer) WaitForDisconnect() { 2200 <-p.quit 2201} 2202 2203// newPeerBase returns a new base bitcoin peer based on the inbound flag. This 2204// is used by the NewInboundPeer and NewOutboundPeer functions to perform base 2205// setup needed by both types of peers. 2206func newPeerBase(origCfg *Config, inbound bool) *Peer { 2207 // Default to the max supported protocol version if not specified by the 2208 // caller. 2209 cfg := *origCfg // Copy to avoid mutating caller. 2210 if cfg.ProtocolVersion == 0 { 2211 cfg.ProtocolVersion = MaxProtocolVersion 2212 } 2213 2214 // Set the chain parameters to testnet if the caller did not specify any. 2215 if cfg.ChainParams == nil { 2216 cfg.ChainParams = &chaincfg.TestNet3Params 2217 } 2218 2219 // Set the trickle interval if a non-positive value is specified. 2220 if cfg.TrickleInterval <= 0 { 2221 cfg.TrickleInterval = DefaultTrickleInterval 2222 } 2223 2224 p := Peer{ 2225 inbound: inbound, 2226 wireEncoding: wire.BaseEncoding, 2227 knownInventory: newMruInventoryMap(maxKnownInventory), 2228 stallControl: make(chan stallControlMsg, 1), // nonblocking sync 2229 outputQueue: make(chan outMsg, outputBufferSize), 2230 sendQueue: make(chan outMsg, 1), // nonblocking sync 2231 sendDoneQueue: make(chan struct{}, 1), // nonblocking sync 2232 outputInvChan: make(chan *wire.InvVect, outputBufferSize), 2233 inQuit: make(chan struct{}), 2234 queueQuit: make(chan struct{}), 2235 outQuit: make(chan struct{}), 2236 quit: make(chan struct{}), 2237 cfg: cfg, // Copy so caller can't mutate. 2238 services: cfg.Services, 2239 protocolVersion: cfg.ProtocolVersion, 2240 } 2241 return &p 2242} 2243 2244// NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin 2245// processing incoming and outgoing messages. 2246func NewInboundPeer(cfg *Config) *Peer { 2247 return newPeerBase(cfg, true) 2248} 2249 2250// NewOutboundPeer returns a new outbound bitcoin peer. 2251func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) { 2252 p := newPeerBase(cfg, false) 2253 p.addr = addr 2254 2255 host, portStr, err := net.SplitHostPort(addr) 2256 if err != nil { 2257 return nil, err 2258 } 2259 2260 port, err := strconv.ParseUint(portStr, 10, 16) 2261 if err != nil { 2262 return nil, err 2263 } 2264 2265 if cfg.HostToNetAddress != nil { 2266 na, err := cfg.HostToNetAddress(host, uint16(port), 0) 2267 if err != nil { 2268 return nil, err 2269 } 2270 p.na = na 2271 } else { 2272 p.na = wire.NewNetAddressIPPort(net.ParseIP(host), uint16(port), 0) 2273 } 2274 2275 return p, nil 2276} 2277 2278func init() { 2279 rand.Seed(time.Now().UnixNano()) 2280} 2281