1// Copyright 2021 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17package eth 18 19import ( 20 "errors" 21 "fmt" 22 "time" 23 24 "github.com/ethereum/go-ethereum/p2p" 25) 26 27var ( 28 // errDisconnected is returned if a request is attempted to be made to a peer 29 // that was already closed. 30 errDisconnected = errors.New("disconnected") 31 32 // errDanglingResponse is returned if a response arrives with a request id 33 // which does not match to any existing pending requests. 34 errDanglingResponse = errors.New("response to non-existent request") 35 36 // errMismatchingResponseType is returned if the remote peer sent a different 37 // packet type as a response to a request than what the local node expected. 38 errMismatchingResponseType = errors.New("mismatching response type") 39) 40 41// Request is a pending request to allow tracking it and delivering a response 42// back to the requester on their chosen channel. 43type Request struct { 44 peer *Peer // Peer to which this request belogs for untracking 45 id uint64 // Request ID to match up replies to 46 47 sink chan *Response // Channel to deliver the response on 48 cancel chan struct{} // Channel to cancel requests ahead of time 49 50 code uint64 // Message code of the request packet 51 want uint64 // Message code of the response packet 52 data interface{} // Data content of the request packet 53 54 Peer string // Demultiplexer if cross-peer requests are batched together 55 Sent time.Time // Timestamp when the request was sent 56} 57 58// Close aborts an in-flight request. Although there's no way to notify the 59// remote peer about the cancellation, this method notifies the dispatcher to 60// discard any late responses. 61func (r *Request) Close() error { 62 if r.peer == nil { // Tests mock out the dispatcher, skip internal cancellation 63 return nil 64 } 65 cancelOp := &cancel{ 66 id: r.id, 67 fail: make(chan error), 68 } 69 select { 70 case r.peer.reqCancel <- cancelOp: 71 if err := <-cancelOp.fail; err != nil { 72 return err 73 } 74 close(r.cancel) 75 return nil 76 case <-r.peer.term: 77 return errDisconnected 78 } 79} 80 81// request is a wrapper around a client Request that has an error channel to 82// signal on if sending the request already failed on a network level. 83type request struct { 84 req *Request 85 fail chan error 86} 87 88// cancel is a maintenance type on the dispatcher to stop tracking a pending 89// request. 90type cancel struct { 91 id uint64 // Request ID to stop tracking 92 fail chan error 93} 94 95// Response is a reply packet to a previously created request. It is delivered 96// on the channel assigned by the requester subsystem and contains the original 97// request embedded to allow uniquely matching it caller side. 98type Response struct { 99 id uint64 // Request ID to match up this reply to 100 recv time.Time // Timestamp when the request was received 101 code uint64 // Response packet type to cross validate with request 102 103 Req *Request // Original request to cross-reference with 104 Res interface{} // Remote response for the request query 105 Meta interface{} // Metadata generated locally on the receiver thread 106 Time time.Duration // Time it took for the request to be served 107 Done chan error // Channel to signal message handling to the reader 108} 109 110// response is a wrapper around a remote Response that has an error channel to 111// signal on if processing the response failed. 112type response struct { 113 res *Response 114 fail chan error 115} 116 117// dispatchRequest schedules the request to the dispatcher for tracking and 118// network serialization, blocking until it's successfully sent. 119// 120// The returned Request must either be closed before discarding it, or the reply 121// must be waited for and the Response's Done channel signalled. 122func (p *Peer) dispatchRequest(req *Request) error { 123 reqOp := &request{ 124 req: req, 125 fail: make(chan error), 126 } 127 req.cancel = make(chan struct{}) 128 req.peer = p 129 req.Peer = p.id 130 131 select { 132 case p.reqDispatch <- reqOp: 133 return <-reqOp.fail 134 case <-p.term: 135 return errDisconnected 136 } 137} 138 139// dispatchRequest fulfils a pending request and delivers it to the requested 140// sink. 141func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error { 142 resOp := &response{ 143 res: res, 144 fail: make(chan error), 145 } 146 res.recv = time.Now() 147 res.Done = make(chan error) 148 149 select { 150 case p.resDispatch <- resOp: 151 // Ensure the response is accepted by the dispatcher 152 if err := <-resOp.fail; err != nil { 153 return nil 154 } 155 // Request was accepted, run any postprocessing step to generate metadata 156 // on the receiver thread, not the sink thread 157 if metadata != nil { 158 res.Meta = metadata() 159 } 160 // Deliver the filled out response and wait until it's handled. This 161 // path is a bit funky as Go's select has no order, so if a response 162 // arrives to an already cancelled request, there's a 50-50% changes 163 // of picking on channel or the other. To avoid such cases delivering 164 // the packet upstream, check for cancellation first and only after 165 // block on delivery. 166 select { 167 case <-res.Req.cancel: 168 return nil // Request cancelled, silently discard response 169 default: 170 // Request not yet cancelled, attempt to deliver it, but do watch 171 // for fresh cancellations too 172 select { 173 case res.Req.sink <- res: 174 return <-res.Done // Response delivered, return any errors 175 case <-res.Req.cancel: 176 return nil // Request cancelled, silently discard response 177 } 178 } 179 180 case <-p.term: 181 return errDisconnected 182 } 183} 184 185// dispatcher is a loop that accepts requests from higher layer packages, pushes 186// it to the network and tracks and dispatches the responses back to the original 187// requester. 188func (p *Peer) dispatcher() { 189 pending := make(map[uint64]*Request) 190 191 for { 192 select { 193 case reqOp := <-p.reqDispatch: 194 req := reqOp.req 195 req.Sent = time.Now() 196 197 requestTracker.Track(p.id, p.version, req.code, req.want, req.id) 198 err := p2p.Send(p.rw, req.code, req.data) 199 reqOp.fail <- err 200 201 if err == nil { 202 pending[req.id] = req 203 } 204 205 case cancelOp := <-p.reqCancel: 206 // Retrieve the pendign request to cancel and short circuit if it 207 // has already been serviced and is not available anymore 208 req := pending[cancelOp.id] 209 if req == nil { 210 cancelOp.fail <- nil 211 continue 212 } 213 // Stop tracking the request 214 delete(pending, cancelOp.id) 215 cancelOp.fail <- nil 216 217 case resOp := <-p.resDispatch: 218 res := resOp.res 219 res.Req = pending[res.id] 220 221 // Independent if the request exists or not, track this packet 222 requestTracker.Fulfil(p.id, p.version, res.code, res.id) 223 224 switch { 225 case res.Req == nil: 226 // Response arrived with an untracked ID. Since even cancelled 227 // requests are tracked until fulfilment, a dangling repsponse 228 // means the remote peer implements the protocol badly. 229 resOp.fail <- errDanglingResponse 230 231 case res.Req.want != res.code: 232 // Response arrived, but it's a different packet type than the 233 // one expected by the requester. Either the local code is bad, 234 // or the remote peer send junk. In neither cases can we handle 235 // the packet. 236 resOp.fail <- fmt.Errorf("%w: have %d, want %d", errMismatchingResponseType, res.code, res.Req.want) 237 238 default: 239 // All dispatcher checks passed and the response was initialized 240 // with the matching request. Signal to the delivery routine that 241 // it can wait for a handler response and dispatch the data. 242 res.Time = res.recv.Sub(res.Req.Sent) 243 resOp.fail <- nil 244 245 // Stop tracking the request, the response dispatcher will deliver 246 delete(pending, res.id) 247 } 248 249 case <-p.term: 250 return 251 } 252 } 253} 254