1// Copyright 2019 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17package eth 18 19import ( 20 "math/big" 21 22 "github.com/ethereum/go-ethereum/common" 23 "github.com/ethereum/go-ethereum/core/types" 24) 25 26const ( 27 // This is the target size for the packs of transactions or announcements. A 28 // pack can get larger than this if a single transactions exceeds this size. 29 maxTxPacketSize = 100 * 1024 30) 31 32// blockPropagation is a block propagation event, waiting for its turn in the 33// broadcast queue. 34type blockPropagation struct { 35 block *types.Block 36 td *big.Int 37} 38 39// broadcastBlocks is a write loop that multiplexes blocks and block accouncements 40// to the remote peer. The goal is to have an async writer that does not lock up 41// node internals and at the same time rate limits queued data. 42func (p *Peer) broadcastBlocks() { 43 for { 44 select { 45 case prop := <-p.queuedBlocks: 46 if err := p.SendNewBlock(prop.block, prop.td); err != nil { 47 return 48 } 49 p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) 50 51 case block := <-p.queuedBlockAnns: 52 if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { 53 return 54 } 55 p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) 56 57 case <-p.term: 58 return 59 } 60 } 61} 62 63// broadcastTransactions is a write loop that schedules transaction broadcasts 64// to the remote peer. The goal is to have an async writer that does not lock up 65// node internals and at the same time rate limits queued data. 66func (p *Peer) broadcastTransactions() { 67 var ( 68 queue []common.Hash // Queue of hashes to broadcast as full transactions 69 done chan struct{} // Non-nil if background broadcaster is running 70 fail = make(chan error, 1) // Channel used to receive network error 71 failed bool // Flag whether a send failed, discard everything onward 72 ) 73 for { 74 // If there's no in-flight broadcast running, check if a new one is needed 75 if done == nil && len(queue) > 0 { 76 // Pile transaction until we reach our allowed network limit 77 var ( 78 hashesCount uint64 79 txs []*types.Transaction 80 size common.StorageSize 81 ) 82 for i := 0; i < len(queue) && size < maxTxPacketSize; i++ { 83 if tx := p.txpool.Get(queue[i]); tx != nil { 84 txs = append(txs, tx) 85 size += tx.Size() 86 } 87 hashesCount++ 88 } 89 queue = queue[:copy(queue, queue[hashesCount:])] 90 91 // If there's anything available to transfer, fire up an async writer 92 if len(txs) > 0 { 93 done = make(chan struct{}) 94 go func() { 95 if err := p.SendTransactions(txs); err != nil { 96 fail <- err 97 return 98 } 99 close(done) 100 p.Log().Trace("Sent transactions", "count", len(txs)) 101 }() 102 } 103 } 104 // Transfer goroutine may or may not have been started, listen for events 105 select { 106 case hashes := <-p.txBroadcast: 107 // If the connection failed, discard all transaction events 108 if failed { 109 continue 110 } 111 // New batch of transactions to be broadcast, queue them (with cap) 112 queue = append(queue, hashes...) 113 if len(queue) > maxQueuedTxs { 114 // Fancy copy and resize to ensure buffer doesn't grow indefinitely 115 queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] 116 } 117 118 case <-done: 119 done = nil 120 121 case <-fail: 122 failed = true 123 124 case <-p.term: 125 return 126 } 127 } 128} 129 130// announceTransactions is a write loop that schedules transaction broadcasts 131// to the remote peer. The goal is to have an async writer that does not lock up 132// node internals and at the same time rate limits queued data. 133func (p *Peer) announceTransactions() { 134 var ( 135 queue []common.Hash // Queue of hashes to announce as transaction stubs 136 done chan struct{} // Non-nil if background announcer is running 137 fail = make(chan error, 1) // Channel used to receive network error 138 failed bool // Flag whether a send failed, discard everything onward 139 ) 140 for { 141 // If there's no in-flight announce running, check if a new one is needed 142 if done == nil && len(queue) > 0 { 143 // Pile transaction hashes until we reach our allowed network limit 144 var ( 145 count int 146 pending []common.Hash 147 size common.StorageSize 148 ) 149 for count = 0; count < len(queue) && size < maxTxPacketSize; count++ { 150 if p.txpool.Get(queue[count]) != nil { 151 pending = append(pending, queue[count]) 152 size += common.HashLength 153 } 154 } 155 // Shift and trim queue 156 queue = queue[:copy(queue, queue[count:])] 157 158 // If there's anything available to transfer, fire up an async writer 159 if len(pending) > 0 { 160 done = make(chan struct{}) 161 go func() { 162 if err := p.sendPooledTransactionHashes(pending); err != nil { 163 fail <- err 164 return 165 } 166 close(done) 167 p.Log().Trace("Sent transaction announcements", "count", len(pending)) 168 }() 169 } 170 } 171 // Transfer goroutine may or may not have been started, listen for events 172 select { 173 case hashes := <-p.txAnnounce: 174 // If the connection failed, discard all transaction events 175 if failed { 176 continue 177 } 178 // New batch of transactions to be broadcast, queue them (with cap) 179 queue = append(queue, hashes...) 180 if len(queue) > maxQueuedTxAnns { 181 // Fancy copy and resize to ensure buffer doesn't grow indefinitely 182 queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] 183 } 184 185 case <-done: 186 done = nil 187 188 case <-fail: 189 failed = true 190 191 case <-p.term: 192 return 193 } 194 } 195} 196