1// Copyright 2020 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17package eth 18 19import ( 20 "math/big" 21 "math/rand" 22 "sync" 23 24 mapset "github.com/deckarep/golang-set" 25 "github.com/ethereum/go-ethereum/common" 26 "github.com/ethereum/go-ethereum/core/types" 27 "github.com/ethereum/go-ethereum/p2p" 28 "github.com/ethereum/go-ethereum/rlp" 29) 30 31const ( 32 // maxKnownTxs is the maximum transactions hashes to keep in the known list 33 // before starting to randomly evict them. 34 maxKnownTxs = 32768 35 36 // maxKnownBlocks is the maximum block hashes to keep in the known list 37 // before starting to randomly evict them. 38 maxKnownBlocks = 1024 39 40 // maxQueuedTxs is the maximum number of transactions to queue up before dropping 41 // older broadcasts. 42 maxQueuedTxs = 4096 43 44 // maxQueuedTxAnns is the maximum number of transaction announcements to queue up 45 // before dropping older announcements. 46 maxQueuedTxAnns = 4096 47 48 // maxQueuedBlocks is the maximum number of block propagations to queue up before 49 // dropping broadcasts. There's not much point in queueing stale blocks, so a few 50 // that might cover uncles should be enough. 51 maxQueuedBlocks = 4 52 53 // maxQueuedBlockAnns is the maximum number of block announcements to queue up before 54 // dropping broadcasts. Similarly to block propagations, there's no point to queue 55 // above some healthy uncle limit, so use that. 56 maxQueuedBlockAnns = 4 57) 58 59// max is a helper function which returns the larger of the two given integers. 60func max(a, b int) int { 61 if a > b { 62 return a 63 } 64 return b 65} 66 67// Peer is a collection of relevant information we have about a `eth` peer. 68type Peer struct { 69 id string // Unique ID for the peer, cached 70 71 *p2p.Peer // The embedded P2P package peer 72 rw p2p.MsgReadWriter // Input/output streams for snap 73 version uint // Protocol version negotiated 74 75 head common.Hash // Latest advertised head block hash 76 td *big.Int // Latest advertised head block total difficulty 77 78 knownBlocks *knownCache // Set of block hashes known to be known by this peer 79 queuedBlocks chan *blockPropagation // Queue of blocks to broadcast to the peer 80 queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer 81 82 txpool TxPool // Transaction pool used by the broadcasters for liveness checks 83 knownTxs *knownCache // Set of transaction hashes known to be known by this peer 84 txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests 85 txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests 86 87 reqDispatch chan *request // Dispatch channel to send requests and track then until fulfilment 88 reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them 89 resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them 90 91 term chan struct{} // Termination channel to stop the broadcasters 92 lock sync.RWMutex // Mutex protecting the internal fields 93} 94 95// NewPeer create a wrapper for a network connection and negotiated protocol 96// version. 97func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { 98 peer := &Peer{ 99 id: p.ID().String(), 100 Peer: p, 101 rw: rw, 102 version: version, 103 knownTxs: newKnownCache(maxKnownTxs), 104 knownBlocks: newKnownCache(maxKnownBlocks), 105 queuedBlocks: make(chan *blockPropagation, maxQueuedBlocks), 106 queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), 107 txBroadcast: make(chan []common.Hash), 108 txAnnounce: make(chan []common.Hash), 109 reqDispatch: make(chan *request), 110 reqCancel: make(chan *cancel), 111 resDispatch: make(chan *response), 112 txpool: txpool, 113 term: make(chan struct{}), 114 } 115 // Start up all the broadcasters 116 go peer.broadcastBlocks() 117 go peer.broadcastTransactions() 118 go peer.announceTransactions() 119 go peer.dispatcher() 120 121 return peer 122} 123 124// Close signals the broadcast goroutine to terminate. Only ever call this if 125// you created the peer yourself via NewPeer. Otherwise let whoever created it 126// clean it up! 127func (p *Peer) Close() { 128 close(p.term) 129} 130 131// ID retrieves the peer's unique identifier. 132func (p *Peer) ID() string { 133 return p.id 134} 135 136// Version retrieves the peer's negoatiated `eth` protocol version. 137func (p *Peer) Version() uint { 138 return p.version 139} 140 141// Head retrieves the current head hash and total difficulty of the peer. 142func (p *Peer) Head() (hash common.Hash, td *big.Int) { 143 p.lock.RLock() 144 defer p.lock.RUnlock() 145 146 copy(hash[:], p.head[:]) 147 return hash, new(big.Int).Set(p.td) 148} 149 150// SetHead updates the head hash and total difficulty of the peer. 151func (p *Peer) SetHead(hash common.Hash, td *big.Int) { 152 p.lock.Lock() 153 defer p.lock.Unlock() 154 155 copy(p.head[:], hash[:]) 156 p.td.Set(td) 157} 158 159// KnownBlock returns whether peer is known to already have a block. 160func (p *Peer) KnownBlock(hash common.Hash) bool { 161 return p.knownBlocks.Contains(hash) 162} 163 164// KnownTransaction returns whether peer is known to already have a transaction. 165func (p *Peer) KnownTransaction(hash common.Hash) bool { 166 return p.knownTxs.Contains(hash) 167} 168 169// markBlock marks a block as known for the peer, ensuring that the block will 170// never be propagated to this particular peer. 171func (p *Peer) markBlock(hash common.Hash) { 172 // If we reached the memory allowance, drop a previously known block hash 173 p.knownBlocks.Add(hash) 174} 175 176// markTransaction marks a transaction as known for the peer, ensuring that it 177// will never be propagated to this particular peer. 178func (p *Peer) markTransaction(hash common.Hash) { 179 // If we reached the memory allowance, drop a previously known transaction hash 180 p.knownTxs.Add(hash) 181} 182 183// SendTransactions sends transactions to the peer and includes the hashes 184// in its transaction hash set for future reference. 185// 186// This method is a helper used by the async transaction sender. Don't call it 187// directly as the queueing (memory) and transmission (bandwidth) costs should 188// not be managed directly. 189// 190// The reasons this is public is to allow packages using this protocol to write 191// tests that directly send messages without having to do the asyn queueing. 192func (p *Peer) SendTransactions(txs types.Transactions) error { 193 // Mark all the transactions as known, but ensure we don't overflow our limits 194 for _, tx := range txs { 195 p.knownTxs.Add(tx.Hash()) 196 } 197 return p2p.Send(p.rw, TransactionsMsg, txs) 198} 199 200// AsyncSendTransactions queues a list of transactions (by hash) to eventually 201// propagate to a remote peer. The number of pending sends are capped (new ones 202// will force old sends to be dropped) 203func (p *Peer) AsyncSendTransactions(hashes []common.Hash) { 204 select { 205 case p.txBroadcast <- hashes: 206 // Mark all the transactions as known, but ensure we don't overflow our limits 207 p.knownTxs.Add(hashes...) 208 case <-p.term: 209 p.Log().Debug("Dropping transaction propagation", "count", len(hashes)) 210 } 211} 212 213// sendPooledTransactionHashes sends transaction hashes to the peer and includes 214// them in its transaction hash set for future reference. 215// 216// This method is a helper used by the async transaction announcer. Don't call it 217// directly as the queueing (memory) and transmission (bandwidth) costs should 218// not be managed directly. 219func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error { 220 // Mark all the transactions as known, but ensure we don't overflow our limits 221 p.knownTxs.Add(hashes...) 222 return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes)) 223} 224 225// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually 226// announce to a remote peer. The number of pending sends are capped (new ones 227// will force old sends to be dropped) 228func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) { 229 select { 230 case p.txAnnounce <- hashes: 231 // Mark all the transactions as known, but ensure we don't overflow our limits 232 p.knownTxs.Add(hashes...) 233 case <-p.term: 234 p.Log().Debug("Dropping transaction announcement", "count", len(hashes)) 235 } 236} 237 238// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP. 239func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error { 240 // Mark all the transactions as known, but ensure we don't overflow our limits 241 p.knownTxs.Add(hashes...) 242 243 // Not packed into PooledTransactionsPacket to avoid RLP decoding 244 return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{ 245 RequestId: id, 246 PooledTransactionsRLPPacket: txs, 247 }) 248} 249 250// SendNewBlockHashes announces the availability of a number of blocks through 251// a hash notification. 252func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { 253 // Mark all the block hashes as known, but ensure we don't overflow our limits 254 p.knownBlocks.Add(hashes...) 255 256 request := make(NewBlockHashesPacket, len(hashes)) 257 for i := 0; i < len(hashes); i++ { 258 request[i].Hash = hashes[i] 259 request[i].Number = numbers[i] 260 } 261 return p2p.Send(p.rw, NewBlockHashesMsg, request) 262} 263 264// AsyncSendNewBlockHash queues the availability of a block for propagation to a 265// remote peer. If the peer's broadcast queue is full, the event is silently 266// dropped. 267func (p *Peer) AsyncSendNewBlockHash(block *types.Block) { 268 select { 269 case p.queuedBlockAnns <- block: 270 // Mark all the block hash as known, but ensure we don't overflow our limits 271 p.knownBlocks.Add(block.Hash()) 272 default: 273 p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) 274 } 275} 276 277// SendNewBlock propagates an entire block to a remote peer. 278func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error { 279 // Mark all the block hash as known, but ensure we don't overflow our limits 280 p.knownBlocks.Add(block.Hash()) 281 return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{ 282 Block: block, 283 TD: td, 284 }) 285} 286 287// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If 288// the peer's broadcast queue is full, the event is silently dropped. 289func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { 290 select { 291 case p.queuedBlocks <- &blockPropagation{block: block, td: td}: 292 // Mark all the block hash as known, but ensure we don't overflow our limits 293 p.knownBlocks.Add(block.Hash()) 294 default: 295 p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) 296 } 297} 298 299// ReplyBlockHeaders is the eth/66 version of SendBlockHeaders. 300func (p *Peer) ReplyBlockHeadersRLP(id uint64, headers []rlp.RawValue) error { 301 return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersRLPPacket66{ 302 RequestId: id, 303 BlockHeadersRLPPacket: headers, 304 }) 305} 306 307// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP. 308func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { 309 // Not packed into BlockBodiesPacket to avoid RLP decoding 310 return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{ 311 RequestId: id, 312 BlockBodiesRLPPacket: bodies, 313 }) 314} 315 316// ReplyNodeData is the eth/66 response to GetNodeData. 317func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { 318 return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{ 319 RequestId: id, 320 NodeDataPacket: data, 321 }) 322} 323 324// ReplyReceiptsRLP is the eth/66 response to GetReceipts. 325func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { 326 return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{ 327 RequestId: id, 328 ReceiptsRLPPacket: receipts, 329 }) 330} 331 332// RequestOneHeader is a wrapper around the header query functions to fetch a 333// single header. It is used solely by the fetcher. 334func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) { 335 p.Log().Debug("Fetching single header", "hash", hash) 336 id := rand.Uint64() 337 338 req := &Request{ 339 id: id, 340 sink: sink, 341 code: GetBlockHeadersMsg, 342 want: BlockHeadersMsg, 343 data: &GetBlockHeadersPacket66{ 344 RequestId: id, 345 GetBlockHeadersPacket: &GetBlockHeadersPacket{ 346 Origin: HashOrNumber{Hash: hash}, 347 Amount: uint64(1), 348 Skip: uint64(0), 349 Reverse: false, 350 }, 351 }, 352 } 353 if err := p.dispatchRequest(req); err != nil { 354 return nil, err 355 } 356 return req, nil 357} 358 359// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the 360// specified header query, based on the hash of an origin block. 361func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool, sink chan *Response) (*Request, error) { 362 p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) 363 id := rand.Uint64() 364 365 req := &Request{ 366 id: id, 367 sink: sink, 368 code: GetBlockHeadersMsg, 369 want: BlockHeadersMsg, 370 data: &GetBlockHeadersPacket66{ 371 RequestId: id, 372 GetBlockHeadersPacket: &GetBlockHeadersPacket{ 373 Origin: HashOrNumber{Hash: origin}, 374 Amount: uint64(amount), 375 Skip: uint64(skip), 376 Reverse: reverse, 377 }, 378 }, 379 } 380 if err := p.dispatchRequest(req); err != nil { 381 return nil, err 382 } 383 return req, nil 384} 385 386// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the 387// specified header query, based on the number of an origin block. 388func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *Response) (*Request, error) { 389 p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) 390 id := rand.Uint64() 391 392 req := &Request{ 393 id: id, 394 sink: sink, 395 code: GetBlockHeadersMsg, 396 want: BlockHeadersMsg, 397 data: &GetBlockHeadersPacket66{ 398 RequestId: id, 399 GetBlockHeadersPacket: &GetBlockHeadersPacket{ 400 Origin: HashOrNumber{Number: origin}, 401 Amount: uint64(amount), 402 Skip: uint64(skip), 403 Reverse: reverse, 404 }, 405 }, 406 } 407 if err := p.dispatchRequest(req); err != nil { 408 return nil, err 409 } 410 return req, nil 411} 412 413// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes 414// specified. 415func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Request, error) { 416 p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) 417 id := rand.Uint64() 418 419 req := &Request{ 420 id: id, 421 sink: sink, 422 code: GetBlockBodiesMsg, 423 want: BlockBodiesMsg, 424 data: &GetBlockBodiesPacket66{ 425 RequestId: id, 426 GetBlockBodiesPacket: hashes, 427 }, 428 } 429 if err := p.dispatchRequest(req); err != nil { 430 return nil, err 431 } 432 return req, nil 433} 434 435// RequestNodeData fetches a batch of arbitrary data from a node's known state 436// data, corresponding to the specified hashes. 437func (p *Peer) RequestNodeData(hashes []common.Hash, sink chan *Response) (*Request, error) { 438 p.Log().Debug("Fetching batch of state data", "count", len(hashes)) 439 id := rand.Uint64() 440 441 req := &Request{ 442 id: id, 443 sink: sink, 444 code: GetNodeDataMsg, 445 want: NodeDataMsg, 446 data: &GetNodeDataPacket66{ 447 RequestId: id, 448 GetNodeDataPacket: hashes, 449 }, 450 } 451 if err := p.dispatchRequest(req); err != nil { 452 return nil, err 453 } 454 return req, nil 455} 456 457// RequestReceipts fetches a batch of transaction receipts from a remote node. 458func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) { 459 p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) 460 id := rand.Uint64() 461 462 req := &Request{ 463 id: id, 464 sink: sink, 465 code: GetReceiptsMsg, 466 want: ReceiptsMsg, 467 data: &GetReceiptsPacket66{ 468 RequestId: id, 469 GetReceiptsPacket: hashes, 470 }, 471 } 472 if err := p.dispatchRequest(req); err != nil { 473 return nil, err 474 } 475 return req, nil 476} 477 478// RequestTxs fetches a batch of transactions from a remote node. 479func (p *Peer) RequestTxs(hashes []common.Hash) error { 480 p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) 481 id := rand.Uint64() 482 483 requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) 484 return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ 485 RequestId: id, 486 GetPooledTransactionsPacket: hashes, 487 }) 488} 489 490// knownCache is a cache for known hashes. 491type knownCache struct { 492 hashes mapset.Set 493 max int 494} 495 496// newKnownCache creates a new knownCache with a max capacity. 497func newKnownCache(max int) *knownCache { 498 return &knownCache{ 499 max: max, 500 hashes: mapset.NewSet(), 501 } 502} 503 504// Add adds a list of elements to the set. 505func (k *knownCache) Add(hashes ...common.Hash) { 506 for k.hashes.Cardinality() > max(0, k.max-len(hashes)) { 507 k.hashes.Pop() 508 } 509 for _, hash := range hashes { 510 k.hashes.Add(hash) 511 } 512} 513 514// Contains returns whether the given item is in the set. 515func (k *knownCache) Contains(hash common.Hash) bool { 516 return k.hashes.Contains(hash) 517} 518 519// Cardinality returns the number of elements in the set. 520func (k *knownCache) Cardinality() int { 521 return k.hashes.Cardinality() 522} 523