1// Copyright (c) 2013-2017 The btcsuite developers 2// Copyright (c) 2015-2018 The Decred developers 3// Use of this source code is governed by an ISC 4// license that can be found in the LICENSE file. 5 6package main 7 8import ( 9 "bytes" 10 "crypto/rand" 11 "crypto/tls" 12 "encoding/binary" 13 "errors" 14 "fmt" 15 "math" 16 "net" 17 "runtime" 18 "sort" 19 "strconv" 20 "strings" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "github.com/btcsuite/btcd/addrmgr" 26 "github.com/btcsuite/btcd/blockchain" 27 "github.com/btcsuite/btcd/blockchain/indexers" 28 "github.com/btcsuite/btcd/chaincfg" 29 "github.com/btcsuite/btcd/chaincfg/chainhash" 30 "github.com/btcsuite/btcd/connmgr" 31 "github.com/btcsuite/btcd/database" 32 "github.com/btcsuite/btcd/mempool" 33 "github.com/btcsuite/btcd/mining" 34 "github.com/btcsuite/btcd/mining/cpuminer" 35 "github.com/btcsuite/btcd/netsync" 36 "github.com/btcsuite/btcd/peer" 37 "github.com/btcsuite/btcd/txscript" 38 "github.com/btcsuite/btcd/wire" 39 "github.com/btcsuite/btcutil" 40 "github.com/btcsuite/btcutil/bloom" 41) 42 43const ( 44 // defaultServices describes the default services that are supported by 45 // the server. 46 defaultServices = wire.SFNodeNetwork | wire.SFNodeBloom | 47 wire.SFNodeWitness | wire.SFNodeCF 48 49 // defaultRequiredServices describes the default services that are 50 // required to be supported by outbound peers. 51 defaultRequiredServices = wire.SFNodeNetwork 52 53 // defaultTargetOutbound is the default number of outbound peers to target. 54 defaultTargetOutbound = 8 55 56 // connectionRetryInterval is the base amount of time to wait in between 57 // retries when connecting to persistent peers. It is adjusted by the 58 // number of retries such that there is a retry backoff. 59 connectionRetryInterval = time.Second * 5 60) 61 62var ( 63 // userAgentName is the user agent name and is used to help identify 64 // ourselves to other bitcoin peers. 65 userAgentName = "btcd" 66 67 // userAgentVersion is the user agent version and is used to help 68 // identify ourselves to other bitcoin peers. 69 userAgentVersion = fmt.Sprintf("%d.%d.%d", appMajor, appMinor, appPatch) 70) 71 72// zeroHash is the zero value hash (all zeros). It is defined as a convenience. 73var zeroHash chainhash.Hash 74 75// onionAddr implements the net.Addr interface and represents a tor address. 76type onionAddr struct { 77 addr string 78} 79 80// String returns the onion address. 81// 82// This is part of the net.Addr interface. 83func (oa *onionAddr) String() string { 84 return oa.addr 85} 86 87// Network returns "onion". 88// 89// This is part of the net.Addr interface. 90func (oa *onionAddr) Network() string { 91 return "onion" 92} 93 94// Ensure onionAddr implements the net.Addr interface. 95var _ net.Addr = (*onionAddr)(nil) 96 97// simpleAddr implements the net.Addr interface with two struct fields 98type simpleAddr struct { 99 net, addr string 100} 101 102// String returns the address. 103// 104// This is part of the net.Addr interface. 105func (a simpleAddr) String() string { 106 return a.addr 107} 108 109// Network returns the network. 110// 111// This is part of the net.Addr interface. 112func (a simpleAddr) Network() string { 113 return a.net 114} 115 116// Ensure simpleAddr implements the net.Addr interface. 117var _ net.Addr = simpleAddr{} 118 119// broadcastMsg provides the ability to house a bitcoin message to be broadcast 120// to all connected peers except specified excluded peers. 121type broadcastMsg struct { 122 message wire.Message 123 excludePeers []*serverPeer 124} 125 126// broadcastInventoryAdd is a type used to declare that the InvVect it contains 127// needs to be added to the rebroadcast map 128type broadcastInventoryAdd relayMsg 129 130// broadcastInventoryDel is a type used to declare that the InvVect it contains 131// needs to be removed from the rebroadcast map 132type broadcastInventoryDel *wire.InvVect 133 134// relayMsg packages an inventory vector along with the newly discovered 135// inventory so the relay has access to that information. 136type relayMsg struct { 137 invVect *wire.InvVect 138 data interface{} 139} 140 141// updatePeerHeightsMsg is a message sent from the blockmanager to the server 142// after a new block has been accepted. The purpose of the message is to update 143// the heights of peers that were known to announce the block before we 144// connected it to the main chain or recognized it as an orphan. With these 145// updates, peer heights will be kept up to date, allowing for fresh data when 146// selecting sync peer candidacy. 147type updatePeerHeightsMsg struct { 148 newHash *chainhash.Hash 149 newHeight int32 150 originPeer *peer.Peer 151} 152 153// peerState maintains state of inbound, persistent, outbound peers as well 154// as banned peers and outbound groups. 155type peerState struct { 156 inboundPeers map[int32]*serverPeer 157 outboundPeers map[int32]*serverPeer 158 persistentPeers map[int32]*serverPeer 159 banned map[string]time.Time 160 outboundGroups map[string]int 161} 162 163// Count returns the count of all known peers. 164func (ps *peerState) Count() int { 165 return len(ps.inboundPeers) + len(ps.outboundPeers) + 166 len(ps.persistentPeers) 167} 168 169// forAllOutboundPeers is a helper function that runs closure on all outbound 170// peers known to peerState. 171func (ps *peerState) forAllOutboundPeers(closure func(sp *serverPeer)) { 172 for _, e := range ps.outboundPeers { 173 closure(e) 174 } 175 for _, e := range ps.persistentPeers { 176 closure(e) 177 } 178} 179 180// forAllPeers is a helper function that runs closure on all peers known to 181// peerState. 182func (ps *peerState) forAllPeers(closure func(sp *serverPeer)) { 183 for _, e := range ps.inboundPeers { 184 closure(e) 185 } 186 ps.forAllOutboundPeers(closure) 187} 188 189// cfHeaderKV is a tuple of a filter header and its associated block hash. The 190// struct is used to cache cfcheckpt responses. 191type cfHeaderKV struct { 192 blockHash chainhash.Hash 193 filterHeader chainhash.Hash 194} 195 196// server provides a bitcoin server for handling communications to and from 197// bitcoin peers. 198type server struct { 199 // The following variables must only be used atomically. 200 // Putting the uint64s first makes them 64-bit aligned for 32-bit systems. 201 bytesReceived uint64 // Total bytes received from all peers since start. 202 bytesSent uint64 // Total bytes sent by all peers since start. 203 started int32 204 shutdown int32 205 shutdownSched int32 206 startupTime int64 207 208 chainParams *chaincfg.Params 209 addrManager *addrmgr.AddrManager 210 connManager *connmgr.ConnManager 211 sigCache *txscript.SigCache 212 hashCache *txscript.HashCache 213 rpcServer *rpcServer 214 syncManager *netsync.SyncManager 215 chain *blockchain.BlockChain 216 txMemPool *mempool.TxPool 217 cpuMiner *cpuminer.CPUMiner 218 modifyRebroadcastInv chan interface{} 219 newPeers chan *serverPeer 220 donePeers chan *serverPeer 221 banPeers chan *serverPeer 222 query chan interface{} 223 relayInv chan relayMsg 224 broadcast chan broadcastMsg 225 peerHeightsUpdate chan updatePeerHeightsMsg 226 wg sync.WaitGroup 227 quit chan struct{} 228 nat NAT 229 db database.DB 230 timeSource blockchain.MedianTimeSource 231 services wire.ServiceFlag 232 233 // The following fields are used for optional indexes. They will be nil 234 // if the associated index is not enabled. These fields are set during 235 // initial creation of the server and never changed afterwards, so they 236 // do not need to be protected for concurrent access. 237 txIndex *indexers.TxIndex 238 addrIndex *indexers.AddrIndex 239 cfIndex *indexers.CfIndex 240 241 // The fee estimator keeps track of how long transactions are left in 242 // the mempool before they are mined into blocks. 243 feeEstimator *mempool.FeeEstimator 244 245 // cfCheckptCaches stores a cached slice of filter headers for cfcheckpt 246 // messages for each filter type. 247 cfCheckptCaches map[wire.FilterType][]cfHeaderKV 248 cfCheckptCachesMtx sync.RWMutex 249 250 // agentBlacklist is a list of blacklisted substrings by which to filter 251 // user agents. 252 agentBlacklist []string 253 254 // agentWhitelist is a list of whitelisted user agent substrings, no 255 // whitelisting will be applied if the list is empty or nil. 256 agentWhitelist []string 257} 258 259// serverPeer extends the peer to maintain state shared by the server and 260// the blockmanager. 261type serverPeer struct { 262 // The following variables must only be used atomically 263 feeFilter int64 264 265 *peer.Peer 266 267 connReq *connmgr.ConnReq 268 server *server 269 persistent bool 270 continueHash *chainhash.Hash 271 relayMtx sync.Mutex 272 disableRelayTx bool 273 sentAddrs bool 274 isWhitelisted bool 275 filter *bloom.Filter 276 addressesMtx sync.RWMutex 277 knownAddresses map[string]struct{} 278 banScore connmgr.DynamicBanScore 279 quit chan struct{} 280 // The following chans are used to sync blockmanager and server. 281 txProcessed chan struct{} 282 blockProcessed chan struct{} 283} 284 285// newServerPeer returns a new serverPeer instance. The peer needs to be set by 286// the caller. 287func newServerPeer(s *server, isPersistent bool) *serverPeer { 288 return &serverPeer{ 289 server: s, 290 persistent: isPersistent, 291 filter: bloom.LoadFilter(nil), 292 knownAddresses: make(map[string]struct{}), 293 quit: make(chan struct{}), 294 txProcessed: make(chan struct{}, 1), 295 blockProcessed: make(chan struct{}, 1), 296 } 297} 298 299// newestBlock returns the current best block hash and height using the format 300// required by the configuration for the peer package. 301func (sp *serverPeer) newestBlock() (*chainhash.Hash, int32, error) { 302 best := sp.server.chain.BestSnapshot() 303 return &best.Hash, best.Height, nil 304} 305 306// addKnownAddresses adds the given addresses to the set of known addresses to 307// the peer to prevent sending duplicate addresses. 308func (sp *serverPeer) addKnownAddresses(addresses []*wire.NetAddress) { 309 sp.addressesMtx.Lock() 310 for _, na := range addresses { 311 sp.knownAddresses[addrmgr.NetAddressKey(na)] = struct{}{} 312 } 313 sp.addressesMtx.Unlock() 314} 315 316// addressKnown true if the given address is already known to the peer. 317func (sp *serverPeer) addressKnown(na *wire.NetAddress) bool { 318 sp.addressesMtx.RLock() 319 _, exists := sp.knownAddresses[addrmgr.NetAddressKey(na)] 320 sp.addressesMtx.RUnlock() 321 return exists 322} 323 324// setDisableRelayTx toggles relaying of transactions for the given peer. 325// It is safe for concurrent access. 326func (sp *serverPeer) setDisableRelayTx(disable bool) { 327 sp.relayMtx.Lock() 328 sp.disableRelayTx = disable 329 sp.relayMtx.Unlock() 330} 331 332// relayTxDisabled returns whether or not relaying of transactions for the given 333// peer is disabled. 334// It is safe for concurrent access. 335func (sp *serverPeer) relayTxDisabled() bool { 336 sp.relayMtx.Lock() 337 isDisabled := sp.disableRelayTx 338 sp.relayMtx.Unlock() 339 340 return isDisabled 341} 342 343// pushAddrMsg sends an addr message to the connected peer using the provided 344// addresses. 345func (sp *serverPeer) pushAddrMsg(addresses []*wire.NetAddress) { 346 // Filter addresses already known to the peer. 347 addrs := make([]*wire.NetAddress, 0, len(addresses)) 348 for _, addr := range addresses { 349 if !sp.addressKnown(addr) { 350 addrs = append(addrs, addr) 351 } 352 } 353 known, err := sp.PushAddrMsg(addrs) 354 if err != nil { 355 peerLog.Errorf("Can't push address message to %s: %v", sp.Peer, err) 356 sp.Disconnect() 357 return 358 } 359 sp.addKnownAddresses(known) 360} 361 362// addBanScore increases the persistent and decaying ban score fields by the 363// values passed as parameters. If the resulting score exceeds half of the ban 364// threshold, a warning is logged including the reason provided. Further, if 365// the score is above the ban threshold, the peer will be banned and 366// disconnected. 367func (sp *serverPeer) addBanScore(persistent, transient uint32, reason string) { 368 // No warning is logged and no score is calculated if banning is disabled. 369 if cfg.DisableBanning { 370 return 371 } 372 if sp.isWhitelisted { 373 peerLog.Debugf("Misbehaving whitelisted peer %s: %s", sp, reason) 374 return 375 } 376 377 warnThreshold := cfg.BanThreshold >> 1 378 if transient == 0 && persistent == 0 { 379 // The score is not being increased, but a warning message is still 380 // logged if the score is above the warn threshold. 381 score := sp.banScore.Int() 382 if score > warnThreshold { 383 peerLog.Warnf("Misbehaving peer %s: %s -- ban score is %d, "+ 384 "it was not increased this time", sp, reason, score) 385 } 386 return 387 } 388 score := sp.banScore.Increase(persistent, transient) 389 if score > warnThreshold { 390 peerLog.Warnf("Misbehaving peer %s: %s -- ban score increased to %d", 391 sp, reason, score) 392 if score > cfg.BanThreshold { 393 peerLog.Warnf("Misbehaving peer %s -- banning and disconnecting", 394 sp) 395 sp.server.BanPeer(sp) 396 sp.Disconnect() 397 } 398 } 399} 400 401// hasServices returns whether or not the provided advertised service flags have 402// all of the provided desired service flags set. 403func hasServices(advertised, desired wire.ServiceFlag) bool { 404 return advertised&desired == desired 405} 406 407// OnVersion is invoked when a peer receives a version bitcoin message 408// and is used to negotiate the protocol version details as well as kick start 409// the communications. 410func (sp *serverPeer) OnVersion(_ *peer.Peer, msg *wire.MsgVersion) *wire.MsgReject { 411 // Update the address manager with the advertised services for outbound 412 // connections in case they have changed. This is not done for inbound 413 // connections to help prevent malicious behavior and is skipped when 414 // running on the simulation test network since it is only intended to 415 // connect to specified peers and actively avoids advertising and 416 // connecting to discovered peers. 417 // 418 // NOTE: This is done before rejecting peers that are too old to ensure 419 // it is updated regardless in the case a new minimum protocol version is 420 // enforced and the remote node has not upgraded yet. 421 isInbound := sp.Inbound() 422 remoteAddr := sp.NA() 423 addrManager := sp.server.addrManager 424 if !cfg.SimNet && !isInbound { 425 addrManager.SetServices(remoteAddr, msg.Services) 426 } 427 428 // Ignore peers that have a protcol version that is too old. The peer 429 // negotiation logic will disconnect it after this callback returns. 430 if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) { 431 return nil 432 } 433 434 // Reject outbound peers that are not full nodes. 435 wantServices := wire.SFNodeNetwork 436 if !isInbound && !hasServices(msg.Services, wantServices) { 437 missingServices := wantServices & ^msg.Services 438 srvrLog.Debugf("Rejecting peer %s with services %v due to not "+ 439 "providing desired services %v", sp.Peer, msg.Services, 440 missingServices) 441 reason := fmt.Sprintf("required services %#x not offered", 442 uint64(missingServices)) 443 return wire.NewMsgReject(msg.Command(), wire.RejectNonstandard, reason) 444 } 445 446 if !cfg.SimNet && !isInbound { 447 // After soft-fork activation, only make outbound 448 // connection to peers if they flag that they're segwit 449 // enabled. 450 chain := sp.server.chain 451 segwitActive, err := chain.IsDeploymentActive(chaincfg.DeploymentSegwit) 452 if err != nil { 453 peerLog.Errorf("Unable to query for segwit soft-fork state: %v", 454 err) 455 return nil 456 } 457 458 if segwitActive && !sp.IsWitnessEnabled() { 459 peerLog.Infof("Disconnecting non-segwit peer %v, isn't segwit "+ 460 "enabled and we need more segwit enabled peers", sp) 461 sp.Disconnect() 462 return nil 463 } 464 } 465 466 // Add the remote peer time as a sample for creating an offset against 467 // the local clock to keep the network time in sync. 468 sp.server.timeSource.AddTimeSample(sp.Addr(), msg.Timestamp) 469 470 // Choose whether or not to relay transactions before a filter command 471 // is received. 472 sp.setDisableRelayTx(msg.DisableRelayTx) 473 474 return nil 475} 476 477// OnVerAck is invoked when a peer receives a verack bitcoin message and is used 478// to kick start communication with them. 479func (sp *serverPeer) OnVerAck(_ *peer.Peer, _ *wire.MsgVerAck) { 480 sp.server.AddPeer(sp) 481} 482 483// OnMemPool is invoked when a peer receives a mempool bitcoin message. 484// It creates and sends an inventory message with the contents of the memory 485// pool up to the maximum inventory allowed per message. When the peer has a 486// bloom filter loaded, the contents are filtered accordingly. 487func (sp *serverPeer) OnMemPool(_ *peer.Peer, msg *wire.MsgMemPool) { 488 // Only allow mempool requests if the server has bloom filtering 489 // enabled. 490 if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom { 491 peerLog.Debugf("peer %v sent mempool request with bloom "+ 492 "filtering disabled -- disconnecting", sp) 493 sp.Disconnect() 494 return 495 } 496 497 // A decaying ban score increase is applied to prevent flooding. 498 // The ban score accumulates and passes the ban threshold if a burst of 499 // mempool messages comes from a peer. The score decays each minute to 500 // half of its value. 501 sp.addBanScore(0, 33, "mempool") 502 503 // Generate inventory message with the available transactions in the 504 // transaction memory pool. Limit it to the max allowed inventory 505 // per message. The NewMsgInvSizeHint function automatically limits 506 // the passed hint to the maximum allowed, so it's safe to pass it 507 // without double checking it here. 508 txMemPool := sp.server.txMemPool 509 txDescs := txMemPool.TxDescs() 510 invMsg := wire.NewMsgInvSizeHint(uint(len(txDescs))) 511 512 for _, txDesc := range txDescs { 513 // Either add all transactions when there is no bloom filter, 514 // or only the transactions that match the filter when there is 515 // one. 516 if !sp.filter.IsLoaded() || sp.filter.MatchTxAndUpdate(txDesc.Tx) { 517 iv := wire.NewInvVect(wire.InvTypeTx, txDesc.Tx.Hash()) 518 invMsg.AddInvVect(iv) 519 if len(invMsg.InvList)+1 > wire.MaxInvPerMsg { 520 break 521 } 522 } 523 } 524 525 // Send the inventory message if there is anything to send. 526 if len(invMsg.InvList) > 0 { 527 sp.QueueMessage(invMsg, nil) 528 } 529} 530 531// OnTx is invoked when a peer receives a tx bitcoin message. It blocks 532// until the bitcoin transaction has been fully processed. Unlock the block 533// handler this does not serialize all transactions through a single thread 534// transactions don't rely on the previous one in a linear fashion like blocks. 535func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { 536 if cfg.BlocksOnly { 537 peerLog.Tracef("Ignoring tx %v from %v - blocksonly enabled", 538 msg.TxHash(), sp) 539 return 540 } 541 542 // Add the transaction to the known inventory for the peer. 543 // Convert the raw MsgTx to a btcutil.Tx which provides some convenience 544 // methods and things such as hash caching. 545 tx := btcutil.NewTx(msg) 546 iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) 547 sp.AddKnownInventory(iv) 548 549 // Queue the transaction up to be handled by the sync manager and 550 // intentionally block further receives until the transaction is fully 551 // processed and known good or bad. This helps prevent a malicious peer 552 // from queuing up a bunch of bad transactions before disconnecting (or 553 // being disconnected) and wasting memory. 554 sp.server.syncManager.QueueTx(tx, sp.Peer, sp.txProcessed) 555 <-sp.txProcessed 556} 557 558// OnBlock is invoked when a peer receives a block bitcoin message. It 559// blocks until the bitcoin block has been fully processed. 560func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { 561 // Convert the raw MsgBlock to a btcutil.Block which provides some 562 // convenience methods and things such as hash caching. 563 block := btcutil.NewBlockFromBlockAndBytes(msg, buf) 564 565 // Add the block to the known inventory for the peer. 566 iv := wire.NewInvVect(wire.InvTypeBlock, block.Hash()) 567 sp.AddKnownInventory(iv) 568 569 // Queue the block up to be handled by the block 570 // manager and intentionally block further receives 571 // until the bitcoin block is fully processed and known 572 // good or bad. This helps prevent a malicious peer 573 // from queuing up a bunch of bad blocks before 574 // disconnecting (or being disconnected) and wasting 575 // memory. Additionally, this behavior is depended on 576 // by at least the block acceptance test tool as the 577 // reference implementation processes blocks in the same 578 // thread and therefore blocks further messages until 579 // the bitcoin block has been fully processed. 580 sp.server.syncManager.QueueBlock(block, sp.Peer, sp.blockProcessed) 581 <-sp.blockProcessed 582} 583 584// OnInv is invoked when a peer receives an inv bitcoin message and is 585// used to examine the inventory being advertised by the remote peer and react 586// accordingly. We pass the message down to blockmanager which will call 587// QueueMessage with any appropriate responses. 588func (sp *serverPeer) OnInv(_ *peer.Peer, msg *wire.MsgInv) { 589 if !cfg.BlocksOnly { 590 if len(msg.InvList) > 0 { 591 sp.server.syncManager.QueueInv(msg, sp.Peer) 592 } 593 return 594 } 595 596 newInv := wire.NewMsgInvSizeHint(uint(len(msg.InvList))) 597 for _, invVect := range msg.InvList { 598 if invVect.Type == wire.InvTypeTx { 599 peerLog.Tracef("Ignoring tx %v in inv from %v -- "+ 600 "blocksonly enabled", invVect.Hash, sp) 601 if sp.ProtocolVersion() >= wire.BIP0037Version { 602 peerLog.Infof("Peer %v is announcing "+ 603 "transactions -- disconnecting", sp) 604 sp.Disconnect() 605 return 606 } 607 continue 608 } 609 err := newInv.AddInvVect(invVect) 610 if err != nil { 611 peerLog.Errorf("Failed to add inventory vector: %v", err) 612 break 613 } 614 } 615 616 if len(newInv.InvList) > 0 { 617 sp.server.syncManager.QueueInv(newInv, sp.Peer) 618 } 619} 620 621// OnHeaders is invoked when a peer receives a headers bitcoin 622// message. The message is passed down to the sync manager. 623func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) { 624 sp.server.syncManager.QueueHeaders(msg, sp.Peer) 625} 626 627// handleGetData is invoked when a peer receives a getdata bitcoin message and 628// is used to deliver block and transaction information. 629func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) { 630 numAdded := 0 631 notFound := wire.NewMsgNotFound() 632 633 length := len(msg.InvList) 634 // A decaying ban score increase is applied to prevent exhausting resources 635 // with unusually large inventory queries. 636 // Requesting more than the maximum inventory vector length within a short 637 // period of time yields a score above the default ban threshold. Sustained 638 // bursts of small requests are not penalized as that would potentially ban 639 // peers performing IBD. 640 // This incremental score decays each minute to half of its value. 641 sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") 642 643 // We wait on this wait channel periodically to prevent queuing 644 // far more data than we can send in a reasonable time, wasting memory. 645 // The waiting occurs after the database fetch for the next one to 646 // provide a little pipelining. 647 var waitChan chan struct{} 648 doneChan := make(chan struct{}, 1) 649 650 for i, iv := range msg.InvList { 651 var c chan struct{} 652 // If this will be the last message we send. 653 if i == length-1 && len(notFound.InvList) == 0 { 654 c = doneChan 655 } else if (i+1)%3 == 0 { 656 // Buffered so as to not make the send goroutine block. 657 c = make(chan struct{}, 1) 658 } 659 var err error 660 switch iv.Type { 661 case wire.InvTypeWitnessTx: 662 err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) 663 case wire.InvTypeTx: 664 err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) 665 case wire.InvTypeWitnessBlock: 666 err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) 667 case wire.InvTypeBlock: 668 err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) 669 case wire.InvTypeFilteredWitnessBlock: 670 err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding) 671 case wire.InvTypeFilteredBlock: 672 err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding) 673 default: 674 peerLog.Warnf("Unknown type in inventory request %d", 675 iv.Type) 676 continue 677 } 678 if err != nil { 679 notFound.AddInvVect(iv) 680 681 // When there is a failure fetching the final entry 682 // and the done channel was sent in due to there 683 // being no outstanding not found inventory, consume 684 // it here because there is now not found inventory 685 // that will use the channel momentarily. 686 if i == len(msg.InvList)-1 && c != nil { 687 <-c 688 } 689 } 690 numAdded++ 691 waitChan = c 692 } 693 if len(notFound.InvList) != 0 { 694 sp.QueueMessage(notFound, doneChan) 695 } 696 697 // Wait for messages to be sent. We can send quite a lot of data at this 698 // point and this will keep the peer busy for a decent amount of time. 699 // We don't process anything else by them in this time so that we 700 // have an idea of when we should hear back from them - else the idle 701 // timeout could fire when we were only half done sending the blocks. 702 if numAdded > 0 { 703 <-doneChan 704 } 705} 706 707// OnGetBlocks is invoked when a peer receives a getblocks bitcoin 708// message. 709func (sp *serverPeer) OnGetBlocks(_ *peer.Peer, msg *wire.MsgGetBlocks) { 710 // Find the most recent known block in the best chain based on the block 711 // locator and fetch all of the block hashes after it until either 712 // wire.MaxBlocksPerMsg have been fetched or the provided stop hash is 713 // encountered. 714 // 715 // Use the block after the genesis block if no other blocks in the 716 // provided locator are known. This does mean the client will start 717 // over with the genesis block if unknown block locators are provided. 718 // 719 // This mirrors the behavior in the reference implementation. 720 chain := sp.server.chain 721 hashList := chain.LocateBlocks(msg.BlockLocatorHashes, &msg.HashStop, 722 wire.MaxBlocksPerMsg) 723 724 // Generate inventory message. 725 invMsg := wire.NewMsgInv() 726 for i := range hashList { 727 iv := wire.NewInvVect(wire.InvTypeBlock, &hashList[i]) 728 invMsg.AddInvVect(iv) 729 } 730 731 // Send the inventory message if there is anything to send. 732 if len(invMsg.InvList) > 0 { 733 invListLen := len(invMsg.InvList) 734 if invListLen == wire.MaxBlocksPerMsg { 735 // Intentionally use a copy of the final hash so there 736 // is not a reference into the inventory slice which 737 // would prevent the entire slice from being eligible 738 // for GC as soon as it's sent. 739 continueHash := invMsg.InvList[invListLen-1].Hash 740 sp.continueHash = &continueHash 741 } 742 sp.QueueMessage(invMsg, nil) 743 } 744} 745 746// OnGetHeaders is invoked when a peer receives a getheaders bitcoin 747// message. 748func (sp *serverPeer) OnGetHeaders(_ *peer.Peer, msg *wire.MsgGetHeaders) { 749 // Ignore getheaders requests if not in sync. 750 if !sp.server.syncManager.IsCurrent() { 751 return 752 } 753 754 // Find the most recent known block in the best chain based on the block 755 // locator and fetch all of the headers after it until either 756 // wire.MaxBlockHeadersPerMsg have been fetched or the provided stop 757 // hash is encountered. 758 // 759 // Use the block after the genesis block if no other blocks in the 760 // provided locator are known. This does mean the client will start 761 // over with the genesis block if unknown block locators are provided. 762 // 763 // This mirrors the behavior in the reference implementation. 764 chain := sp.server.chain 765 headers := chain.LocateHeaders(msg.BlockLocatorHashes, &msg.HashStop) 766 767 // Send found headers to the requesting peer. 768 blockHeaders := make([]*wire.BlockHeader, len(headers)) 769 for i := range headers { 770 blockHeaders[i] = &headers[i] 771 } 772 sp.QueueMessage(&wire.MsgHeaders{Headers: blockHeaders}, nil) 773} 774 775// OnGetCFilters is invoked when a peer receives a getcfilters bitcoin message. 776func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) { 777 // Ignore getcfilters requests if not in sync. 778 if !sp.server.syncManager.IsCurrent() { 779 return 780 } 781 782 // We'll also ensure that the remote party is requesting a set of 783 // filters that we actually currently maintain. 784 switch msg.FilterType { 785 case wire.GCSFilterRegular: 786 break 787 788 default: 789 peerLog.Debug("Filter request for unknown filter: %v", 790 msg.FilterType) 791 return 792 } 793 794 hashes, err := sp.server.chain.HeightToHashRange( 795 int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange, 796 ) 797 if err != nil { 798 peerLog.Debugf("Invalid getcfilters request: %v", err) 799 return 800 } 801 802 // Create []*chainhash.Hash from []chainhash.Hash to pass to 803 // FiltersByBlockHashes. 804 hashPtrs := make([]*chainhash.Hash, len(hashes)) 805 for i := range hashes { 806 hashPtrs[i] = &hashes[i] 807 } 808 809 filters, err := sp.server.cfIndex.FiltersByBlockHashes( 810 hashPtrs, msg.FilterType, 811 ) 812 if err != nil { 813 peerLog.Errorf("Error retrieving cfilters: %v", err) 814 return 815 } 816 817 for i, filterBytes := range filters { 818 if len(filterBytes) == 0 { 819 peerLog.Warnf("Could not obtain cfilter for %v", 820 hashes[i]) 821 return 822 } 823 824 filterMsg := wire.NewMsgCFilter( 825 msg.FilterType, &hashes[i], filterBytes, 826 ) 827 sp.QueueMessage(filterMsg, nil) 828 } 829} 830 831// OnGetCFHeaders is invoked when a peer receives a getcfheader bitcoin message. 832func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) { 833 // Ignore getcfilterheader requests if not in sync. 834 if !sp.server.syncManager.IsCurrent() { 835 return 836 } 837 838 // We'll also ensure that the remote party is requesting a set of 839 // headers for filters that we actually currently maintain. 840 switch msg.FilterType { 841 case wire.GCSFilterRegular: 842 break 843 844 default: 845 peerLog.Debug("Filter request for unknown headers for "+ 846 "filter: %v", msg.FilterType) 847 return 848 } 849 850 startHeight := int32(msg.StartHeight) 851 maxResults := wire.MaxCFHeadersPerMsg 852 853 // If StartHeight is positive, fetch the predecessor block hash so we 854 // can populate the PrevFilterHeader field. 855 if msg.StartHeight > 0 { 856 startHeight-- 857 maxResults++ 858 } 859 860 // Fetch the hashes from the block index. 861 hashList, err := sp.server.chain.HeightToHashRange( 862 startHeight, &msg.StopHash, maxResults, 863 ) 864 if err != nil { 865 peerLog.Debugf("Invalid getcfheaders request: %v", err) 866 } 867 868 // This is possible if StartHeight is one greater that the height of 869 // StopHash, and we pull a valid range of hashes including the previous 870 // filter header. 871 if len(hashList) == 0 || (msg.StartHeight > 0 && len(hashList) == 1) { 872 peerLog.Debug("No results for getcfheaders request") 873 return 874 } 875 876 // Create []*chainhash.Hash from []chainhash.Hash to pass to 877 // FilterHeadersByBlockHashes. 878 hashPtrs := make([]*chainhash.Hash, len(hashList)) 879 for i := range hashList { 880 hashPtrs[i] = &hashList[i] 881 } 882 883 // Fetch the raw filter hash bytes from the database for all blocks. 884 filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes( 885 hashPtrs, msg.FilterType, 886 ) 887 if err != nil { 888 peerLog.Errorf("Error retrieving cfilter hashes: %v", err) 889 return 890 } 891 892 // Generate cfheaders message and send it. 893 headersMsg := wire.NewMsgCFHeaders() 894 895 // Populate the PrevFilterHeader field. 896 if msg.StartHeight > 0 { 897 prevBlockHash := &hashList[0] 898 899 // Fetch the raw committed filter header bytes from the 900 // database. 901 headerBytes, err := sp.server.cfIndex.FilterHeaderByBlockHash( 902 prevBlockHash, msg.FilterType) 903 if err != nil { 904 peerLog.Errorf("Error retrieving CF header: %v", err) 905 return 906 } 907 if len(headerBytes) == 0 { 908 peerLog.Warnf("Could not obtain CF header for %v", prevBlockHash) 909 return 910 } 911 912 // Deserialize the hash into PrevFilterHeader. 913 err = headersMsg.PrevFilterHeader.SetBytes(headerBytes) 914 if err != nil { 915 peerLog.Warnf("Committed filter header deserialize "+ 916 "failed: %v", err) 917 return 918 } 919 920 hashList = hashList[1:] 921 filterHashes = filterHashes[1:] 922 } 923 924 // Populate HeaderHashes. 925 for i, hashBytes := range filterHashes { 926 if len(hashBytes) == 0 { 927 peerLog.Warnf("Could not obtain CF hash for %v", hashList[i]) 928 return 929 } 930 931 // Deserialize the hash. 932 filterHash, err := chainhash.NewHash(hashBytes) 933 if err != nil { 934 peerLog.Warnf("Committed filter hash deserialize "+ 935 "failed: %v", err) 936 return 937 } 938 939 headersMsg.AddCFHash(filterHash) 940 } 941 942 headersMsg.FilterType = msg.FilterType 943 headersMsg.StopHash = msg.StopHash 944 945 sp.QueueMessage(headersMsg, nil) 946} 947 948// OnGetCFCheckpt is invoked when a peer receives a getcfcheckpt bitcoin message. 949func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) { 950 // Ignore getcfcheckpt requests if not in sync. 951 if !sp.server.syncManager.IsCurrent() { 952 return 953 } 954 955 // We'll also ensure that the remote party is requesting a set of 956 // checkpoints for filters that we actually currently maintain. 957 switch msg.FilterType { 958 case wire.GCSFilterRegular: 959 break 960 961 default: 962 peerLog.Debug("Filter request for unknown checkpoints for "+ 963 "filter: %v", msg.FilterType) 964 return 965 } 966 967 // Now that we know the client is fetching a filter that we know of, 968 // we'll fetch the block hashes et each check point interval so we can 969 // compare against our cache, and create new check points if necessary. 970 blockHashes, err := sp.server.chain.IntervalBlockHashes( 971 &msg.StopHash, wire.CFCheckptInterval, 972 ) 973 if err != nil { 974 peerLog.Debugf("Invalid getcfilters request: %v", err) 975 return 976 } 977 978 checkptMsg := wire.NewMsgCFCheckpt( 979 msg.FilterType, &msg.StopHash, len(blockHashes), 980 ) 981 982 // Fetch the current existing cache so we can decide if we need to 983 // extend it or if its adequate as is. 984 sp.server.cfCheckptCachesMtx.RLock() 985 checkptCache := sp.server.cfCheckptCaches[msg.FilterType] 986 987 // If the set of block hashes is beyond the current size of the cache, 988 // then we'll expand the size of the cache and also retain the write 989 // lock. 990 var updateCache bool 991 if len(blockHashes) > len(checkptCache) { 992 // Now that we know we'll need to modify the size of the cache, 993 // we'll release the read lock and grab the write lock to 994 // possibly expand the cache size. 995 sp.server.cfCheckptCachesMtx.RUnlock() 996 997 sp.server.cfCheckptCachesMtx.Lock() 998 defer sp.server.cfCheckptCachesMtx.Unlock() 999 1000 // Now that we have the write lock, we'll check again as it's 1001 // possible that the cache has already been expanded. 1002 checkptCache = sp.server.cfCheckptCaches[msg.FilterType] 1003 1004 // If we still need to expand the cache, then We'll mark that 1005 // we need to update the cache for below and also expand the 1006 // size of the cache in place. 1007 if len(blockHashes) > len(checkptCache) { 1008 updateCache = true 1009 1010 additionalLength := len(blockHashes) - len(checkptCache) 1011 newEntries := make([]cfHeaderKV, additionalLength) 1012 1013 peerLog.Infof("Growing size of checkpoint cache from %v to %v "+ 1014 "block hashes", len(checkptCache), len(blockHashes)) 1015 1016 checkptCache = append( 1017 sp.server.cfCheckptCaches[msg.FilterType], 1018 newEntries..., 1019 ) 1020 } 1021 } else { 1022 // Otherwise, we'll hold onto the read lock for the remainder 1023 // of this method. 1024 defer sp.server.cfCheckptCachesMtx.RUnlock() 1025 1026 peerLog.Tracef("Serving stale cache of size %v", 1027 len(checkptCache)) 1028 } 1029 1030 // Now that we know the cache is of an appropriate size, we'll iterate 1031 // backwards until the find the block hash. We do this as it's possible 1032 // a re-org has occurred so items in the db are now in the main china 1033 // while the cache has been partially invalidated. 1034 var forkIdx int 1035 for forkIdx = len(blockHashes); forkIdx > 0; forkIdx-- { 1036 if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] { 1037 break 1038 } 1039 } 1040 1041 // Now that we know the how much of the cache is relevant for this 1042 // query, we'll populate our check point message with the cache as is. 1043 // Shortly below, we'll populate the new elements of the cache. 1044 for i := 0; i < forkIdx; i++ { 1045 checkptMsg.AddCFHeader(&checkptCache[i].filterHeader) 1046 } 1047 1048 // We'll now collect the set of hashes that are beyond our cache so we 1049 // can look up the filter headers to populate the final cache. 1050 blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx) 1051 for i := forkIdx; i < len(blockHashes); i++ { 1052 blockHashPtrs = append(blockHashPtrs, &blockHashes[i]) 1053 } 1054 filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes( 1055 blockHashPtrs, msg.FilterType, 1056 ) 1057 if err != nil { 1058 peerLog.Errorf("Error retrieving cfilter headers: %v", err) 1059 return 1060 } 1061 1062 // Now that we have the full set of filter headers, we'll add them to 1063 // the checkpoint message, and also update our cache in line. 1064 for i, filterHeaderBytes := range filterHeaders { 1065 if len(filterHeaderBytes) == 0 { 1066 peerLog.Warnf("Could not obtain CF header for %v", 1067 blockHashPtrs[i]) 1068 return 1069 } 1070 1071 filterHeader, err := chainhash.NewHash(filterHeaderBytes) 1072 if err != nil { 1073 peerLog.Warnf("Committed filter header deserialize "+ 1074 "failed: %v", err) 1075 return 1076 } 1077 1078 checkptMsg.AddCFHeader(filterHeader) 1079 1080 // If the new main chain is longer than what's in the cache, 1081 // then we'll override it beyond the fork point. 1082 if updateCache { 1083 checkptCache[forkIdx+i] = cfHeaderKV{ 1084 blockHash: blockHashes[forkIdx+i], 1085 filterHeader: *filterHeader, 1086 } 1087 } 1088 } 1089 1090 // Finally, we'll update the cache if we need to, and send the final 1091 // message back to the requesting peer. 1092 if updateCache { 1093 sp.server.cfCheckptCaches[msg.FilterType] = checkptCache 1094 } 1095 1096 sp.QueueMessage(checkptMsg, nil) 1097} 1098 1099// enforceNodeBloomFlag disconnects the peer if the server is not configured to 1100// allow bloom filters. Additionally, if the peer has negotiated to a protocol 1101// version that is high enough to observe the bloom filter service support bit, 1102// it will be banned since it is intentionally violating the protocol. 1103func (sp *serverPeer) enforceNodeBloomFlag(cmd string) bool { 1104 if sp.server.services&wire.SFNodeBloom != wire.SFNodeBloom { 1105 // Ban the peer if the protocol version is high enough that the 1106 // peer is knowingly violating the protocol and banning is 1107 // enabled. 1108 // 1109 // NOTE: Even though the addBanScore function already examines 1110 // whether or not banning is enabled, it is checked here as well 1111 // to ensure the violation is logged and the peer is 1112 // disconnected regardless. 1113 if sp.ProtocolVersion() >= wire.BIP0111Version && 1114 !cfg.DisableBanning { 1115 1116 // Disconnect the peer regardless of whether it was 1117 // banned. 1118 sp.addBanScore(100, 0, cmd) 1119 sp.Disconnect() 1120 return false 1121 } 1122 1123 // Disconnect the peer regardless of protocol version or banning 1124 // state. 1125 peerLog.Debugf("%s sent an unsupported %s request -- "+ 1126 "disconnecting", sp, cmd) 1127 sp.Disconnect() 1128 return false 1129 } 1130 1131 return true 1132} 1133 1134// OnFeeFilter is invoked when a peer receives a feefilter bitcoin message and 1135// is used by remote peers to request that no transactions which have a fee rate 1136// lower than provided value are inventoried to them. The peer will be 1137// disconnected if an invalid fee filter value is provided. 1138func (sp *serverPeer) OnFeeFilter(_ *peer.Peer, msg *wire.MsgFeeFilter) { 1139 // Check that the passed minimum fee is a valid amount. 1140 if msg.MinFee < 0 || msg.MinFee > btcutil.MaxSatoshi { 1141 peerLog.Debugf("Peer %v sent an invalid feefilter '%v' -- "+ 1142 "disconnecting", sp, btcutil.Amount(msg.MinFee)) 1143 sp.Disconnect() 1144 return 1145 } 1146 1147 atomic.StoreInt64(&sp.feeFilter, msg.MinFee) 1148} 1149 1150// OnFilterAdd is invoked when a peer receives a filteradd bitcoin 1151// message and is used by remote peers to add data to an already loaded bloom 1152// filter. The peer will be disconnected if a filter is not loaded when this 1153// message is received or the server is not configured to allow bloom filters. 1154func (sp *serverPeer) OnFilterAdd(_ *peer.Peer, msg *wire.MsgFilterAdd) { 1155 // Disconnect and/or ban depending on the node bloom services flag and 1156 // negotiated protocol version. 1157 if !sp.enforceNodeBloomFlag(msg.Command()) { 1158 return 1159 } 1160 1161 if !sp.filter.IsLoaded() { 1162 peerLog.Debugf("%s sent a filteradd request with no filter "+ 1163 "loaded -- disconnecting", sp) 1164 sp.Disconnect() 1165 return 1166 } 1167 1168 sp.filter.Add(msg.Data) 1169} 1170 1171// OnFilterClear is invoked when a peer receives a filterclear bitcoin 1172// message and is used by remote peers to clear an already loaded bloom filter. 1173// The peer will be disconnected if a filter is not loaded when this message is 1174// received or the server is not configured to allow bloom filters. 1175func (sp *serverPeer) OnFilterClear(_ *peer.Peer, msg *wire.MsgFilterClear) { 1176 // Disconnect and/or ban depending on the node bloom services flag and 1177 // negotiated protocol version. 1178 if !sp.enforceNodeBloomFlag(msg.Command()) { 1179 return 1180 } 1181 1182 if !sp.filter.IsLoaded() { 1183 peerLog.Debugf("%s sent a filterclear request with no "+ 1184 "filter loaded -- disconnecting", sp) 1185 sp.Disconnect() 1186 return 1187 } 1188 1189 sp.filter.Unload() 1190} 1191 1192// OnFilterLoad is invoked when a peer receives a filterload bitcoin 1193// message and it used to load a bloom filter that should be used for 1194// delivering merkle blocks and associated transactions that match the filter. 1195// The peer will be disconnected if the server is not configured to allow bloom 1196// filters. 1197func (sp *serverPeer) OnFilterLoad(_ *peer.Peer, msg *wire.MsgFilterLoad) { 1198 // Disconnect and/or ban depending on the node bloom services flag and 1199 // negotiated protocol version. 1200 if !sp.enforceNodeBloomFlag(msg.Command()) { 1201 return 1202 } 1203 1204 sp.setDisableRelayTx(false) 1205 1206 sp.filter.Reload(msg) 1207} 1208 1209// OnGetAddr is invoked when a peer receives a getaddr bitcoin message 1210// and is used to provide the peer with known addresses from the address 1211// manager. 1212func (sp *serverPeer) OnGetAddr(_ *peer.Peer, msg *wire.MsgGetAddr) { 1213 // Don't return any addresses when running on the simulation test 1214 // network. This helps prevent the network from becoming another 1215 // public test network since it will not be able to learn about other 1216 // peers that have not specifically been provided. 1217 if cfg.SimNet { 1218 return 1219 } 1220 1221 // Do not accept getaddr requests from outbound peers. This reduces 1222 // fingerprinting attacks. 1223 if !sp.Inbound() { 1224 peerLog.Debugf("Ignoring getaddr request from outbound peer "+ 1225 "%v", sp) 1226 return 1227 } 1228 1229 // Only allow one getaddr request per connection to discourage 1230 // address stamping of inv announcements. 1231 if sp.sentAddrs { 1232 peerLog.Debugf("Ignoring repeated getaddr request from peer "+ 1233 "%v", sp) 1234 return 1235 } 1236 sp.sentAddrs = true 1237 1238 // Get the current known addresses from the address manager. 1239 addrCache := sp.server.addrManager.AddressCache() 1240 1241 // Push the addresses. 1242 sp.pushAddrMsg(addrCache) 1243} 1244 1245// OnAddr is invoked when a peer receives an addr bitcoin message and is 1246// used to notify the server about advertised addresses. 1247func (sp *serverPeer) OnAddr(_ *peer.Peer, msg *wire.MsgAddr) { 1248 // Ignore addresses when running on the simulation test network. This 1249 // helps prevent the network from becoming another public test network 1250 // since it will not be able to learn about other peers that have not 1251 // specifically been provided. 1252 if cfg.SimNet { 1253 return 1254 } 1255 1256 // Ignore old style addresses which don't include a timestamp. 1257 if sp.ProtocolVersion() < wire.NetAddressTimeVersion { 1258 return 1259 } 1260 1261 // A message that has no addresses is invalid. 1262 if len(msg.AddrList) == 0 { 1263 peerLog.Errorf("Command [%s] from %s does not contain any addresses", 1264 msg.Command(), sp.Peer) 1265 sp.Disconnect() 1266 return 1267 } 1268 1269 for _, na := range msg.AddrList { 1270 // Don't add more address if we're disconnecting. 1271 if !sp.Connected() { 1272 return 1273 } 1274 1275 // Set the timestamp to 5 days ago if it's more than 24 hours 1276 // in the future so this address is one of the first to be 1277 // removed when space is needed. 1278 now := time.Now() 1279 if na.Timestamp.After(now.Add(time.Minute * 10)) { 1280 na.Timestamp = now.Add(-1 * time.Hour * 24 * 5) 1281 } 1282 1283 // Add address to known addresses for this peer. 1284 sp.addKnownAddresses([]*wire.NetAddress{na}) 1285 } 1286 1287 // Add addresses to server address manager. The address manager handles 1288 // the details of things such as preventing duplicate addresses, max 1289 // addresses, and last seen updates. 1290 // XXX bitcoind gives a 2 hour time penalty here, do we want to do the 1291 // same? 1292 sp.server.addrManager.AddAddresses(msg.AddrList, sp.NA()) 1293} 1294 1295// OnRead is invoked when a peer receives a message and it is used to update 1296// the bytes received by the server. 1297func (sp *serverPeer) OnRead(_ *peer.Peer, bytesRead int, msg wire.Message, err error) { 1298 sp.server.AddBytesReceived(uint64(bytesRead)) 1299} 1300 1301// OnWrite is invoked when a peer sends a message and it is used to update 1302// the bytes sent by the server. 1303func (sp *serverPeer) OnWrite(_ *peer.Peer, bytesWritten int, msg wire.Message, err error) { 1304 sp.server.AddBytesSent(uint64(bytesWritten)) 1305} 1306 1307// randomUint16Number returns a random uint16 in a specified input range. Note 1308// that the range is in zeroth ordering; if you pass it 1800, you will get 1309// values from 0 to 1800. 1310func randomUint16Number(max uint16) uint16 { 1311 // In order to avoid modulo bias and ensure every possible outcome in 1312 // [0, max) has equal probability, the random number must be sampled 1313 // from a random source that has a range limited to a multiple of the 1314 // modulus. 1315 var randomNumber uint16 1316 var limitRange = (math.MaxUint16 / max) * max 1317 for { 1318 binary.Read(rand.Reader, binary.LittleEndian, &randomNumber) 1319 if randomNumber < limitRange { 1320 return (randomNumber % max) 1321 } 1322 } 1323} 1324 1325// AddRebroadcastInventory adds 'iv' to the list of inventories to be 1326// rebroadcasted at random intervals until they show up in a block. 1327func (s *server) AddRebroadcastInventory(iv *wire.InvVect, data interface{}) { 1328 // Ignore if shutting down. 1329 if atomic.LoadInt32(&s.shutdown) != 0 { 1330 return 1331 } 1332 1333 s.modifyRebroadcastInv <- broadcastInventoryAdd{invVect: iv, data: data} 1334} 1335 1336// RemoveRebroadcastInventory removes 'iv' from the list of items to be 1337// rebroadcasted if present. 1338func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { 1339 // Ignore if shutting down. 1340 if atomic.LoadInt32(&s.shutdown) != 0 { 1341 return 1342 } 1343 1344 s.modifyRebroadcastInv <- broadcastInventoryDel(iv) 1345} 1346 1347// relayTransactions generates and relays inventory vectors for all of the 1348// passed transactions to all connected peers. 1349func (s *server) relayTransactions(txns []*mempool.TxDesc) { 1350 for _, txD := range txns { 1351 iv := wire.NewInvVect(wire.InvTypeTx, txD.Tx.Hash()) 1352 s.RelayInventory(iv, txD) 1353 } 1354} 1355 1356// AnnounceNewTransactions generates and relays inventory vectors and notifies 1357// both websocket and getblocktemplate long poll clients of the passed 1358// transactions. This function should be called whenever new transactions 1359// are added to the mempool. 1360func (s *server) AnnounceNewTransactions(txns []*mempool.TxDesc) { 1361 // Generate and relay inventory vectors for all newly accepted 1362 // transactions. 1363 s.relayTransactions(txns) 1364 1365 // Notify both websocket and getblocktemplate long poll clients of all 1366 // newly accepted transactions. 1367 if s.rpcServer != nil { 1368 s.rpcServer.NotifyNewTransactions(txns) 1369 } 1370} 1371 1372// Transaction has one confirmation on the main chain. Now we can mark it as no 1373// longer needing rebroadcasting. 1374func (s *server) TransactionConfirmed(tx *btcutil.Tx) { 1375 // Rebroadcasting is only necessary when the RPC server is active. 1376 if s.rpcServer == nil { 1377 return 1378 } 1379 1380 iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) 1381 s.RemoveRebroadcastInventory(iv) 1382} 1383 1384// pushTxMsg sends a tx message for the provided transaction hash to the 1385// connected peer. An error is returned if the transaction hash is not known. 1386func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, 1387 waitChan <-chan struct{}, encoding wire.MessageEncoding) error { 1388 1389 // Attempt to fetch the requested transaction from the pool. A 1390 // call could be made to check for existence first, but simply trying 1391 // to fetch a missing transaction results in the same behavior. 1392 tx, err := s.txMemPool.FetchTransaction(hash) 1393 if err != nil { 1394 peerLog.Tracef("Unable to fetch tx %v from transaction "+ 1395 "pool: %v", hash, err) 1396 1397 if doneChan != nil { 1398 doneChan <- struct{}{} 1399 } 1400 return err 1401 } 1402 1403 // Once we have fetched data wait for any previous operation to finish. 1404 if waitChan != nil { 1405 <-waitChan 1406 } 1407 1408 sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) 1409 1410 return nil 1411} 1412 1413// pushBlockMsg sends a block message for the provided block hash to the 1414// connected peer. An error is returned if the block hash is not known. 1415func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{}, 1416 waitChan <-chan struct{}, encoding wire.MessageEncoding) error { 1417 1418 // Fetch the raw block bytes from the database. 1419 var blockBytes []byte 1420 err := sp.server.db.View(func(dbTx database.Tx) error { 1421 var err error 1422 blockBytes, err = dbTx.FetchBlock(hash) 1423 return err 1424 }) 1425 if err != nil { 1426 peerLog.Tracef("Unable to fetch requested block hash %v: %v", 1427 hash, err) 1428 1429 if doneChan != nil { 1430 doneChan <- struct{}{} 1431 } 1432 return err 1433 } 1434 1435 // Deserialize the block. 1436 var msgBlock wire.MsgBlock 1437 err = msgBlock.Deserialize(bytes.NewReader(blockBytes)) 1438 if err != nil { 1439 peerLog.Tracef("Unable to deserialize requested block hash "+ 1440 "%v: %v", hash, err) 1441 1442 if doneChan != nil { 1443 doneChan <- struct{}{} 1444 } 1445 return err 1446 } 1447 1448 // Once we have fetched data wait for any previous operation to finish. 1449 if waitChan != nil { 1450 <-waitChan 1451 } 1452 1453 // We only send the channel for this message if we aren't sending 1454 // an inv straight after. 1455 var dc chan<- struct{} 1456 continueHash := sp.continueHash 1457 sendInv := continueHash != nil && continueHash.IsEqual(hash) 1458 if !sendInv { 1459 dc = doneChan 1460 } 1461 sp.QueueMessageWithEncoding(&msgBlock, dc, encoding) 1462 1463 // When the peer requests the final block that was advertised in 1464 // response to a getblocks message which requested more blocks than 1465 // would fit into a single message, send it a new inventory message 1466 // to trigger it to issue another getblocks message for the next 1467 // batch of inventory. 1468 if sendInv { 1469 best := sp.server.chain.BestSnapshot() 1470 invMsg := wire.NewMsgInvSizeHint(1) 1471 iv := wire.NewInvVect(wire.InvTypeBlock, &best.Hash) 1472 invMsg.AddInvVect(iv) 1473 sp.QueueMessage(invMsg, doneChan) 1474 sp.continueHash = nil 1475 } 1476 return nil 1477} 1478 1479// pushMerkleBlockMsg sends a merkleblock message for the provided block hash to 1480// the connected peer. Since a merkle block requires the peer to have a filter 1481// loaded, this call will simply be ignored if there is no filter loaded. An 1482// error is returned if the block hash is not known. 1483func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash, 1484 doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error { 1485 1486 // Do not send a response if the peer doesn't have a filter loaded. 1487 if !sp.filter.IsLoaded() { 1488 if doneChan != nil { 1489 doneChan <- struct{}{} 1490 } 1491 return nil 1492 } 1493 1494 // Fetch the raw block bytes from the database. 1495 blk, err := sp.server.chain.BlockByHash(hash) 1496 if err != nil { 1497 peerLog.Tracef("Unable to fetch requested block hash %v: %v", 1498 hash, err) 1499 1500 if doneChan != nil { 1501 doneChan <- struct{}{} 1502 } 1503 return err 1504 } 1505 1506 // Generate a merkle block by filtering the requested block according 1507 // to the filter for the peer. 1508 merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter) 1509 1510 // Once we have fetched data wait for any previous operation to finish. 1511 if waitChan != nil { 1512 <-waitChan 1513 } 1514 1515 // Send the merkleblock. Only send the done channel with this message 1516 // if no transactions will be sent afterwards. 1517 var dc chan<- struct{} 1518 if len(matchedTxIndices) == 0 { 1519 dc = doneChan 1520 } 1521 sp.QueueMessage(merkle, dc) 1522 1523 // Finally, send any matched transactions. 1524 blkTransactions := blk.MsgBlock().Transactions 1525 for i, txIndex := range matchedTxIndices { 1526 // Only send the done channel on the final transaction. 1527 var dc chan<- struct{} 1528 if i == len(matchedTxIndices)-1 { 1529 dc = doneChan 1530 } 1531 if txIndex < uint32(len(blkTransactions)) { 1532 sp.QueueMessageWithEncoding(blkTransactions[txIndex], dc, 1533 encoding) 1534 } 1535 } 1536 1537 return nil 1538} 1539 1540// handleUpdatePeerHeight updates the heights of all peers who were known to 1541// announce a block we recently accepted. 1542func (s *server) handleUpdatePeerHeights(state *peerState, umsg updatePeerHeightsMsg) { 1543 state.forAllPeers(func(sp *serverPeer) { 1544 // The origin peer should already have the updated height. 1545 if sp.Peer == umsg.originPeer { 1546 return 1547 } 1548 1549 // This is a pointer to the underlying memory which doesn't 1550 // change. 1551 latestBlkHash := sp.LastAnnouncedBlock() 1552 1553 // Skip this peer if it hasn't recently announced any new blocks. 1554 if latestBlkHash == nil { 1555 return 1556 } 1557 1558 // If the peer has recently announced a block, and this block 1559 // matches our newly accepted block, then update their block 1560 // height. 1561 if *latestBlkHash == *umsg.newHash { 1562 sp.UpdateLastBlockHeight(umsg.newHeight) 1563 sp.UpdateLastAnnouncedBlock(nil) 1564 } 1565 }) 1566} 1567 1568// handleAddPeerMsg deals with adding new peers. It is invoked from the 1569// peerHandler goroutine. 1570func (s *server) handleAddPeerMsg(state *peerState, sp *serverPeer) bool { 1571 if sp == nil || !sp.Connected() { 1572 return false 1573 } 1574 1575 // Disconnect peers with unwanted user agents. 1576 if sp.HasUndesiredUserAgent(s.agentBlacklist, s.agentWhitelist) { 1577 sp.Disconnect() 1578 return false 1579 } 1580 1581 // Ignore new peers if we're shutting down. 1582 if atomic.LoadInt32(&s.shutdown) != 0 { 1583 srvrLog.Infof("New peer %s ignored - server is shutting down", sp) 1584 sp.Disconnect() 1585 return false 1586 } 1587 1588 // Disconnect banned peers. 1589 host, _, err := net.SplitHostPort(sp.Addr()) 1590 if err != nil { 1591 srvrLog.Debugf("can't split hostport %v", err) 1592 sp.Disconnect() 1593 return false 1594 } 1595 if banEnd, ok := state.banned[host]; ok { 1596 if time.Now().Before(banEnd) { 1597 srvrLog.Debugf("Peer %s is banned for another %v - disconnecting", 1598 host, time.Until(banEnd)) 1599 sp.Disconnect() 1600 return false 1601 } 1602 1603 srvrLog.Infof("Peer %s is no longer banned", host) 1604 delete(state.banned, host) 1605 } 1606 1607 // TODO: Check for max peers from a single IP. 1608 1609 // Limit max number of total peers. 1610 if state.Count() >= cfg.MaxPeers { 1611 srvrLog.Infof("Max peers reached [%d] - disconnecting peer %s", 1612 cfg.MaxPeers, sp) 1613 sp.Disconnect() 1614 // TODO: how to handle permanent peers here? 1615 // they should be rescheduled. 1616 return false 1617 } 1618 1619 // Add the new peer and start it. 1620 srvrLog.Debugf("New peer %s", sp) 1621 if sp.Inbound() { 1622 state.inboundPeers[sp.ID()] = sp 1623 } else { 1624 state.outboundGroups[addrmgr.GroupKey(sp.NA())]++ 1625 if sp.persistent { 1626 state.persistentPeers[sp.ID()] = sp 1627 } else { 1628 state.outboundPeers[sp.ID()] = sp 1629 } 1630 } 1631 1632 // Update the address' last seen time if the peer has acknowledged 1633 // our version and has sent us its version as well. 1634 if sp.VerAckReceived() && sp.VersionKnown() && sp.NA() != nil { 1635 s.addrManager.Connected(sp.NA()) 1636 } 1637 1638 // Signal the sync manager this peer is a new sync candidate. 1639 s.syncManager.NewPeer(sp.Peer) 1640 1641 // Update the address manager and request known addresses from the 1642 // remote peer for outbound connections. This is skipped when running on 1643 // the simulation test network since it is only intended to connect to 1644 // specified peers and actively avoids advertising and connecting to 1645 // discovered peers. 1646 if !cfg.SimNet && !sp.Inbound() { 1647 // Advertise the local address when the server accepts incoming 1648 // connections and it believes itself to be close to the best 1649 // known tip. 1650 if !cfg.DisableListen && s.syncManager.IsCurrent() { 1651 // Get address that best matches. 1652 lna := s.addrManager.GetBestLocalAddress(sp.NA()) 1653 if addrmgr.IsRoutable(lna) { 1654 // Filter addresses the peer already knows about. 1655 addresses := []*wire.NetAddress{lna} 1656 sp.pushAddrMsg(addresses) 1657 } 1658 } 1659 1660 // Request known addresses if the server address manager needs 1661 // more and the peer has a protocol version new enough to 1662 // include a timestamp with addresses. 1663 hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion 1664 if s.addrManager.NeedMoreAddresses() && hasTimestamp { 1665 sp.QueueMessage(wire.NewMsgGetAddr(), nil) 1666 } 1667 1668 // Mark the address as a known good address. 1669 s.addrManager.Good(sp.NA()) 1670 } 1671 1672 return true 1673} 1674 1675// handleDonePeerMsg deals with peers that have signalled they are done. It is 1676// invoked from the peerHandler goroutine. 1677func (s *server) handleDonePeerMsg(state *peerState, sp *serverPeer) { 1678 var list map[int32]*serverPeer 1679 if sp.persistent { 1680 list = state.persistentPeers 1681 } else if sp.Inbound() { 1682 list = state.inboundPeers 1683 } else { 1684 list = state.outboundPeers 1685 } 1686 1687 // Regardless of whether the peer was found in our list, we'll inform 1688 // our connection manager about the disconnection. This can happen if we 1689 // process a peer's `done` message before its `add`. 1690 if !sp.Inbound() { 1691 if sp.persistent { 1692 s.connManager.Disconnect(sp.connReq.ID()) 1693 } else { 1694 s.connManager.Remove(sp.connReq.ID()) 1695 go s.connManager.NewConnReq() 1696 } 1697 } 1698 1699 if _, ok := list[sp.ID()]; ok { 1700 if !sp.Inbound() && sp.VersionKnown() { 1701 state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- 1702 } 1703 delete(list, sp.ID()) 1704 srvrLog.Debugf("Removed peer %s", sp) 1705 return 1706 } 1707} 1708 1709// handleBanPeerMsg deals with banning peers. It is invoked from the 1710// peerHandler goroutine. 1711func (s *server) handleBanPeerMsg(state *peerState, sp *serverPeer) { 1712 host, _, err := net.SplitHostPort(sp.Addr()) 1713 if err != nil { 1714 srvrLog.Debugf("can't split ban peer %s %v", sp.Addr(), err) 1715 return 1716 } 1717 direction := directionString(sp.Inbound()) 1718 srvrLog.Infof("Banned peer %s (%s) for %v", host, direction, 1719 cfg.BanDuration) 1720 state.banned[host] = time.Now().Add(cfg.BanDuration) 1721} 1722 1723// handleRelayInvMsg deals with relaying inventory to peers that are not already 1724// known to have it. It is invoked from the peerHandler goroutine. 1725func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) { 1726 state.forAllPeers(func(sp *serverPeer) { 1727 if !sp.Connected() { 1728 return 1729 } 1730 1731 // If the inventory is a block and the peer prefers headers, 1732 // generate and send a headers message instead of an inventory 1733 // message. 1734 if msg.invVect.Type == wire.InvTypeBlock && sp.WantsHeaders() { 1735 blockHeader, ok := msg.data.(wire.BlockHeader) 1736 if !ok { 1737 peerLog.Warnf("Underlying data for headers" + 1738 " is not a block header") 1739 return 1740 } 1741 msgHeaders := wire.NewMsgHeaders() 1742 if err := msgHeaders.AddBlockHeader(&blockHeader); err != nil { 1743 peerLog.Errorf("Failed to add block"+ 1744 " header: %v", err) 1745 return 1746 } 1747 sp.QueueMessage(msgHeaders, nil) 1748 return 1749 } 1750 1751 if msg.invVect.Type == wire.InvTypeTx { 1752 // Don't relay the transaction to the peer when it has 1753 // transaction relaying disabled. 1754 if sp.relayTxDisabled() { 1755 return 1756 } 1757 1758 txD, ok := msg.data.(*mempool.TxDesc) 1759 if !ok { 1760 peerLog.Warnf("Underlying data for tx inv "+ 1761 "relay is not a *mempool.TxDesc: %T", 1762 msg.data) 1763 return 1764 } 1765 1766 // Don't relay the transaction if the transaction fee-per-kb 1767 // is less than the peer's feefilter. 1768 feeFilter := atomic.LoadInt64(&sp.feeFilter) 1769 if feeFilter > 0 && txD.FeePerKB < feeFilter { 1770 return 1771 } 1772 1773 // Don't relay the transaction if there is a bloom 1774 // filter loaded and the transaction doesn't match it. 1775 if sp.filter.IsLoaded() { 1776 if !sp.filter.MatchTxAndUpdate(txD.Tx) { 1777 return 1778 } 1779 } 1780 } 1781 1782 // Queue the inventory to be relayed with the next batch. 1783 // It will be ignored if the peer is already known to 1784 // have the inventory. 1785 sp.QueueInventory(msg.invVect) 1786 }) 1787} 1788 1789// handleBroadcastMsg deals with broadcasting messages to peers. It is invoked 1790// from the peerHandler goroutine. 1791func (s *server) handleBroadcastMsg(state *peerState, bmsg *broadcastMsg) { 1792 state.forAllPeers(func(sp *serverPeer) { 1793 if !sp.Connected() { 1794 return 1795 } 1796 1797 for _, ep := range bmsg.excludePeers { 1798 if sp == ep { 1799 return 1800 } 1801 } 1802 1803 sp.QueueMessage(bmsg.message, nil) 1804 }) 1805} 1806 1807type getConnCountMsg struct { 1808 reply chan int32 1809} 1810 1811type getPeersMsg struct { 1812 reply chan []*serverPeer 1813} 1814 1815type getOutboundGroup struct { 1816 key string 1817 reply chan int 1818} 1819 1820type getAddedNodesMsg struct { 1821 reply chan []*serverPeer 1822} 1823 1824type disconnectNodeMsg struct { 1825 cmp func(*serverPeer) bool 1826 reply chan error 1827} 1828 1829type connectNodeMsg struct { 1830 addr string 1831 permanent bool 1832 reply chan error 1833} 1834 1835type removeNodeMsg struct { 1836 cmp func(*serverPeer) bool 1837 reply chan error 1838} 1839 1840// handleQuery is the central handler for all queries and commands from other 1841// goroutines related to peer state. 1842func (s *server) handleQuery(state *peerState, querymsg interface{}) { 1843 switch msg := querymsg.(type) { 1844 case getConnCountMsg: 1845 nconnected := int32(0) 1846 state.forAllPeers(func(sp *serverPeer) { 1847 if sp.Connected() { 1848 nconnected++ 1849 } 1850 }) 1851 msg.reply <- nconnected 1852 1853 case getPeersMsg: 1854 peers := make([]*serverPeer, 0, state.Count()) 1855 state.forAllPeers(func(sp *serverPeer) { 1856 if !sp.Connected() { 1857 return 1858 } 1859 peers = append(peers, sp) 1860 }) 1861 msg.reply <- peers 1862 1863 case connectNodeMsg: 1864 // TODO: duplicate oneshots? 1865 // Limit max number of total peers. 1866 if state.Count() >= cfg.MaxPeers { 1867 msg.reply <- errors.New("max peers reached") 1868 return 1869 } 1870 for _, peer := range state.persistentPeers { 1871 if peer.Addr() == msg.addr { 1872 if msg.permanent { 1873 msg.reply <- errors.New("peer already connected") 1874 } else { 1875 msg.reply <- errors.New("peer exists as a permanent peer") 1876 } 1877 return 1878 } 1879 } 1880 1881 netAddr, err := addrStringToNetAddr(msg.addr) 1882 if err != nil { 1883 msg.reply <- err 1884 return 1885 } 1886 1887 // TODO: if too many, nuke a non-perm peer. 1888 go s.connManager.Connect(&connmgr.ConnReq{ 1889 Addr: netAddr, 1890 Permanent: msg.permanent, 1891 }) 1892 msg.reply <- nil 1893 case removeNodeMsg: 1894 found := disconnectPeer(state.persistentPeers, msg.cmp, func(sp *serverPeer) { 1895 // Keep group counts ok since we remove from 1896 // the list now. 1897 state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- 1898 }) 1899 1900 if found { 1901 msg.reply <- nil 1902 } else { 1903 msg.reply <- errors.New("peer not found") 1904 } 1905 case getOutboundGroup: 1906 count, ok := state.outboundGroups[msg.key] 1907 if ok { 1908 msg.reply <- count 1909 } else { 1910 msg.reply <- 0 1911 } 1912 // Request a list of the persistent (added) peers. 1913 case getAddedNodesMsg: 1914 // Respond with a slice of the relevant peers. 1915 peers := make([]*serverPeer, 0, len(state.persistentPeers)) 1916 for _, sp := range state.persistentPeers { 1917 peers = append(peers, sp) 1918 } 1919 msg.reply <- peers 1920 case disconnectNodeMsg: 1921 // Check inbound peers. We pass a nil callback since we don't 1922 // require any additional actions on disconnect for inbound peers. 1923 found := disconnectPeer(state.inboundPeers, msg.cmp, nil) 1924 if found { 1925 msg.reply <- nil 1926 return 1927 } 1928 1929 // Check outbound peers. 1930 found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) { 1931 // Keep group counts ok since we remove from 1932 // the list now. 1933 state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- 1934 }) 1935 if found { 1936 // If there are multiple outbound connections to the same 1937 // ip:port, continue disconnecting them all until no such 1938 // peers are found. 1939 for found { 1940 found = disconnectPeer(state.outboundPeers, msg.cmp, func(sp *serverPeer) { 1941 state.outboundGroups[addrmgr.GroupKey(sp.NA())]-- 1942 }) 1943 } 1944 msg.reply <- nil 1945 return 1946 } 1947 1948 msg.reply <- errors.New("peer not found") 1949 } 1950} 1951 1952// disconnectPeer attempts to drop the connection of a targeted peer in the 1953// passed peer list. Targets are identified via usage of the passed 1954// `compareFunc`, which should return `true` if the passed peer is the target 1955// peer. This function returns true on success and false if the peer is unable 1956// to be located. If the peer is found, and the passed callback: `whenFound' 1957// isn't nil, we call it with the peer as the argument before it is removed 1958// from the peerList, and is disconnected from the server. 1959func disconnectPeer(peerList map[int32]*serverPeer, compareFunc func(*serverPeer) bool, whenFound func(*serverPeer)) bool { 1960 for addr, peer := range peerList { 1961 if compareFunc(peer) { 1962 if whenFound != nil { 1963 whenFound(peer) 1964 } 1965 1966 // This is ok because we are not continuing 1967 // to iterate so won't corrupt the loop. 1968 delete(peerList, addr) 1969 peer.Disconnect() 1970 return true 1971 } 1972 } 1973 return false 1974} 1975 1976// newPeerConfig returns the configuration for the given serverPeer. 1977func newPeerConfig(sp *serverPeer) *peer.Config { 1978 return &peer.Config{ 1979 Listeners: peer.MessageListeners{ 1980 OnVersion: sp.OnVersion, 1981 OnVerAck: sp.OnVerAck, 1982 OnMemPool: sp.OnMemPool, 1983 OnTx: sp.OnTx, 1984 OnBlock: sp.OnBlock, 1985 OnInv: sp.OnInv, 1986 OnHeaders: sp.OnHeaders, 1987 OnGetData: sp.OnGetData, 1988 OnGetBlocks: sp.OnGetBlocks, 1989 OnGetHeaders: sp.OnGetHeaders, 1990 OnGetCFilters: sp.OnGetCFilters, 1991 OnGetCFHeaders: sp.OnGetCFHeaders, 1992 OnGetCFCheckpt: sp.OnGetCFCheckpt, 1993 OnFeeFilter: sp.OnFeeFilter, 1994 OnFilterAdd: sp.OnFilterAdd, 1995 OnFilterClear: sp.OnFilterClear, 1996 OnFilterLoad: sp.OnFilterLoad, 1997 OnGetAddr: sp.OnGetAddr, 1998 OnAddr: sp.OnAddr, 1999 OnRead: sp.OnRead, 2000 OnWrite: sp.OnWrite, 2001 2002 // Note: The reference client currently bans peers that send alerts 2003 // not signed with its key. We could verify against their key, but 2004 // since the reference client is currently unwilling to support 2005 // other implementations' alert messages, we will not relay theirs. 2006 OnAlert: nil, 2007 }, 2008 NewestBlock: sp.newestBlock, 2009 HostToNetAddress: sp.server.addrManager.HostToNetAddress, 2010 Proxy: cfg.Proxy, 2011 UserAgentName: userAgentName, 2012 UserAgentVersion: userAgentVersion, 2013 UserAgentComments: cfg.UserAgentComments, 2014 ChainParams: sp.server.chainParams, 2015 Services: sp.server.services, 2016 DisableRelayTx: cfg.BlocksOnly, 2017 ProtocolVersion: peer.MaxProtocolVersion, 2018 TrickleInterval: cfg.TrickleInterval, 2019 } 2020} 2021 2022// inboundPeerConnected is invoked by the connection manager when a new inbound 2023// connection is established. It initializes a new inbound server peer 2024// instance, associates it with the connection, and starts a goroutine to wait 2025// for disconnection. 2026func (s *server) inboundPeerConnected(conn net.Conn) { 2027 sp := newServerPeer(s, false) 2028 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) 2029 sp.Peer = peer.NewInboundPeer(newPeerConfig(sp)) 2030 sp.AssociateConnection(conn) 2031 go s.peerDoneHandler(sp) 2032} 2033 2034// outboundPeerConnected is invoked by the connection manager when a new 2035// outbound connection is established. It initializes a new outbound server 2036// peer instance, associates it with the relevant state such as the connection 2037// request instance and the connection itself, and finally notifies the address 2038// manager of the attempt. 2039func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) { 2040 sp := newServerPeer(s, c.Permanent) 2041 p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String()) 2042 if err != nil { 2043 srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err) 2044 if c.Permanent { 2045 s.connManager.Disconnect(c.ID()) 2046 } else { 2047 s.connManager.Remove(c.ID()) 2048 go s.connManager.NewConnReq() 2049 } 2050 return 2051 } 2052 sp.Peer = p 2053 sp.connReq = c 2054 sp.isWhitelisted = isWhitelisted(conn.RemoteAddr()) 2055 sp.AssociateConnection(conn) 2056 go s.peerDoneHandler(sp) 2057} 2058 2059// peerDoneHandler handles peer disconnects by notifiying the server that it's 2060// done along with other performing other desirable cleanup. 2061func (s *server) peerDoneHandler(sp *serverPeer) { 2062 sp.WaitForDisconnect() 2063 s.donePeers <- sp 2064 2065 // Only tell sync manager we are gone if we ever told it we existed. 2066 if sp.VerAckReceived() { 2067 s.syncManager.DonePeer(sp.Peer) 2068 2069 // Evict any remaining orphans that were sent by the peer. 2070 numEvicted := s.txMemPool.RemoveOrphansByTag(mempool.Tag(sp.ID())) 2071 if numEvicted > 0 { 2072 txmpLog.Debugf("Evicted %d %s from peer %v (id %d)", 2073 numEvicted, pickNoun(numEvicted, "orphan", 2074 "orphans"), sp, sp.ID()) 2075 } 2076 } 2077 close(sp.quit) 2078} 2079 2080// peerHandler is used to handle peer operations such as adding and removing 2081// peers to and from the server, banning peers, and broadcasting messages to 2082// peers. It must be run in a goroutine. 2083func (s *server) peerHandler() { 2084 // Start the address manager and sync manager, both of which are needed 2085 // by peers. This is done here since their lifecycle is closely tied 2086 // to this handler and rather than adding more channels to sychronize 2087 // things, it's easier and slightly faster to simply start and stop them 2088 // in this handler. 2089 s.addrManager.Start() 2090 s.syncManager.Start() 2091 2092 srvrLog.Tracef("Starting peer handler") 2093 2094 state := &peerState{ 2095 inboundPeers: make(map[int32]*serverPeer), 2096 persistentPeers: make(map[int32]*serverPeer), 2097 outboundPeers: make(map[int32]*serverPeer), 2098 banned: make(map[string]time.Time), 2099 outboundGroups: make(map[string]int), 2100 } 2101 2102 if !cfg.DisableDNSSeed { 2103 // Add peers discovered through DNS to the address manager. 2104 connmgr.SeedFromDNS(activeNetParams.Params, defaultRequiredServices, 2105 btcdLookup, func(addrs []*wire.NetAddress) { 2106 // Bitcoind uses a lookup of the dns seeder here. This 2107 // is rather strange since the values looked up by the 2108 // DNS seed lookups will vary quite a lot. 2109 // to replicate this behaviour we put all addresses as 2110 // having come from the first one. 2111 s.addrManager.AddAddresses(addrs, addrs[0]) 2112 }) 2113 } 2114 go s.connManager.Start() 2115 2116out: 2117 for { 2118 select { 2119 // New peers connected to the server. 2120 case p := <-s.newPeers: 2121 s.handleAddPeerMsg(state, p) 2122 2123 // Disconnected peers. 2124 case p := <-s.donePeers: 2125 s.handleDonePeerMsg(state, p) 2126 2127 // Block accepted in mainchain or orphan, update peer height. 2128 case umsg := <-s.peerHeightsUpdate: 2129 s.handleUpdatePeerHeights(state, umsg) 2130 2131 // Peer to ban. 2132 case p := <-s.banPeers: 2133 s.handleBanPeerMsg(state, p) 2134 2135 // New inventory to potentially be relayed to other peers. 2136 case invMsg := <-s.relayInv: 2137 s.handleRelayInvMsg(state, invMsg) 2138 2139 // Message to broadcast to all connected peers except those 2140 // which are excluded by the message. 2141 case bmsg := <-s.broadcast: 2142 s.handleBroadcastMsg(state, &bmsg) 2143 2144 case qmsg := <-s.query: 2145 s.handleQuery(state, qmsg) 2146 2147 case <-s.quit: 2148 // Disconnect all peers on server shutdown. 2149 state.forAllPeers(func(sp *serverPeer) { 2150 srvrLog.Tracef("Shutdown peer %s", sp) 2151 sp.Disconnect() 2152 }) 2153 break out 2154 } 2155 } 2156 2157 s.connManager.Stop() 2158 s.syncManager.Stop() 2159 s.addrManager.Stop() 2160 2161 // Drain channels before exiting so nothing is left waiting around 2162 // to send. 2163cleanup: 2164 for { 2165 select { 2166 case <-s.newPeers: 2167 case <-s.donePeers: 2168 case <-s.peerHeightsUpdate: 2169 case <-s.relayInv: 2170 case <-s.broadcast: 2171 case <-s.query: 2172 default: 2173 break cleanup 2174 } 2175 } 2176 s.wg.Done() 2177 srvrLog.Tracef("Peer handler done") 2178} 2179 2180// AddPeer adds a new peer that has already been connected to the server. 2181func (s *server) AddPeer(sp *serverPeer) { 2182 s.newPeers <- sp 2183} 2184 2185// BanPeer bans a peer that has already been connected to the server by ip. 2186func (s *server) BanPeer(sp *serverPeer) { 2187 s.banPeers <- sp 2188} 2189 2190// RelayInventory relays the passed inventory vector to all connected peers 2191// that are not already known to have it. 2192func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) { 2193 s.relayInv <- relayMsg{invVect: invVect, data: data} 2194} 2195 2196// BroadcastMessage sends msg to all peers currently connected to the server 2197// except those in the passed peers to exclude. 2198func (s *server) BroadcastMessage(msg wire.Message, exclPeers ...*serverPeer) { 2199 // XXX: Need to determine if this is an alert that has already been 2200 // broadcast and refrain from broadcasting again. 2201 bmsg := broadcastMsg{message: msg, excludePeers: exclPeers} 2202 s.broadcast <- bmsg 2203} 2204 2205// ConnectedCount returns the number of currently connected peers. 2206func (s *server) ConnectedCount() int32 { 2207 replyChan := make(chan int32) 2208 2209 s.query <- getConnCountMsg{reply: replyChan} 2210 2211 return <-replyChan 2212} 2213 2214// OutboundGroupCount returns the number of peers connected to the given 2215// outbound group key. 2216func (s *server) OutboundGroupCount(key string) int { 2217 replyChan := make(chan int) 2218 s.query <- getOutboundGroup{key: key, reply: replyChan} 2219 return <-replyChan 2220} 2221 2222// AddBytesSent adds the passed number of bytes to the total bytes sent counter 2223// for the server. It is safe for concurrent access. 2224func (s *server) AddBytesSent(bytesSent uint64) { 2225 atomic.AddUint64(&s.bytesSent, bytesSent) 2226} 2227 2228// AddBytesReceived adds the passed number of bytes to the total bytes received 2229// counter for the server. It is safe for concurrent access. 2230func (s *server) AddBytesReceived(bytesReceived uint64) { 2231 atomic.AddUint64(&s.bytesReceived, bytesReceived) 2232} 2233 2234// NetTotals returns the sum of all bytes received and sent across the network 2235// for all peers. It is safe for concurrent access. 2236func (s *server) NetTotals() (uint64, uint64) { 2237 return atomic.LoadUint64(&s.bytesReceived), 2238 atomic.LoadUint64(&s.bytesSent) 2239} 2240 2241// UpdatePeerHeights updates the heights of all peers who have have announced 2242// the latest connected main chain block, or a recognized orphan. These height 2243// updates allow us to dynamically refresh peer heights, ensuring sync peer 2244// selection has access to the latest block heights for each peer. 2245func (s *server) UpdatePeerHeights(latestBlkHash *chainhash.Hash, latestHeight int32, updateSource *peer.Peer) { 2246 s.peerHeightsUpdate <- updatePeerHeightsMsg{ 2247 newHash: latestBlkHash, 2248 newHeight: latestHeight, 2249 originPeer: updateSource, 2250 } 2251} 2252 2253// rebroadcastHandler keeps track of user submitted inventories that we have 2254// sent out but have not yet made it into a block. We periodically rebroadcast 2255// them in case our peers restarted or otherwise lost track of them. 2256func (s *server) rebroadcastHandler() { 2257 // Wait 5 min before first tx rebroadcast. 2258 timer := time.NewTimer(5 * time.Minute) 2259 pendingInvs := make(map[wire.InvVect]interface{}) 2260 2261out: 2262 for { 2263 select { 2264 case riv := <-s.modifyRebroadcastInv: 2265 switch msg := riv.(type) { 2266 // Incoming InvVects are added to our map of RPC txs. 2267 case broadcastInventoryAdd: 2268 pendingInvs[*msg.invVect] = msg.data 2269 2270 // When an InvVect has been added to a block, we can 2271 // now remove it, if it was present. 2272 case broadcastInventoryDel: 2273 if _, ok := pendingInvs[*msg]; ok { 2274 delete(pendingInvs, *msg) 2275 } 2276 } 2277 2278 case <-timer.C: 2279 // Any inventory we have has not made it into a block 2280 // yet. We periodically resubmit them until they have. 2281 for iv, data := range pendingInvs { 2282 ivCopy := iv 2283 s.RelayInventory(&ivCopy, data) 2284 } 2285 2286 // Process at a random time up to 30mins (in seconds) 2287 // in the future. 2288 timer.Reset(time.Second * 2289 time.Duration(randomUint16Number(1800))) 2290 2291 case <-s.quit: 2292 break out 2293 } 2294 } 2295 2296 timer.Stop() 2297 2298 // Drain channels before exiting so nothing is left waiting around 2299 // to send. 2300cleanup: 2301 for { 2302 select { 2303 case <-s.modifyRebroadcastInv: 2304 default: 2305 break cleanup 2306 } 2307 } 2308 s.wg.Done() 2309} 2310 2311// Start begins accepting connections from peers. 2312func (s *server) Start() { 2313 // Already started? 2314 if atomic.AddInt32(&s.started, 1) != 1 { 2315 return 2316 } 2317 2318 srvrLog.Trace("Starting server") 2319 2320 // Server startup time. Used for the uptime command for uptime calculation. 2321 s.startupTime = time.Now().Unix() 2322 2323 // Start the peer handler which in turn starts the address and block 2324 // managers. 2325 s.wg.Add(1) 2326 go s.peerHandler() 2327 2328 if s.nat != nil { 2329 s.wg.Add(1) 2330 go s.upnpUpdateThread() 2331 } 2332 2333 if !cfg.DisableRPC { 2334 s.wg.Add(1) 2335 2336 // Start the rebroadcastHandler, which ensures user tx received by 2337 // the RPC server are rebroadcast until being included in a block. 2338 go s.rebroadcastHandler() 2339 2340 s.rpcServer.Start() 2341 } 2342 2343 // Start the CPU miner if generation is enabled. 2344 if cfg.Generate { 2345 s.cpuMiner.Start() 2346 } 2347} 2348 2349// Stop gracefully shuts down the server by stopping and disconnecting all 2350// peers and the main listener. 2351func (s *server) Stop() error { 2352 // Make sure this only happens once. 2353 if atomic.AddInt32(&s.shutdown, 1) != 1 { 2354 srvrLog.Infof("Server is already in the process of shutting down") 2355 return nil 2356 } 2357 2358 srvrLog.Warnf("Server shutting down") 2359 2360 // Stop the CPU miner if needed 2361 s.cpuMiner.Stop() 2362 2363 // Shutdown the RPC server if it's not disabled. 2364 if !cfg.DisableRPC { 2365 s.rpcServer.Stop() 2366 } 2367 2368 // Save fee estimator state in the database. 2369 s.db.Update(func(tx database.Tx) error { 2370 metadata := tx.Metadata() 2371 metadata.Put(mempool.EstimateFeeDatabaseKey, s.feeEstimator.Save()) 2372 2373 return nil 2374 }) 2375 2376 // Signal the remaining goroutines to quit. 2377 close(s.quit) 2378 return nil 2379} 2380 2381// WaitForShutdown blocks until the main listener and peer handlers are stopped. 2382func (s *server) WaitForShutdown() { 2383 s.wg.Wait() 2384} 2385 2386// ScheduleShutdown schedules a server shutdown after the specified duration. 2387// It also dynamically adjusts how often to warn the server is going down based 2388// on remaining duration. 2389func (s *server) ScheduleShutdown(duration time.Duration) { 2390 // Don't schedule shutdown more than once. 2391 if atomic.AddInt32(&s.shutdownSched, 1) != 1 { 2392 return 2393 } 2394 srvrLog.Warnf("Server shutdown in %v", duration) 2395 go func() { 2396 remaining := duration 2397 tickDuration := dynamicTickDuration(remaining) 2398 done := time.After(remaining) 2399 ticker := time.NewTicker(tickDuration) 2400 out: 2401 for { 2402 select { 2403 case <-done: 2404 ticker.Stop() 2405 s.Stop() 2406 break out 2407 case <-ticker.C: 2408 remaining = remaining - tickDuration 2409 if remaining < time.Second { 2410 continue 2411 } 2412 2413 // Change tick duration dynamically based on remaining time. 2414 newDuration := dynamicTickDuration(remaining) 2415 if tickDuration != newDuration { 2416 tickDuration = newDuration 2417 ticker.Stop() 2418 ticker = time.NewTicker(tickDuration) 2419 } 2420 srvrLog.Warnf("Server shutdown in %v", remaining) 2421 } 2422 } 2423 }() 2424} 2425 2426// parseListeners determines whether each listen address is IPv4 and IPv6 and 2427// returns a slice of appropriate net.Addrs to listen on with TCP. It also 2428// properly detects addresses which apply to "all interfaces" and adds the 2429// address as both IPv4 and IPv6. 2430func parseListeners(addrs []string) ([]net.Addr, error) { 2431 netAddrs := make([]net.Addr, 0, len(addrs)*2) 2432 for _, addr := range addrs { 2433 host, _, err := net.SplitHostPort(addr) 2434 if err != nil { 2435 // Shouldn't happen due to already being normalized. 2436 return nil, err 2437 } 2438 2439 // Empty host or host of * on plan9 is both IPv4 and IPv6. 2440 if host == "" || (host == "*" && runtime.GOOS == "plan9") { 2441 netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}) 2442 netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr}) 2443 continue 2444 } 2445 2446 // Strip IPv6 zone id if present since net.ParseIP does not 2447 // handle it. 2448 zoneIndex := strings.LastIndex(host, "%") 2449 if zoneIndex > 0 { 2450 host = host[:zoneIndex] 2451 } 2452 2453 // Parse the IP. 2454 ip := net.ParseIP(host) 2455 if ip == nil { 2456 return nil, fmt.Errorf("'%s' is not a valid IP address", host) 2457 } 2458 2459 // To4 returns nil when the IP is not an IPv4 address, so use 2460 // this determine the address type. 2461 if ip.To4() == nil { 2462 netAddrs = append(netAddrs, simpleAddr{net: "tcp6", addr: addr}) 2463 } else { 2464 netAddrs = append(netAddrs, simpleAddr{net: "tcp4", addr: addr}) 2465 } 2466 } 2467 return netAddrs, nil 2468} 2469 2470func (s *server) upnpUpdateThread() { 2471 // Go off immediately to prevent code duplication, thereafter we renew 2472 // lease every 15 minutes. 2473 timer := time.NewTimer(0 * time.Second) 2474 lport, _ := strconv.ParseInt(activeNetParams.DefaultPort, 10, 16) 2475 first := true 2476out: 2477 for { 2478 select { 2479 case <-timer.C: 2480 // TODO: pick external port more cleverly 2481 // TODO: know which ports we are listening to on an external net. 2482 // TODO: if specific listen port doesn't work then ask for wildcard 2483 // listen port? 2484 // XXX this assumes timeout is in seconds. 2485 listenPort, err := s.nat.AddPortMapping("tcp", int(lport), int(lport), 2486 "btcd listen port", 20*60) 2487 if err != nil { 2488 srvrLog.Warnf("can't add UPnP port mapping: %v", err) 2489 } 2490 if first && err == nil { 2491 // TODO: look this up periodically to see if upnp domain changed 2492 // and so did ip. 2493 externalip, err := s.nat.GetExternalAddress() 2494 if err != nil { 2495 srvrLog.Warnf("UPnP can't get external address: %v", err) 2496 continue out 2497 } 2498 na := wire.NewNetAddressIPPort(externalip, uint16(listenPort), 2499 s.services) 2500 err = s.addrManager.AddLocalAddress(na, addrmgr.UpnpPrio) 2501 if err != nil { 2502 // XXX DeletePortMapping? 2503 } 2504 srvrLog.Warnf("Successfully bound via UPnP to %s", addrmgr.NetAddressKey(na)) 2505 first = false 2506 } 2507 timer.Reset(time.Minute * 15) 2508 case <-s.quit: 2509 break out 2510 } 2511 } 2512 2513 timer.Stop() 2514 2515 if err := s.nat.DeletePortMapping("tcp", int(lport), int(lport)); err != nil { 2516 srvrLog.Warnf("unable to remove UPnP port mapping: %v", err) 2517 } else { 2518 srvrLog.Debugf("successfully disestablished UPnP port mapping") 2519 } 2520 2521 s.wg.Done() 2522} 2523 2524// setupRPCListeners returns a slice of listeners that are configured for use 2525// with the RPC server depending on the configuration settings for listen 2526// addresses and TLS. 2527func setupRPCListeners() ([]net.Listener, error) { 2528 // Setup TLS if not disabled. 2529 listenFunc := net.Listen 2530 if !cfg.DisableTLS { 2531 // Generate the TLS cert and key file if both don't already 2532 // exist. 2533 if !fileExists(cfg.RPCKey) && !fileExists(cfg.RPCCert) { 2534 err := genCertPair(cfg.RPCCert, cfg.RPCKey) 2535 if err != nil { 2536 return nil, err 2537 } 2538 } 2539 keypair, err := tls.LoadX509KeyPair(cfg.RPCCert, cfg.RPCKey) 2540 if err != nil { 2541 return nil, err 2542 } 2543 2544 tlsConfig := tls.Config{ 2545 Certificates: []tls.Certificate{keypair}, 2546 MinVersion: tls.VersionTLS12, 2547 } 2548 2549 // Change the standard net.Listen function to the tls one. 2550 listenFunc = func(net string, laddr string) (net.Listener, error) { 2551 return tls.Listen(net, laddr, &tlsConfig) 2552 } 2553 } 2554 2555 netAddrs, err := parseListeners(cfg.RPCListeners) 2556 if err != nil { 2557 return nil, err 2558 } 2559 2560 listeners := make([]net.Listener, 0, len(netAddrs)) 2561 for _, addr := range netAddrs { 2562 listener, err := listenFunc(addr.Network(), addr.String()) 2563 if err != nil { 2564 rpcsLog.Warnf("Can't listen on %s: %v", addr, err) 2565 continue 2566 } 2567 listeners = append(listeners, listener) 2568 } 2569 2570 return listeners, nil 2571} 2572 2573// newServer returns a new btcd server configured to listen on addr for the 2574// bitcoin network type specified by chainParams. Use start to begin accepting 2575// connections from peers. 2576func newServer(listenAddrs, agentBlacklist, agentWhitelist []string, 2577 db database.DB, chainParams *chaincfg.Params, 2578 interrupt <-chan struct{}) (*server, error) { 2579 2580 services := defaultServices 2581 if cfg.NoPeerBloomFilters { 2582 services &^= wire.SFNodeBloom 2583 } 2584 if cfg.NoCFilters { 2585 services &^= wire.SFNodeCF 2586 } 2587 2588 amgr := addrmgr.New(cfg.DataDir, btcdLookup) 2589 2590 var listeners []net.Listener 2591 var nat NAT 2592 if !cfg.DisableListen { 2593 var err error 2594 listeners, nat, err = initListeners(amgr, listenAddrs, services) 2595 if err != nil { 2596 return nil, err 2597 } 2598 if len(listeners) == 0 { 2599 return nil, errors.New("no valid listen address") 2600 } 2601 } 2602 2603 if len(agentBlacklist) > 0 { 2604 srvrLog.Infof("User-agent blacklist %s", agentBlacklist) 2605 } 2606 if len(agentWhitelist) > 0 { 2607 srvrLog.Infof("User-agent whitelist %s", agentWhitelist) 2608 } 2609 2610 s := server{ 2611 chainParams: chainParams, 2612 addrManager: amgr, 2613 newPeers: make(chan *serverPeer, cfg.MaxPeers), 2614 donePeers: make(chan *serverPeer, cfg.MaxPeers), 2615 banPeers: make(chan *serverPeer, cfg.MaxPeers), 2616 query: make(chan interface{}), 2617 relayInv: make(chan relayMsg, cfg.MaxPeers), 2618 broadcast: make(chan broadcastMsg, cfg.MaxPeers), 2619 quit: make(chan struct{}), 2620 modifyRebroadcastInv: make(chan interface{}), 2621 peerHeightsUpdate: make(chan updatePeerHeightsMsg), 2622 nat: nat, 2623 db: db, 2624 timeSource: blockchain.NewMedianTime(), 2625 services: services, 2626 sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), 2627 hashCache: txscript.NewHashCache(cfg.SigCacheMaxSize), 2628 cfCheckptCaches: make(map[wire.FilterType][]cfHeaderKV), 2629 agentBlacklist: agentBlacklist, 2630 agentWhitelist: agentWhitelist, 2631 } 2632 2633 // Create the transaction and address indexes if needed. 2634 // 2635 // CAUTION: the txindex needs to be first in the indexes array because 2636 // the addrindex uses data from the txindex during catchup. If the 2637 // addrindex is run first, it may not have the transactions from the 2638 // current block indexed. 2639 var indexes []indexers.Indexer 2640 if cfg.TxIndex || cfg.AddrIndex { 2641 // Enable transaction index if address index is enabled since it 2642 // requires it. 2643 if !cfg.TxIndex { 2644 indxLog.Infof("Transaction index enabled because it " + 2645 "is required by the address index") 2646 cfg.TxIndex = true 2647 } else { 2648 indxLog.Info("Transaction index is enabled") 2649 } 2650 2651 s.txIndex = indexers.NewTxIndex(db) 2652 indexes = append(indexes, s.txIndex) 2653 } 2654 if cfg.AddrIndex { 2655 indxLog.Info("Address index is enabled") 2656 s.addrIndex = indexers.NewAddrIndex(db, chainParams) 2657 indexes = append(indexes, s.addrIndex) 2658 } 2659 if !cfg.NoCFilters { 2660 indxLog.Info("Committed filter index is enabled") 2661 s.cfIndex = indexers.NewCfIndex(db, chainParams) 2662 indexes = append(indexes, s.cfIndex) 2663 } 2664 2665 // Create an index manager if any of the optional indexes are enabled. 2666 var indexManager blockchain.IndexManager 2667 if len(indexes) > 0 { 2668 indexManager = indexers.NewManager(db, indexes) 2669 } 2670 2671 // Merge given checkpoints with the default ones unless they are disabled. 2672 var checkpoints []chaincfg.Checkpoint 2673 if !cfg.DisableCheckpoints { 2674 checkpoints = mergeCheckpoints(s.chainParams.Checkpoints, cfg.addCheckpoints) 2675 } 2676 2677 // Create a new block chain instance with the appropriate configuration. 2678 var err error 2679 s.chain, err = blockchain.New(&blockchain.Config{ 2680 DB: s.db, 2681 Interrupt: interrupt, 2682 ChainParams: s.chainParams, 2683 Checkpoints: checkpoints, 2684 TimeSource: s.timeSource, 2685 SigCache: s.sigCache, 2686 IndexManager: indexManager, 2687 HashCache: s.hashCache, 2688 }) 2689 if err != nil { 2690 return nil, err 2691 } 2692 2693 // Search for a FeeEstimator state in the database. If none can be found 2694 // or if it cannot be loaded, create a new one. 2695 db.Update(func(tx database.Tx) error { 2696 metadata := tx.Metadata() 2697 feeEstimationData := metadata.Get(mempool.EstimateFeeDatabaseKey) 2698 if feeEstimationData != nil { 2699 // delete it from the database so that we don't try to restore the 2700 // same thing again somehow. 2701 metadata.Delete(mempool.EstimateFeeDatabaseKey) 2702 2703 // If there is an error, log it and make a new fee estimator. 2704 var err error 2705 s.feeEstimator, err = mempool.RestoreFeeEstimator(feeEstimationData) 2706 2707 if err != nil { 2708 peerLog.Errorf("Failed to restore fee estimator %v", err) 2709 } 2710 } 2711 2712 return nil 2713 }) 2714 2715 // If no feeEstimator has been found, or if the one that has been found 2716 // is behind somehow, create a new one and start over. 2717 if s.feeEstimator == nil || s.feeEstimator.LastKnownHeight() != s.chain.BestSnapshot().Height { 2718 s.feeEstimator = mempool.NewFeeEstimator( 2719 mempool.DefaultEstimateFeeMaxRollback, 2720 mempool.DefaultEstimateFeeMinRegisteredBlocks) 2721 } 2722 2723 txC := mempool.Config{ 2724 Policy: mempool.Policy{ 2725 DisableRelayPriority: cfg.NoRelayPriority, 2726 AcceptNonStd: cfg.RelayNonStd, 2727 FreeTxRelayLimit: cfg.FreeTxRelayLimit, 2728 MaxOrphanTxs: cfg.MaxOrphanTxs, 2729 MaxOrphanTxSize: defaultMaxOrphanTxSize, 2730 MaxSigOpCostPerTx: blockchain.MaxBlockSigOpsCost / 4, 2731 MinRelayTxFee: cfg.minRelayTxFee, 2732 MaxTxVersion: 2, 2733 RejectReplacement: cfg.RejectReplacement, 2734 }, 2735 ChainParams: chainParams, 2736 FetchUtxoView: s.chain.FetchUtxoView, 2737 BestHeight: func() int32 { return s.chain.BestSnapshot().Height }, 2738 MedianTimePast: func() time.Time { return s.chain.BestSnapshot().MedianTime }, 2739 CalcSequenceLock: func(tx *btcutil.Tx, view *blockchain.UtxoViewpoint) (*blockchain.SequenceLock, error) { 2740 return s.chain.CalcSequenceLock(tx, view, true) 2741 }, 2742 IsDeploymentActive: s.chain.IsDeploymentActive, 2743 SigCache: s.sigCache, 2744 HashCache: s.hashCache, 2745 AddrIndex: s.addrIndex, 2746 FeeEstimator: s.feeEstimator, 2747 } 2748 s.txMemPool = mempool.New(&txC) 2749 2750 s.syncManager, err = netsync.New(&netsync.Config{ 2751 PeerNotifier: &s, 2752 Chain: s.chain, 2753 TxMemPool: s.txMemPool, 2754 ChainParams: s.chainParams, 2755 DisableCheckpoints: cfg.DisableCheckpoints, 2756 MaxPeers: cfg.MaxPeers, 2757 FeeEstimator: s.feeEstimator, 2758 }) 2759 if err != nil { 2760 return nil, err 2761 } 2762 2763 // Create the mining policy and block template generator based on the 2764 // configuration options. 2765 // 2766 // NOTE: The CPU miner relies on the mempool, so the mempool has to be 2767 // created before calling the function to create the CPU miner. 2768 policy := mining.Policy{ 2769 BlockMinWeight: cfg.BlockMinWeight, 2770 BlockMaxWeight: cfg.BlockMaxWeight, 2771 BlockMinSize: cfg.BlockMinSize, 2772 BlockMaxSize: cfg.BlockMaxSize, 2773 BlockPrioritySize: cfg.BlockPrioritySize, 2774 TxMinFreeFee: cfg.minRelayTxFee, 2775 } 2776 blockTemplateGenerator := mining.NewBlkTmplGenerator(&policy, 2777 s.chainParams, s.txMemPool, s.chain, s.timeSource, 2778 s.sigCache, s.hashCache) 2779 s.cpuMiner = cpuminer.New(&cpuminer.Config{ 2780 ChainParams: chainParams, 2781 BlockTemplateGenerator: blockTemplateGenerator, 2782 MiningAddrs: cfg.miningAddrs, 2783 ProcessBlock: s.syncManager.ProcessBlock, 2784 ConnectedCount: s.ConnectedCount, 2785 IsCurrent: s.syncManager.IsCurrent, 2786 }) 2787 2788 // Only setup a function to return new addresses to connect to when 2789 // not running in connect-only mode. The simulation network is always 2790 // in connect-only mode since it is only intended to connect to 2791 // specified peers and actively avoid advertising and connecting to 2792 // discovered peers in order to prevent it from becoming a public test 2793 // network. 2794 var newAddressFunc func() (net.Addr, error) 2795 if !cfg.SimNet && len(cfg.ConnectPeers) == 0 { 2796 newAddressFunc = func() (net.Addr, error) { 2797 for tries := 0; tries < 100; tries++ { 2798 addr := s.addrManager.GetAddress() 2799 if addr == nil { 2800 break 2801 } 2802 2803 // Address will not be invalid, local or unroutable 2804 // because addrmanager rejects those on addition. 2805 // Just check that we don't already have an address 2806 // in the same group so that we are not connecting 2807 // to the same network segment at the expense of 2808 // others. 2809 key := addrmgr.GroupKey(addr.NetAddress()) 2810 if s.OutboundGroupCount(key) != 0 { 2811 continue 2812 } 2813 2814 // only allow recent nodes (10mins) after we failed 30 2815 // times 2816 if tries < 30 && time.Since(addr.LastAttempt()) < 10*time.Minute { 2817 continue 2818 } 2819 2820 // allow nondefault ports after 50 failed tries. 2821 if tries < 50 && fmt.Sprintf("%d", addr.NetAddress().Port) != 2822 activeNetParams.DefaultPort { 2823 continue 2824 } 2825 2826 // Mark an attempt for the valid address. 2827 s.addrManager.Attempt(addr.NetAddress()) 2828 2829 addrString := addrmgr.NetAddressKey(addr.NetAddress()) 2830 return addrStringToNetAddr(addrString) 2831 } 2832 2833 return nil, errors.New("no valid connect address") 2834 } 2835 } 2836 2837 // Create a connection manager. 2838 targetOutbound := defaultTargetOutbound 2839 if cfg.MaxPeers < targetOutbound { 2840 targetOutbound = cfg.MaxPeers 2841 } 2842 cmgr, err := connmgr.New(&connmgr.Config{ 2843 Listeners: listeners, 2844 OnAccept: s.inboundPeerConnected, 2845 RetryDuration: connectionRetryInterval, 2846 TargetOutbound: uint32(targetOutbound), 2847 Dial: btcdDial, 2848 OnConnection: s.outboundPeerConnected, 2849 GetNewAddress: newAddressFunc, 2850 }) 2851 if err != nil { 2852 return nil, err 2853 } 2854 s.connManager = cmgr 2855 2856 // Start up persistent peers. 2857 permanentPeers := cfg.ConnectPeers 2858 if len(permanentPeers) == 0 { 2859 permanentPeers = cfg.AddPeers 2860 } 2861 for _, addr := range permanentPeers { 2862 netAddr, err := addrStringToNetAddr(addr) 2863 if err != nil { 2864 return nil, err 2865 } 2866 2867 go s.connManager.Connect(&connmgr.ConnReq{ 2868 Addr: netAddr, 2869 Permanent: true, 2870 }) 2871 } 2872 2873 if !cfg.DisableRPC { 2874 // Setup listeners for the configured RPC listen addresses and 2875 // TLS settings. 2876 rpcListeners, err := setupRPCListeners() 2877 if err != nil { 2878 return nil, err 2879 } 2880 if len(rpcListeners) == 0 { 2881 return nil, errors.New("RPCS: No valid listen address") 2882 } 2883 2884 s.rpcServer, err = newRPCServer(&rpcserverConfig{ 2885 Listeners: rpcListeners, 2886 StartupTime: s.startupTime, 2887 ConnMgr: &rpcConnManager{&s}, 2888 SyncMgr: &rpcSyncMgr{&s, s.syncManager}, 2889 TimeSource: s.timeSource, 2890 Chain: s.chain, 2891 ChainParams: chainParams, 2892 DB: db, 2893 TxMemPool: s.txMemPool, 2894 Generator: blockTemplateGenerator, 2895 CPUMiner: s.cpuMiner, 2896 TxIndex: s.txIndex, 2897 AddrIndex: s.addrIndex, 2898 CfIndex: s.cfIndex, 2899 FeeEstimator: s.feeEstimator, 2900 }) 2901 if err != nil { 2902 return nil, err 2903 } 2904 2905 // Signal process shutdown when the RPC server requests it. 2906 go func() { 2907 <-s.rpcServer.RequestedProcessShutdown() 2908 shutdownRequestChannel <- struct{}{} 2909 }() 2910 } 2911 2912 return &s, nil 2913} 2914 2915// initListeners initializes the configured net listeners and adds any bound 2916// addresses to the address manager. Returns the listeners and a NAT interface, 2917// which is non-nil if UPnP is in use. 2918func initListeners(amgr *addrmgr.AddrManager, listenAddrs []string, services wire.ServiceFlag) ([]net.Listener, NAT, error) { 2919 // Listen for TCP connections at the configured addresses 2920 netAddrs, err := parseListeners(listenAddrs) 2921 if err != nil { 2922 return nil, nil, err 2923 } 2924 2925 listeners := make([]net.Listener, 0, len(netAddrs)) 2926 for _, addr := range netAddrs { 2927 listener, err := net.Listen(addr.Network(), addr.String()) 2928 if err != nil { 2929 srvrLog.Warnf("Can't listen on %s: %v", addr, err) 2930 continue 2931 } 2932 listeners = append(listeners, listener) 2933 } 2934 2935 var nat NAT 2936 if len(cfg.ExternalIPs) != 0 { 2937 defaultPort, err := strconv.ParseUint(activeNetParams.DefaultPort, 10, 16) 2938 if err != nil { 2939 srvrLog.Errorf("Can not parse default port %s for active chain: %v", 2940 activeNetParams.DefaultPort, err) 2941 return nil, nil, err 2942 } 2943 2944 for _, sip := range cfg.ExternalIPs { 2945 eport := uint16(defaultPort) 2946 host, portstr, err := net.SplitHostPort(sip) 2947 if err != nil { 2948 // no port, use default. 2949 host = sip 2950 } else { 2951 port, err := strconv.ParseUint(portstr, 10, 16) 2952 if err != nil { 2953 srvrLog.Warnf("Can not parse port from %s for "+ 2954 "externalip: %v", sip, err) 2955 continue 2956 } 2957 eport = uint16(port) 2958 } 2959 na, err := amgr.HostToNetAddress(host, eport, services) 2960 if err != nil { 2961 srvrLog.Warnf("Not adding %s as externalip: %v", sip, err) 2962 continue 2963 } 2964 2965 err = amgr.AddLocalAddress(na, addrmgr.ManualPrio) 2966 if err != nil { 2967 amgrLog.Warnf("Skipping specified external IP: %v", err) 2968 } 2969 } 2970 } else { 2971 if cfg.Upnp { 2972 var err error 2973 nat, err = Discover() 2974 if err != nil { 2975 srvrLog.Warnf("Can't discover upnp: %v", err) 2976 } 2977 // nil nat here is fine, just means no upnp on network. 2978 } 2979 2980 // Add bound addresses to address manager to be advertised to peers. 2981 for _, listener := range listeners { 2982 addr := listener.Addr().String() 2983 err := addLocalAddress(amgr, addr, services) 2984 if err != nil { 2985 amgrLog.Warnf("Skipping bound address %s: %v", addr, err) 2986 } 2987 } 2988 } 2989 2990 return listeners, nat, nil 2991} 2992 2993// addrStringToNetAddr takes an address in the form of 'host:port' and returns 2994// a net.Addr which maps to the original address with any host names resolved 2995// to IP addresses. It also handles tor addresses properly by returning a 2996// net.Addr that encapsulates the address. 2997func addrStringToNetAddr(addr string) (net.Addr, error) { 2998 host, strPort, err := net.SplitHostPort(addr) 2999 if err != nil { 3000 return nil, err 3001 } 3002 3003 port, err := strconv.Atoi(strPort) 3004 if err != nil { 3005 return nil, err 3006 } 3007 3008 // Skip if host is already an IP address. 3009 if ip := net.ParseIP(host); ip != nil { 3010 return &net.TCPAddr{ 3011 IP: ip, 3012 Port: port, 3013 }, nil 3014 } 3015 3016 // Tor addresses cannot be resolved to an IP, so just return an onion 3017 // address instead. 3018 if strings.HasSuffix(host, ".onion") { 3019 if cfg.NoOnion { 3020 return nil, errors.New("tor has been disabled") 3021 } 3022 3023 return &onionAddr{addr: addr}, nil 3024 } 3025 3026 // Attempt to look up an IP address associated with the parsed host. 3027 ips, err := btcdLookup(host) 3028 if err != nil { 3029 return nil, err 3030 } 3031 if len(ips) == 0 { 3032 return nil, fmt.Errorf("no addresses found for %s", host) 3033 } 3034 3035 return &net.TCPAddr{ 3036 IP: ips[0], 3037 Port: port, 3038 }, nil 3039} 3040 3041// addLocalAddress adds an address that this node is listening on to the 3042// address manager so that it may be relayed to peers. 3043func addLocalAddress(addrMgr *addrmgr.AddrManager, addr string, services wire.ServiceFlag) error { 3044 host, portStr, err := net.SplitHostPort(addr) 3045 if err != nil { 3046 return err 3047 } 3048 port, err := strconv.ParseUint(portStr, 10, 16) 3049 if err != nil { 3050 return err 3051 } 3052 3053 if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() { 3054 // If bound to unspecified address, advertise all local interfaces 3055 addrs, err := net.InterfaceAddrs() 3056 if err != nil { 3057 return err 3058 } 3059 3060 for _, addr := range addrs { 3061 ifaceIP, _, err := net.ParseCIDR(addr.String()) 3062 if err != nil { 3063 continue 3064 } 3065 3066 // If bound to 0.0.0.0, do not add IPv6 interfaces and if bound to 3067 // ::, do not add IPv4 interfaces. 3068 if (ip.To4() == nil) != (ifaceIP.To4() == nil) { 3069 continue 3070 } 3071 3072 netAddr := wire.NewNetAddressIPPort(ifaceIP, uint16(port), services) 3073 addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio) 3074 } 3075 } else { 3076 netAddr, err := addrMgr.HostToNetAddress(host, uint16(port), services) 3077 if err != nil { 3078 return err 3079 } 3080 3081 addrMgr.AddLocalAddress(netAddr, addrmgr.BoundPrio) 3082 } 3083 3084 return nil 3085} 3086 3087// dynamicTickDuration is a convenience function used to dynamically choose a 3088// tick duration based on remaining time. It is primarily used during 3089// server shutdown to make shutdown warnings more frequent as the shutdown time 3090// approaches. 3091func dynamicTickDuration(remaining time.Duration) time.Duration { 3092 switch { 3093 case remaining <= time.Second*5: 3094 return time.Second 3095 case remaining <= time.Second*15: 3096 return time.Second * 5 3097 case remaining <= time.Minute: 3098 return time.Second * 15 3099 case remaining <= time.Minute*5: 3100 return time.Minute 3101 case remaining <= time.Minute*15: 3102 return time.Minute * 5 3103 case remaining <= time.Hour: 3104 return time.Minute * 15 3105 } 3106 return time.Hour 3107} 3108 3109// isWhitelisted returns whether the IP address is included in the whitelisted 3110// networks and IPs. 3111func isWhitelisted(addr net.Addr) bool { 3112 if len(cfg.whitelists) == 0 { 3113 return false 3114 } 3115 3116 host, _, err := net.SplitHostPort(addr.String()) 3117 if err != nil { 3118 srvrLog.Warnf("Unable to SplitHostPort on '%s': %v", addr, err) 3119 return false 3120 } 3121 ip := net.ParseIP(host) 3122 if ip == nil { 3123 srvrLog.Warnf("Unable to parse IP '%s'", addr) 3124 return false 3125 } 3126 3127 for _, ipnet := range cfg.whitelists { 3128 if ipnet.Contains(ip) { 3129 return true 3130 } 3131 } 3132 return false 3133} 3134 3135// checkpointSorter implements sort.Interface to allow a slice of checkpoints to 3136// be sorted. 3137type checkpointSorter []chaincfg.Checkpoint 3138 3139// Len returns the number of checkpoints in the slice. It is part of the 3140// sort.Interface implementation. 3141func (s checkpointSorter) Len() int { 3142 return len(s) 3143} 3144 3145// Swap swaps the checkpoints at the passed indices. It is part of the 3146// sort.Interface implementation. 3147func (s checkpointSorter) Swap(i, j int) { 3148 s[i], s[j] = s[j], s[i] 3149} 3150 3151// Less returns whether the checkpoint with index i should sort before the 3152// checkpoint with index j. It is part of the sort.Interface implementation. 3153func (s checkpointSorter) Less(i, j int) bool { 3154 return s[i].Height < s[j].Height 3155} 3156 3157// mergeCheckpoints returns two slices of checkpoints merged into one slice 3158// such that the checkpoints are sorted by height. In the case the additional 3159// checkpoints contain a checkpoint with the same height as a checkpoint in the 3160// default checkpoints, the additional checkpoint will take precedence and 3161// overwrite the default one. 3162func mergeCheckpoints(defaultCheckpoints, additional []chaincfg.Checkpoint) []chaincfg.Checkpoint { 3163 // Create a map of the additional checkpoints to remove duplicates while 3164 // leaving the most recently-specified checkpoint. 3165 extra := make(map[int32]chaincfg.Checkpoint) 3166 for _, checkpoint := range additional { 3167 extra[checkpoint.Height] = checkpoint 3168 } 3169 3170 // Add all default checkpoints that do not have an override in the 3171 // additional checkpoints. 3172 numDefault := len(defaultCheckpoints) 3173 checkpoints := make([]chaincfg.Checkpoint, 0, numDefault+len(extra)) 3174 for _, checkpoint := range defaultCheckpoints { 3175 if _, exists := extra[checkpoint.Height]; !exists { 3176 checkpoints = append(checkpoints, checkpoint) 3177 } 3178 } 3179 3180 // Append the additional checkpoints and return the sorted results. 3181 for _, checkpoint := range extra { 3182 checkpoints = append(checkpoints, checkpoint) 3183 } 3184 sort.Sort(checkpointSorter(checkpoints)) 3185 return checkpoints 3186} 3187 3188// HasUndesiredUserAgent determines whether the server should continue to pursue 3189// a connection with this peer based on its advertised user agent. It performs 3190// the following steps: 3191// 1) Reject the peer if it contains a blacklisted agent. 3192// 2) If no whitelist is provided, accept all user agents. 3193// 3) Accept the peer if it contains a whitelisted agent. 3194// 4) Reject all other peers. 3195func (sp *serverPeer) HasUndesiredUserAgent(blacklistedAgents, 3196 whitelistedAgents []string) bool { 3197 3198 agent := sp.UserAgent() 3199 3200 // First, if peer's user agent contains any blacklisted substring, we 3201 // will ignore the connection request. 3202 for _, blacklistedAgent := range blacklistedAgents { 3203 if strings.Contains(agent, blacklistedAgent) { 3204 srvrLog.Debugf("Ignoring peer %s, user agent "+ 3205 "contains blacklisted user agent: %s", sp, 3206 agent) 3207 return true 3208 } 3209 } 3210 3211 // If no whitelist is provided, we will accept all user agents. 3212 if len(whitelistedAgents) == 0 { 3213 return false 3214 } 3215 3216 // Peer's user agent passed blacklist. Now check to see if it contains 3217 // one of our whitelisted user agents, if so accept. 3218 for _, whitelistedAgent := range whitelistedAgents { 3219 if strings.Contains(agent, whitelistedAgent) { 3220 return false 3221 } 3222 } 3223 3224 // Otherwise, the peer's user agent was not included in our whitelist. 3225 // Ignore just in case it could stall the initial block download. 3226 srvrLog.Debugf("Ignoring peer %s, user agent: %s not found in "+ 3227 "whitelist", sp, agent) 3228 3229 return true 3230} 3231