1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8	"context"
9	"io"
10	"sync"
11	"time"
12
13	errors "golang.org/x/xerrors"
14)
15
16// Listener is implemented by protocols to accept new inbound connections.
17type Listener interface {
18	// Accept an inbound connection to a server.
19	// It must block until an inbound connection is made, or the listener is
20	// shut down.
21	Accept(context.Context) (io.ReadWriteCloser, error)
22
23	// Close is used to ask a listener to stop accepting new connections.
24	Close() error
25
26	// Dialer returns a dialer that can be used to connect to this listener
27	// locally.
28	// If a listener does not implement this it will return a nil.
29	Dialer() Dialer
30}
31
32// Dialer is used by clients to dial a server.
33type Dialer interface {
34	// Dial returns a new communication byte stream to a listening server.
35	Dial(ctx context.Context) (io.ReadWriteCloser, error)
36}
37
38// Server is a running server that is accepting incoming connections.
39type Server struct {
40	listener Listener
41	binder   Binder
42	async    async
43}
44
45// Dial uses the dialer to make a new connection, wraps the returned
46// reader and writer using the framer to make a stream, and then builds
47// a connection on top of that stream using the binder.
48func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
49	// dial a server
50	rwc, err := dialer.Dial(ctx)
51	if err != nil {
52		return nil, err
53	}
54	return newConnection(ctx, rwc, binder)
55}
56
57// Serve starts a new server listening for incoming connections and returns
58// it.
59// This returns a fully running and connected server, it does not block on
60// the listener.
61// You can call Wait to block on the server, or Shutdown to get the sever to
62// terminate gracefully.
63// To notice incoming connections, use an intercepting Binder.
64func Serve(ctx context.Context, listener Listener, binder Binder) (*Server, error) {
65	server := &Server{
66		listener: listener,
67		binder:   binder,
68	}
69	server.async.init()
70	go server.run(ctx)
71	return server, nil
72}
73
74// Wait returns only when the server has shut down.
75func (s *Server) Wait() error {
76	return s.async.wait()
77}
78
79// run accepts incoming connections from the listener,
80// If IdleTimeout is non-zero, run exits after there are no clients for this
81// duration, otherwise it exits only on error.
82func (s *Server) run(ctx context.Context) {
83	defer s.async.done()
84	var activeConns []*Connection
85	for {
86		// we never close the accepted connection, we rely on the other end
87		// closing or the socket closing itself naturally
88		rwc, err := s.listener.Accept(ctx)
89		if err != nil {
90			if !isClosingError(err) {
91				s.async.setError(err)
92			}
93			// we are done generating new connections for good
94			break
95		}
96
97		// see if any connections were closed while we were waiting
98		activeConns = onlyActive(activeConns)
99
100		// a new inbound connection,
101		conn, err := newConnection(ctx, rwc, s.binder)
102		if err != nil {
103			if !isClosingError(err) {
104				s.async.setError(err)
105			}
106			continue
107		}
108		activeConns = append(activeConns, conn)
109	}
110
111	// wait for all active conns to finish
112	for _, c := range activeConns {
113		c.Wait()
114	}
115}
116
117func onlyActive(conns []*Connection) []*Connection {
118	i := 0
119	for _, c := range conns {
120		if !c.async.isDone() {
121			conns[i] = c
122			i++
123		}
124	}
125	// trim the slice down
126	return conns[:i]
127}
128
129// isClosingError reports if the error occurs normally during the process of
130// closing a network connection. It uses imperfect heuristics that err on the
131// side of false negatives, and should not be used for anything critical.
132func isClosingError(err error) bool {
133	if err == nil {
134		return false
135	}
136	// Fully unwrap the error, so the following tests work.
137	for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) {
138		err = wrapped
139	}
140
141	// Was it based on an EOF error?
142	if err == io.EOF {
143		return true
144	}
145
146	// Was it based on a closed pipe?
147	if err == io.ErrClosedPipe {
148		return true
149	}
150
151	// Per https://github.com/golang/go/issues/4373, this error string should not
152	// change. This is not ideal, but since the worst that could happen here is
153	// some superfluous logging, it is acceptable.
154	if err.Error() == "use of closed network connection" {
155		return true
156	}
157
158	return false
159}
160
161// NewIdleListener wraps a listener with an idle timeout.
162// When there are no active connections for at least the timeout duration a
163// call to accept will fail with ErrIdleTimeout.
164func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
165	l := &idleListener{
166		timeout:    timeout,
167		wrapped:    wrap,
168		newConns:   make(chan *idleCloser),
169		closed:     make(chan struct{}),
170		wasTimeout: make(chan struct{}),
171	}
172	go l.run()
173	return l
174}
175
176type idleListener struct {
177	wrapped    Listener
178	timeout    time.Duration
179	newConns   chan *idleCloser
180	closed     chan struct{}
181	wasTimeout chan struct{}
182	closeOnce  sync.Once
183}
184
185type idleCloser struct {
186	wrapped   io.ReadWriteCloser
187	closed    chan struct{}
188	closeOnce sync.Once
189}
190
191func (c *idleCloser) Read(p []byte) (int, error) {
192	n, err := c.wrapped.Read(p)
193	if err != nil && isClosingError(err) {
194		c.closeOnce.Do(func() { close(c.closed) })
195	}
196	return n, err
197}
198
199func (c *idleCloser) Write(p []byte) (int, error) {
200	// we do not close on write failure, we rely on the wrapped writer to do that
201	// if it is appropriate, which we will detect in the next read.
202	return c.wrapped.Write(p)
203}
204
205func (c *idleCloser) Close() error {
206	// we rely on closing the wrapped stream to signal to the next read that we
207	// are closed, rather than triggering the closed signal directly
208	return c.wrapped.Close()
209}
210
211func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
212	rwc, err := l.wrapped.Accept(ctx)
213	if err != nil {
214		if isClosingError(err) {
215			// underlying listener was closed
216			l.closeOnce.Do(func() { close(l.closed) })
217			// was it closed because of the idle timeout?
218			select {
219			case <-l.wasTimeout:
220				err = ErrIdleTimeout
221			default:
222			}
223		}
224		return nil, err
225	}
226	conn := &idleCloser{
227		wrapped: rwc,
228		closed:  make(chan struct{}),
229	}
230	l.newConns <- conn
231	return conn, err
232}
233
234func (l *idleListener) Close() error {
235	defer l.closeOnce.Do(func() { close(l.closed) })
236	return l.wrapped.Close()
237}
238
239func (l *idleListener) Dialer() Dialer {
240	return l.wrapped.Dialer()
241}
242
243func (l *idleListener) run() {
244	var conns []*idleCloser
245	for {
246		var firstClosed chan struct{} // left at nil if there are no active conns
247		var timeout <-chan time.Time  // left at nil if there are  active conns
248		if len(conns) > 0 {
249			firstClosed = conns[0].closed
250		} else {
251			timeout = time.After(l.timeout)
252		}
253		select {
254		case <-l.closed:
255			// the main listener closed, no need to keep going
256			return
257		case conn := <-l.newConns:
258			// a new conn arrived, add it to the list
259			conns = append(conns, conn)
260		case <-timeout:
261			// we timed out, only happens when there are no active conns
262			// close the underlying listener, and allow the normal closing process to happen
263			close(l.wasTimeout)
264			l.wrapped.Close()
265		case <-firstClosed:
266			// a conn closed, remove it from the active list
267			conns = conns[:copy(conns, conns[1:])]
268		}
269	}
270}
271