1// Copyright 2020 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package jsonrpc2 6 7import ( 8 "context" 9 "io" 10 "sync" 11 "time" 12 13 errors "golang.org/x/xerrors" 14) 15 16// Listener is implemented by protocols to accept new inbound connections. 17type Listener interface { 18 // Accept an inbound connection to a server. 19 // It must block until an inbound connection is made, or the listener is 20 // shut down. 21 Accept(context.Context) (io.ReadWriteCloser, error) 22 23 // Close is used to ask a listener to stop accepting new connections. 24 Close() error 25 26 // Dialer returns a dialer that can be used to connect to this listener 27 // locally. 28 // If a listener does not implement this it will return a nil. 29 Dialer() Dialer 30} 31 32// Dialer is used by clients to dial a server. 33type Dialer interface { 34 // Dial returns a new communication byte stream to a listening server. 35 Dial(ctx context.Context) (io.ReadWriteCloser, error) 36} 37 38// Server is a running server that is accepting incoming connections. 39type Server struct { 40 listener Listener 41 binder Binder 42 async async 43} 44 45// Dial uses the dialer to make a new connection, wraps the returned 46// reader and writer using the framer to make a stream, and then builds 47// a connection on top of that stream using the binder. 48func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) { 49 // dial a server 50 rwc, err := dialer.Dial(ctx) 51 if err != nil { 52 return nil, err 53 } 54 return newConnection(ctx, rwc, binder) 55} 56 57// Serve starts a new server listening for incoming connections and returns 58// it. 59// This returns a fully running and connected server, it does not block on 60// the listener. 61// You can call Wait to block on the server, or Shutdown to get the sever to 62// terminate gracefully. 63// To notice incoming connections, use an intercepting Binder. 64func Serve(ctx context.Context, listener Listener, binder Binder) (*Server, error) { 65 server := &Server{ 66 listener: listener, 67 binder: binder, 68 } 69 server.async.init() 70 go server.run(ctx) 71 return server, nil 72} 73 74// Wait returns only when the server has shut down. 75func (s *Server) Wait() error { 76 return s.async.wait() 77} 78 79// run accepts incoming connections from the listener, 80// If IdleTimeout is non-zero, run exits after there are no clients for this 81// duration, otherwise it exits only on error. 82func (s *Server) run(ctx context.Context) { 83 defer s.async.done() 84 var activeConns []*Connection 85 for { 86 // we never close the accepted connection, we rely on the other end 87 // closing or the socket closing itself naturally 88 rwc, err := s.listener.Accept(ctx) 89 if err != nil { 90 if !isClosingError(err) { 91 s.async.setError(err) 92 } 93 // we are done generating new connections for good 94 break 95 } 96 97 // see if any connections were closed while we were waiting 98 activeConns = onlyActive(activeConns) 99 100 // a new inbound connection, 101 conn, err := newConnection(ctx, rwc, s.binder) 102 if err != nil { 103 if !isClosingError(err) { 104 s.async.setError(err) 105 } 106 continue 107 } 108 activeConns = append(activeConns, conn) 109 } 110 111 // wait for all active conns to finish 112 for _, c := range activeConns { 113 c.Wait() 114 } 115} 116 117func onlyActive(conns []*Connection) []*Connection { 118 i := 0 119 for _, c := range conns { 120 if !c.async.isDone() { 121 conns[i] = c 122 i++ 123 } 124 } 125 // trim the slice down 126 return conns[:i] 127} 128 129// isClosingError reports if the error occurs normally during the process of 130// closing a network connection. It uses imperfect heuristics that err on the 131// side of false negatives, and should not be used for anything critical. 132func isClosingError(err error) bool { 133 if err == nil { 134 return false 135 } 136 // Fully unwrap the error, so the following tests work. 137 for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) { 138 err = wrapped 139 } 140 141 // Was it based on an EOF error? 142 if err == io.EOF { 143 return true 144 } 145 146 // Was it based on a closed pipe? 147 if err == io.ErrClosedPipe { 148 return true 149 } 150 151 // Per https://github.com/golang/go/issues/4373, this error string should not 152 // change. This is not ideal, but since the worst that could happen here is 153 // some superfluous logging, it is acceptable. 154 if err.Error() == "use of closed network connection" { 155 return true 156 } 157 158 return false 159} 160 161// NewIdleListener wraps a listener with an idle timeout. 162// When there are no active connections for at least the timeout duration a 163// call to accept will fail with ErrIdleTimeout. 164func NewIdleListener(timeout time.Duration, wrap Listener) Listener { 165 l := &idleListener{ 166 timeout: timeout, 167 wrapped: wrap, 168 newConns: make(chan *idleCloser), 169 closed: make(chan struct{}), 170 wasTimeout: make(chan struct{}), 171 } 172 go l.run() 173 return l 174} 175 176type idleListener struct { 177 wrapped Listener 178 timeout time.Duration 179 newConns chan *idleCloser 180 closed chan struct{} 181 wasTimeout chan struct{} 182 closeOnce sync.Once 183} 184 185type idleCloser struct { 186 wrapped io.ReadWriteCloser 187 closed chan struct{} 188 closeOnce sync.Once 189} 190 191func (c *idleCloser) Read(p []byte) (int, error) { 192 n, err := c.wrapped.Read(p) 193 if err != nil && isClosingError(err) { 194 c.closeOnce.Do(func() { close(c.closed) }) 195 } 196 return n, err 197} 198 199func (c *idleCloser) Write(p []byte) (int, error) { 200 // we do not close on write failure, we rely on the wrapped writer to do that 201 // if it is appropriate, which we will detect in the next read. 202 return c.wrapped.Write(p) 203} 204 205func (c *idleCloser) Close() error { 206 // we rely on closing the wrapped stream to signal to the next read that we 207 // are closed, rather than triggering the closed signal directly 208 return c.wrapped.Close() 209} 210 211func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) { 212 rwc, err := l.wrapped.Accept(ctx) 213 if err != nil { 214 if isClosingError(err) { 215 // underlying listener was closed 216 l.closeOnce.Do(func() { close(l.closed) }) 217 // was it closed because of the idle timeout? 218 select { 219 case <-l.wasTimeout: 220 err = ErrIdleTimeout 221 default: 222 } 223 } 224 return nil, err 225 } 226 conn := &idleCloser{ 227 wrapped: rwc, 228 closed: make(chan struct{}), 229 } 230 l.newConns <- conn 231 return conn, err 232} 233 234func (l *idleListener) Close() error { 235 defer l.closeOnce.Do(func() { close(l.closed) }) 236 return l.wrapped.Close() 237} 238 239func (l *idleListener) Dialer() Dialer { 240 return l.wrapped.Dialer() 241} 242 243func (l *idleListener) run() { 244 var conns []*idleCloser 245 for { 246 var firstClosed chan struct{} // left at nil if there are no active conns 247 var timeout <-chan time.Time // left at nil if there are active conns 248 if len(conns) > 0 { 249 firstClosed = conns[0].closed 250 } else { 251 timeout = time.After(l.timeout) 252 } 253 select { 254 case <-l.closed: 255 // the main listener closed, no need to keep going 256 return 257 case conn := <-l.newConns: 258 // a new conn arrived, add it to the list 259 conns = append(conns, conn) 260 case <-timeout: 261 // we timed out, only happens when there are no active conns 262 // close the underlying listener, and allow the normal closing process to happen 263 close(l.wasTimeout) 264 l.wrapped.Close() 265 case <-firstClosed: 266 // a conn closed, remove it from the active list 267 conns = conns[:copy(conns, conns[1:])] 268 } 269 } 270} 271