1// Copyright (C) 2020 Storj Labs, Inc. 2// See LICENSE for copying information. 3 4package rpcpool 5 6import ( 7 "context" 8 "sync" 9 10 "github.com/zeebo/errs" 11 12 "storj.io/drpc" 13) 14 15// poolConn grabs a connection from the pool for every invoke/stream. 16type poolConn struct { 17 on sync.Once 18 ch chan struct{} 19 20 pk poolKey 21 dial Dialer 22 pool *Pool 23} 24 25// Close marks the poolConn as closed and will not allow future calls to Invoke or NewStream 26// to proceed. It does not stop any ongoing calls to Invoke or NewStream. 27func (c *poolConn) Close() (err error) { 28 c.on.Do(func() { close(c.ch) }) 29 return nil 30} 31 32// Closed returns true if the poolConn is closed. 33func (c *poolConn) Closed() <-chan struct{} { 34 return c.ch 35} 36 37// Invoke acquires a connection from the pool, dialing if necessary, and issues the Invoke on that 38// connection. The connection is replaced into the pool after the invoke finishes. 39func (c *poolConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) { 40 defer mon.Task()(&ctx)(&err) 41 42 select { 43 case <-c.ch: 44 return errs.New("connection closed") 45 default: 46 } 47 48 pv, err := c.pool.get(ctx, c.pk, c.dial) 49 if err != nil { 50 return err 51 } 52 defer c.pool.cache.Put(c.pk, pv) 53 54 return pv.conn.Invoke(ctx, rpc, enc, in, out) 55} 56 57// NewStream acquires a connection from the pool, dialing if necessary, and issues the NewStream on 58// that connection. The connection is replaced into the pool after the stream is finished. 59func (c *poolConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) { 60 defer mon.Task()(&ctx)(&err) 61 62 select { 63 case <-c.ch: 64 return nil, errs.New("connection closed") 65 default: 66 } 67 68 pv, err := c.pool.get(ctx, c.pk, c.dial) 69 if err != nil { 70 return nil, err 71 } 72 73 stream, err := pv.conn.NewStream(ctx, rpc, enc) 74 if err != nil { 75 return nil, err 76 } 77 78 // the stream's done channel is closed when we're sure no reads/writes are 79 // coming in for that stream anymore. it has been fully terminated. 80 go func() { 81 <-stream.Context().Done() 82 c.pool.cache.Put(c.pk, pv) 83 }() 84 85 return stream, nil 86} 87