1// Copyright (C) 2020 Storj Labs, Inc.
2// See LICENSE for copying information.
3
4package rpcpool
5
6import (
7	"context"
8	"sync"
9
10	"github.com/zeebo/errs"
11
12	"storj.io/drpc"
13)
14
15// poolConn grabs a connection from the pool for every invoke/stream.
16type poolConn struct {
17	on sync.Once
18	ch chan struct{}
19
20	pk   poolKey
21	dial Dialer
22	pool *Pool
23}
24
25// Close marks the poolConn as closed and will not allow future calls to Invoke or NewStream
26// to proceed. It does not stop any ongoing calls to Invoke or NewStream.
27func (c *poolConn) Close() (err error) {
28	c.on.Do(func() { close(c.ch) })
29	return nil
30}
31
32// Closed returns true if the poolConn is closed.
33func (c *poolConn) Closed() <-chan struct{} {
34	return c.ch
35}
36
37// Invoke acquires a connection from the pool, dialing if necessary, and issues the Invoke on that
38// connection. The connection is replaced into the pool after the invoke finishes.
39func (c *poolConn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
40	defer mon.Task()(&ctx)(&err)
41
42	select {
43	case <-c.ch:
44		return errs.New("connection closed")
45	default:
46	}
47
48	pv, err := c.pool.get(ctx, c.pk, c.dial)
49	if err != nil {
50		return err
51	}
52	defer c.pool.cache.Put(c.pk, pv)
53
54	return pv.conn.Invoke(ctx, rpc, enc, in, out)
55}
56
57// NewStream acquires a connection from the pool, dialing if necessary, and issues the NewStream on
58// that connection. The connection is replaced into the pool after the stream is finished.
59func (c *poolConn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
60	defer mon.Task()(&ctx)(&err)
61
62	select {
63	case <-c.ch:
64		return nil, errs.New("connection closed")
65	default:
66	}
67
68	pv, err := c.pool.get(ctx, c.pk, c.dial)
69	if err != nil {
70		return nil, err
71	}
72
73	stream, err := pv.conn.NewStream(ctx, rpc, enc)
74	if err != nil {
75		return nil, err
76	}
77
78	// the stream's done channel is closed when we're sure no reads/writes are
79	// coming in for that stream anymore. it has been fully terminated.
80	go func() {
81		<-stream.Context().Done()
82		c.pool.cache.Put(c.pk, pv)
83	}()
84
85	return stream, nil
86}
87