// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package eth

import (
	"math/big"
	"math/rand"
	"sync"

	mapset "github.com/deckarep/golang-set"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/rlp"
)

const (
	// maxKnownTxs is the maximum number of transaction hashes to keep in the known list
	// before starting to randomly evict them.
	maxKnownTxs = 32768

	// maxKnownBlocks is the maximum number of block hashes to keep in the known list
	// before starting to randomly evict them.
	maxKnownBlocks = 1024

	// maxQueuedTxs is the maximum number of transactions to queue up before dropping
	// older broadcasts.
	maxQueuedTxs = 4096

	// maxQueuedTxAnns is the maximum number of transaction announcements to queue up
	// before dropping older announcements.
	maxQueuedTxAnns = 4096

	// maxQueuedBlocks is the maximum number of block propagations to queue up before
	// dropping broadcasts. There's not much point in queueing stale blocks, so a few
	// that might cover uncles should be enough.
	maxQueuedBlocks = 4

	// maxQueuedBlockAnns is the maximum number of block announcements to queue up before
	// dropping broadcasts. Similarly to block propagations, there's no point to queue
	// above some healthy uncle limit, so use that.
	maxQueuedBlockAnns = 4
)

// max is a helper function which returns the larger of the two given integers.
func max(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// Peer is a collection of relevant information we have about an `eth` peer.
type Peer struct {
	id string // Unique ID for the peer, cached

	*p2p.Peer                   // The embedded P2P package peer
	rw        p2p.MsgReadWriter // Input/output streams for eth
	version   uint              // Protocol version negotiated

	head common.Hash // Latest advertised head block hash
	td   *big.Int    // Latest advertised head block total difficulty

	knownBlocks     *knownCache            // Set of block hashes known to be known by this peer
	queuedBlocks    chan *blockPropagation // Queue of blocks to broadcast to the peer
	queuedBlockAnns chan *types.Block      // Queue of blocks to announce to the peer

	txpool      TxPool             // Transaction pool used by the broadcasters for liveness checks
	knownTxs    *knownCache        // Set of transaction hashes known to be known by this peer
	txBroadcast chan []common.Hash // Channel used to queue transaction propagation requests
	txAnnounce  chan []common.Hash // Channel used to queue transaction announcement requests

	reqDispatch chan *request  // Dispatch channel to send requests and track them until fulfilment
	reqCancel   chan *cancel   // Dispatch channel to cancel pending requests and untrack them
	resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them

	term chan struct{} // Termination channel to stop the broadcasters
	lock sync.RWMutex  // Mutex protecting the internal fields
}

// NewPeer creates a wrapper for a network connection and negotiated protocol
// version.
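//
// A minimal usage sketch (hypothetical variable names, assuming an already
// established p2p connection `p` with read/writer `rw` and a live transaction
// pool):
//
//	peer := NewPeer(ETH66, p, rw, txpool)
//	defer peer.Close()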
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
	peer := &Peer{
		id:              p.ID().String(),
		Peer:            p,
		rw:              rw,
		version:         version,
		knownTxs:        newKnownCache(maxKnownTxs),
		knownBlocks:     newKnownCache(maxKnownBlocks),
		queuedBlocks:    make(chan *blockPropagation, maxQueuedBlocks),
		queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
		txBroadcast:     make(chan []common.Hash),
		txAnnounce:      make(chan []common.Hash),
		reqDispatch:     make(chan *request),
		reqCancel:       make(chan *cancel),
		resDispatch:     make(chan *response),
		txpool:          txpool,
		term:            make(chan struct{}),
	}
	// Start up all the broadcasters and the request dispatcher
	go peer.broadcastBlocks()
	go peer.broadcastTransactions()
	go peer.announceTransactions()
	go peer.dispatcher()

	return peer
}

// Close signals the broadcast goroutines to terminate. Only ever call this if
// you created the peer yourself via NewPeer. Otherwise let whoever created it
// clean it up!
func (p *Peer) Close() {
	close(p.term)
}

// ID retrieves the peer's unique identifier.
func (p *Peer) ID() string {
	return p.id
}

// Version retrieves the peer's negotiated `eth` protocol version.
func (p *Peer) Version() uint {
	return p.version
}

// Head retrieves the current head hash and total difficulty of the peer.
func (p *Peer) Head() (hash common.Hash, td *big.Int) {
	p.lock.RLock()
	defer p.lock.RUnlock()

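	// Return copies so the caller cannot mutate the peer's cached head state.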
	copy(hash[:], p.head[:])
	return hash, new(big.Int).Set(p.td)
}

// SetHead updates the head hash and total difficulty of the peer.
func (p *Peer) SetHead(hash common.Hash, td *big.Int) {
	p.lock.Lock()
	defer p.lock.Unlock()

	copy(p.head[:], hash[:])
	p.td.Set(td)
}

// KnownBlock returns whether the peer is known to already have a block.
func (p *Peer) KnownBlock(hash common.Hash) bool {
	return p.knownBlocks.Contains(hash)
}

// KnownTransaction returns whether the peer is known to already have a transaction.
func (p *Peer) KnownTransaction(hash common.Hash) bool {
	return p.knownTxs.Contains(hash)
}

// markBlock marks a block as known for the peer, ensuring that the block will
// never be propagated to this particular peer.
func (p *Peer) markBlock(hash common.Hash) {
	// If we reached the memory allowance, drop a previously known block hash
	p.knownBlocks.Add(hash)
}

// markTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *Peer) markTransaction(hash common.Hash) {
	// If we reached the memory allowance, drop a previously known transaction hash
	p.knownTxs.Add(hash)
}

// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
//
// This method is a helper used by the async transaction sender. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
//
// The reason this is public is to allow packages using this protocol to write
// tests that directly send messages without having to do the async queueing.
func (p *Peer) SendTransactions(txs types.Transactions) error {
	// Mark all the transactions as known, but ensure we don't overflow our limits
	for _, tx := range txs {
		p.knownTxs.Add(tx.Hash())
	}
	return p2p.Send(p.rw, TransactionsMsg, txs)
}

// AsyncSendTransactions queues a list of transactions (by hash) to eventually
// propagate to a remote peer. The number of pending sends is capped (new ones
// will force old sends to be dropped).
func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
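	// The broadcast channel is unbuffered, so this blocks until the transaction
	// broadcaster accepts the batch or the peer is being torn down.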
	select {
	case p.txBroadcast <- hashes:
		// Mark all the transactions as known, but ensure we don't overflow our limits
		p.knownTxs.Add(hashes...)
	case <-p.term:
		p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
	}
}

// sendPooledTransactionHashes sends transaction hashes to the peer and includes
// them in its transaction hash set for future reference.
//
// This method is a helper used by the async transaction announcer. Don't call it
// directly as the queueing (memory) and transmission (bandwidth) costs should
// not be managed directly.
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
	// Mark all the transactions as known, but ensure we don't overflow our limits
	p.knownTxs.Add(hashes...)
	return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
}

// AsyncSendPooledTransactionHashes queues a list of transaction hashes to eventually
// announce to a remote peer. The number of pending sends is capped (new ones
// will force old sends to be dropped).
func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) {
	select {
	case p.txAnnounce <- hashes:
		// Mark all the transactions as known, but ensure we don't overflow our limits
		p.knownTxs.Add(hashes...)
	case <-p.term:
		p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
	}
}

// ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP.
func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error {
	// Mark all the transactions as known, but ensure we don't overflow our limits
	p.knownTxs.Add(hashes...)

	// Not packed into PooledTransactionsPacket to avoid RLP decoding
	return p2p.Send(p.rw, PooledTransactionsMsg, PooledTransactionsRLPPacket66{
		RequestId:                   id,
		PooledTransactionsRLPPacket: txs,
	})
}

// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *Peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
	// Mark all the block hashes as known, but ensure we don't overflow our limits
	p.knownBlocks.Add(hashes...)

	request := make(NewBlockHashesPacket, len(hashes))
	for i := 0; i < len(hashes); i++ {
		request[i].Hash = hashes[i]
		request[i].Number = numbers[i]
	}
	return p2p.Send(p.rw, NewBlockHashesMsg, request)
}

// AsyncSendNewBlockHash queues the availability of a block for propagation to a
// remote peer. If the peer's broadcast queue is full, the event is silently
// dropped.
func (p *Peer) AsyncSendNewBlockHash(block *types.Block) {
	select {
	case p.queuedBlockAnns <- block:
		// Mark the block hash as known, but ensure we don't overflow our limits
		p.knownBlocks.Add(block.Hash())
	default:
		p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
	}
}

// SendNewBlock propagates an entire block to a remote peer.
func (p *Peer) SendNewBlock(block *types.Block, td *big.Int) error {
	// Mark the block hash as known, but ensure we don't overflow our limits
	p.knownBlocks.Add(block.Hash())
	return p2p.Send(p.rw, NewBlockMsg, &NewBlockPacket{
		Block: block,
		TD:    td,
	})
}

// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
// the peer's broadcast queue is full, the event is silently dropped.
func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
	select {
	case p.queuedBlocks <- &blockPropagation{block: block, td: td}:
		// Mark the block hash as known, but ensure we don't overflow our limits
		p.knownBlocks.Add(block.Hash())
	default:
		p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
	}
}

// ReplyBlockHeadersRLP is the eth/66 version of SendBlockHeadersRLP.
func (p *Peer) ReplyBlockHeadersRLP(id uint64, headers []rlp.RawValue) error {
	return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersRLPPacket66{
		RequestId:             id,
		BlockHeadersRLPPacket: headers,
	})
}

// ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP.
func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
	// Not packed into BlockBodiesPacket to avoid RLP decoding
	return p2p.Send(p.rw, BlockBodiesMsg, BlockBodiesRLPPacket66{
		RequestId:            id,
		BlockBodiesRLPPacket: bodies,
	})
}

// ReplyNodeData is the eth/66 response to GetNodeData.
func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error {
	return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{
		RequestId:      id,
		NodeDataPacket: data,
	})
}

// ReplyReceiptsRLP is the eth/66 response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error {
	return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{
		RequestId:         id,
		ReceiptsRLPPacket: receipts,
	})
}

// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching single header", "hash", hash)
	id := rand.Uint64()
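	// The random request id ties this outgoing query to the BlockHeaders response
	// that the dispatcher will later deliver on the sink channel.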

	req := &Request{
		id:   id,
		sink: sink,
		code: GetBlockHeadersMsg,
		want: BlockHeadersMsg,
		data: &GetBlockHeadersPacket66{
			RequestId: id,
			GetBlockHeadersPacket: &GetBlockHeadersPacket{
				Origin:  HashOrNumber{Hash: hash},
				Amount:  uint64(1),
				Skip:    uint64(0),
				Reverse: false,
			},
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
	id := rand.Uint64()

	req := &Request{
		id:   id,
		sink: sink,
		code: GetBlockHeadersMsg,
		want: BlockHeadersMsg,
		data: &GetBlockHeadersPacket66{
			RequestId: id,
			GetBlockHeadersPacket: &GetBlockHeadersPacket{
				Origin:  HashOrNumber{Hash: origin},
				Amount:  uint64(amount),
				Skip:    uint64(skip),
				Reverse: reverse,
			},
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
	id := rand.Uint64()

	req := &Request{
		id:   id,
		sink: sink,
		code: GetBlockHeadersMsg,
		want: BlockHeadersMsg,
		data: &GetBlockHeadersPacket66{
			RequestId: id,
			GetBlockHeadersPacket: &GetBlockHeadersPacket{
				Origin:  HashOrNumber{Number: origin},
				Amount:  uint64(amount),
				Skip:    uint64(skip),
				Reverse: reverse,
			},
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
	id := rand.Uint64()

	req := &Request{
		id:   id,
		sink: sink,
		code: GetBlockBodiesMsg,
		want: BlockBodiesMsg,
		data: &GetBlockBodiesPacket66{
			RequestId:            id,
			GetBlockBodiesPacket: hashes,
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *Peer) RequestNodeData(hashes []common.Hash, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching batch of state data", "count", len(hashes))
	id := rand.Uint64()

	req := &Request{
		id:   id,
		sink: sink,
		code: GetNodeDataMsg,
		want: NodeDataMsg,
		data: &GetNodeDataPacket66{
			RequestId:         id,
			GetNodeDataPacket: hashes,
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) {
	p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
	id := rand.Uint64()

	req := &Request{
		id:   id,
		sink: sink,
		code: GetReceiptsMsg,
		want: ReceiptsMsg,
		data: &GetReceiptsPacket66{
			RequestId:         id,
			GetReceiptsPacket: hashes,
		},
	}
	if err := p.dispatchRequest(req); err != nil {
		return nil, err
	}
	return req, nil
}

// RequestTxs fetches a batch of transactions from a remote node.
func (p *Peer) RequestTxs(hashes []common.Hash) error {
	p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
	id := rand.Uint64()

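	// Register the request with the tracker so the eventual PooledTransactions
	// response can be matched back to it.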
	requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id)
	return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{
		RequestId:                   id,
		GetPooledTransactionsPacket: hashes,
	})
}

// knownCache is a cache for known hashes. It is bounded: once the configured
// maximum is reached, random entries are evicted to make room for new ones.
type knownCache struct {
	hashes mapset.Set
	max    int
}

// newKnownCache creates a new knownCache with a max capacity.
func newKnownCache(max int) *knownCache {
	return &knownCache{
		max:    max,
		hashes: mapset.NewSet(),
	}
}

// Add adds a list of elements to the set.
func (k *knownCache) Add(hashes ...common.Hash) {
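	// Evict arbitrary entries until there is room for the incoming batch; the
	// max(0, ...) guard handles batches larger than the cache capacity.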
	for k.hashes.Cardinality() > max(0, k.max-len(hashes)) {
		k.hashes.Pop()
	}
	for _, hash := range hashes {
		k.hashes.Add(hash)
	}
}

// Contains returns whether the given item is in the set.
func (k *knownCache) Contains(hash common.Hash) bool {
	return k.hashes.Contains(hash)
}

// Cardinality returns the number of elements in the set.
func (k *knownCache) Cardinality() int {
	return k.hashes.Cardinality()
}