// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package eth

import (
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

const (
	// This is the target size for the packs of transactions or announcements. A
	// pack can get larger than this if a single transaction exceeds this size.
	maxTxPacketSize = 100 * 1024
)

// blockPropagation is a block propagation event, waiting for its turn in the
// broadcast queue.
type blockPropagation struct {
	block *types.Block
	td    *big.Int
}

// broadcastBlocks is a write loop that multiplexes blocks and block announcements
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *Peer) broadcastBlocks() {
	for {
		select {
		case prop := <-p.queuedBlocks:
			if err := p.SendNewBlock(prop.block, prop.td); err != nil {
				return
			}
			p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)

		case block := <-p.queuedBlockAnns:
			if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
				return
			}
			p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())

		case <-p.term:
			return
		}
	}
}

// broadcastTransactions is a write loop that schedules transaction broadcasts
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *Peer) broadcastTransactions() {
	var (
		queue  []common.Hash         // Queue of hashes to broadcast as full transactions
		done   chan struct{}         // Non-nil if background broadcaster is running
		fail   = make(chan error, 1) // Channel used to receive network error
		failed bool                  // Flag whether a send failed, discard everything onward
	)
	for {
		// If there's no in-flight broadcast running, check if a new one is needed
		if done == nil && len(queue) > 0 {
			// Pile transactions until we reach our allowed network limit
			var (
				hashesCount uint64
				txs         []*types.Transaction
				size        common.StorageSize
			)
			for i := 0; i < len(queue) && size < maxTxPacketSize; i++ {
				if tx := p.txpool.Get(queue[i]); tx != nil {
					txs = append(txs, tx)
					size += tx.Size()
				}
				hashesCount++
			}
			// Shift and trim queue
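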
			queue = queue[:copy(queue, queue[hashesCount:])]

			// If there's anything available to transfer, fire up an async writer
			if len(txs) > 0 {
				done = make(chan struct{})
				go func() {
					if err := p.SendTransactions(txs); err != nil {
						fail <- err
						return
					}
					close(done)
					p.Log().Trace("Sent transactions", "count", len(txs))
				}()
			}
		}
		// Transfer goroutine may or may not have been started, listen for events
		select {
		case hashes := <-p.txBroadcast:
			// If the connection failed, discard all transaction events
			if failed {
				continue
			}
			// New batch of transactions to be broadcast, queue them (with cap)
			queue = append(queue, hashes...)
			if len(queue) > maxQueuedTxs {
				// Fancy copy and resize to ensure buffer doesn't grow indefinitely
				queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])]
			}

		case <-done:
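			// In-flight broadcast finished, allow a new batch to be scheduled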
			done = nil

		case <-fail:
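			// Network send failed, drop all transaction events from here on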
			failed = true

		case <-p.term:
			return
		}
	}
}

// announceTransactions is a write loop that schedules transaction announcements
// to the remote peer. The goal is to have an async writer that does not lock up
// node internals and at the same time rate limits queued data.
func (p *Peer) announceTransactions() {
	var (
		queue  []common.Hash         // Queue of hashes to announce as transaction stubs
		done   chan struct{}         // Non-nil if background announcer is running
		fail   = make(chan error, 1) // Channel used to receive network error
		failed bool                  // Flag whether a send failed, discard everything onward
	)
	for {
		// If there's no in-flight announce running, check if a new one is needed
		if done == nil && len(queue) > 0 {
			// Pile transaction hashes until we reach our allowed network limit
			var (
				count   int
				pending []common.Hash
				size    common.StorageSize
			)
			for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
				if p.txpool.Get(queue[count]) != nil {
					pending = append(pending, queue[count])
					size += common.HashLength
				}
			}
			// Shift and trim queue
			queue = queue[:copy(queue, queue[count:])]

			// If there's anything available to transfer, fire up an async writer
			if len(pending) > 0 {
				done = make(chan struct{})
				go func() {
					if err := p.sendPooledTransactionHashes(pending); err != nil {
						fail <- err
						return
					}
					close(done)
					p.Log().Trace("Sent transaction announcements", "count", len(pending))
				}()
			}
		}
		// Transfer goroutine may or may not have been started, listen for events
		select {
		case hashes := <-p.txAnnounce:
			// If the connection failed, discard all transaction events
			if failed {
				continue
			}
			// New batch of transactions to be announced, queue them (with cap)
			queue = append(queue, hashes...)
			if len(queue) > maxQueuedTxAnns {
				// Fancy copy and resize to ensure buffer doesn't grow indefinitely
				queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])]
			}

		case <-done:
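			// In-flight announcement finished, allow a new batch to be scheduled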
			done = nil

		case <-fail:
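			// Network send failed, drop all transaction events from here on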
			failed = true

		case <-p.term:
			return
		}
	}
}