1// Copyright 2021 The go-ethereum Authors
2// This file is part of the go-ethereum library.
3//
4// The go-ethereum library is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Lesser General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// The go-ethereum library is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Lesser General Public License for more details.
13//
14// You should have received a copy of the GNU Lesser General Public License
15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17package eth
18
19import (
20	"errors"
21	"fmt"
22	"time"
23
24	"github.com/ethereum/go-ethereum/p2p"
25)
26
var (
	// errDisconnected is reported when an operation cannot be handed to the
	// dispatcher because the peer's term channel has already fired.
	errDisconnected = errors.New("disconnected")

	// errDanglingResponse is reported when a response carries a request id
	// that the dispatcher is not currently tracking.
	errDanglingResponse = errors.New("response to non-existent request")

	// errMismatchingResponseType is reported when the message code of a
	// response differs from the code the matching request asked for.
	errMismatchingResponseType = errors.New("mismatching response type")
)
40
41// Request is a pending request to allow tracking it and delivering a response
42// back to the requester on their chosen channel.
43type Request struct {
44	peer *Peer  // Peer to which this request belogs for untracking
45	id   uint64 // Request ID to match up replies to
46
47	sink   chan *Response // Channel to deliver the response on
48	cancel chan struct{}  // Channel to cancel requests ahead of time
49
50	code uint64      // Message code of the request packet
51	want uint64      // Message code of the response packet
52	data interface{} // Data content of the request packet
53
54	Peer string    // Demultiplexer if cross-peer requests are batched together
55	Sent time.Time // Timestamp when the request was sent
56}
57
58// Close aborts an in-flight request. Although there's no way to notify the
59// remote peer about the cancellation, this method notifies the dispatcher to
60// discard any late responses.
61func (r *Request) Close() error {
62	if r.peer == nil { // Tests mock out the dispatcher, skip internal cancellation
63		return nil
64	}
65	cancelOp := &cancel{
66		id:   r.id,
67		fail: make(chan error),
68	}
69	select {
70	case r.peer.reqCancel <- cancelOp:
71		if err := <-cancelOp.fail; err != nil {
72			return err
73		}
74		close(r.cancel)
75		return nil
76	case <-r.peer.term:
77		return errDisconnected
78	}
79}
80
81// request is a wrapper around a client Request that has an error channel to
82// signal on if sending the request already failed on a network level.
83type request struct {
84	req  *Request
85	fail chan error
86}
87
// cancel is a maintenance operation asking the dispatcher to stop tracking a
// pending request.
type cancel struct {
	id   uint64     // ID of the request to stop tracking
	fail chan error // Receives the dispatcher's acknowledgement
}
94
95// Response is a reply packet to a previously created request. It is delivered
96// on the channel assigned by the requester subsystem and contains the original
97// request embedded to allow uniquely matching it caller side.
98type Response struct {
99	id   uint64    // Request ID to match up this reply to
100	recv time.Time // Timestamp when the request was received
101	code uint64    // Response packet type to cross validate with request
102
103	Req  *Request      // Original request to cross-reference with
104	Res  interface{}   // Remote response for the request query
105	Meta interface{}   // Metadata generated locally on the receiver thread
106	Time time.Duration // Time it took for the request to be served
107	Done chan error    // Channel to signal message handling to the reader
108}
109
110// response is a wrapper around a remote Response that has an error channel to
111// signal on if processing the response failed.
112type response struct {
113	res  *Response
114	fail chan error
115}
116
117// dispatchRequest schedules the request to the dispatcher for tracking and
118// network serialization, blocking until it's successfully sent.
119//
120// The returned Request must either be closed before discarding it, or the reply
121// must be waited for and the Response's Done channel signalled.
122func (p *Peer) dispatchRequest(req *Request) error {
123	reqOp := &request{
124		req:  req,
125		fail: make(chan error),
126	}
127	req.cancel = make(chan struct{})
128	req.peer = p
129	req.Peer = p.id
130
131	select {
132	case p.reqDispatch <- reqOp:
133		return <-reqOp.fail
134	case <-p.term:
135		return errDisconnected
136	}
137}
138
139// dispatchRequest fulfils a pending request and delivers it to the requested
140// sink.
141func (p *Peer) dispatchResponse(res *Response, metadata func() interface{}) error {
142	resOp := &response{
143		res:  res,
144		fail: make(chan error),
145	}
146	res.recv = time.Now()
147	res.Done = make(chan error)
148
149	select {
150	case p.resDispatch <- resOp:
151		// Ensure the response is accepted by the dispatcher
152		if err := <-resOp.fail; err != nil {
153			return nil
154		}
155		// Request was accepted, run any postprocessing step to generate metadata
156		// on the receiver thread, not the sink thread
157		if metadata != nil {
158			res.Meta = metadata()
159		}
160		// Deliver the filled out response and wait until it's handled. This
161		// path is a bit funky as Go's select has no order, so if a response
162		// arrives to an already cancelled request, there's a 50-50% changes
163		// of picking on channel or the other. To avoid such cases delivering
164		// the packet upstream, check for cancellation first and only after
165		// block on delivery.
166		select {
167		case <-res.Req.cancel:
168			return nil // Request cancelled, silently discard response
169		default:
170			// Request not yet cancelled, attempt to deliver it, but do watch
171			// for fresh cancellations too
172			select {
173			case res.Req.sink <- res:
174				return <-res.Done // Response delivered, return any errors
175			case <-res.Req.cancel:
176				return nil // Request cancelled, silently discard response
177			}
178		}
179
180	case <-p.term:
181		return errDisconnected
182	}
183}
184
185// dispatcher is a loop that accepts requests from higher layer packages, pushes
186// it to the network and tracks and dispatches the responses back to the original
187// requester.
188func (p *Peer) dispatcher() {
189	pending := make(map[uint64]*Request)
190
191	for {
192		select {
193		case reqOp := <-p.reqDispatch:
194			req := reqOp.req
195			req.Sent = time.Now()
196
197			requestTracker.Track(p.id, p.version, req.code, req.want, req.id)
198			err := p2p.Send(p.rw, req.code, req.data)
199			reqOp.fail <- err
200
201			if err == nil {
202				pending[req.id] = req
203			}
204
205		case cancelOp := <-p.reqCancel:
206			// Retrieve the pendign request to cancel and short circuit if it
207			// has already been serviced and is not available anymore
208			req := pending[cancelOp.id]
209			if req == nil {
210				cancelOp.fail <- nil
211				continue
212			}
213			// Stop tracking the request
214			delete(pending, cancelOp.id)
215			cancelOp.fail <- nil
216
217		case resOp := <-p.resDispatch:
218			res := resOp.res
219			res.Req = pending[res.id]
220
221			// Independent if the request exists or not, track this packet
222			requestTracker.Fulfil(p.id, p.version, res.code, res.id)
223
224			switch {
225			case res.Req == nil:
226				// Response arrived with an untracked ID. Since even cancelled
227				// requests are tracked until fulfilment, a dangling repsponse
228				// means the remote peer implements the protocol badly.
229				resOp.fail <- errDanglingResponse
230
231			case res.Req.want != res.code:
232				// Response arrived, but it's a different packet type than the
233				// one expected by the requester. Either the local code is bad,
234				// or the remote peer send junk. In neither cases can we handle
235				// the packet.
236				resOp.fail <- fmt.Errorf("%w: have %d, want %d", errMismatchingResponseType, res.code, res.Req.want)
237
238			default:
239				// All dispatcher checks passed and the response was initialized
240				// with the matching request. Signal to the delivery routine that
241				// it can wait for a handler response and dispatch the data.
242				res.Time = res.recv.Sub(res.Req.Sent)
243				resOp.fail <- nil
244
245				// Stop tracking the request, the response dispatcher will deliver
246				delete(pending, res.id)
247			}
248
249		case <-p.term:
250			return
251		}
252	}
253}
254