1// Package connectionbroker is a layer on top of remotes that returns
2// a gRPC connection to a manager. The connection may be a local connection
3// using a local socket such as a UNIX socket.
4package connectionbroker
5
6import (
7	"net"
8	"sync"
9	"time"
10
11	"github.com/docker/swarmkit/api"
12	"github.com/docker/swarmkit/remotes"
13	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
14	"google.golang.org/grpc"
15)
16
// Broker is a simple connection broker. It can either return a fresh
// connection to a remote manager selected with weighted randomization, or a
// local gRPC connection to the local manager.
type Broker struct {
	// mu guards localConn.
	mu sync.Mutex
	// remotes is the set of remote managers SelectRemote picks a peer from.
	// It is assigned once in New.
	remotes remotes.Remotes
	// localConn, when non-nil, is handed out by Select in preference to
	// dialing a remote manager. It is replaced through SetLocalConn.
	localConn *grpc.ClientConn
}
25
26// New creates a new connection broker.
27func New(remotes remotes.Remotes) *Broker {
28	return &Broker{
29		remotes: remotes,
30	}
31}
32
33// SetLocalConn changes the local gRPC connection used by the connection broker.
34func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) {
35	b.mu.Lock()
36	defer b.mu.Unlock()
37
38	b.localConn = localConn
39}
40
41// Select a manager from the set of available managers, and return a connection.
42func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) {
43	b.mu.Lock()
44	localConn := b.localConn
45	b.mu.Unlock()
46
47	if localConn != nil {
48		return &Conn{
49			ClientConn: localConn,
50			isLocal:    true,
51		}, nil
52	}
53
54	return b.SelectRemote(dialOpts...)
55}
56
57// SelectRemote chooses a manager from the remotes, and returns a TCP
58// connection.
59func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) {
60	peer, err := b.remotes.Select()
61
62	if err != nil {
63		return nil, err
64	}
65
66	// gRPC dialer connects to proxy first. Provide a custom dialer here avoid that.
67	// TODO(anshul) Add an option to configure this.
68	dialOpts = append(dialOpts,
69		grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
70		grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
71		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
72			return net.DialTimeout("tcp", addr, timeout)
73		}))
74
75	cc, err := grpc.Dial(peer.Addr, dialOpts...)
76	if err != nil {
77		b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight)
78		return nil, err
79	}
80
81	return &Conn{
82		ClientConn: cc,
83		remotes:    b.remotes,
84		peer:       peer,
85	}, nil
86}
87
88// Remotes returns the remotes interface used by the broker, so the caller
89// can make observations or see weights directly.
90func (b *Broker) Remotes() remotes.Remotes {
91	return b.remotes
92}
93
// Conn is a wrapper around a gRPC client connection.
type Conn struct {
	// ClientConn is the underlying gRPC connection. It is embedded, so its
	// methods are promoted onto Conn, except Close, which Conn replaces with
	// Close(success bool).
	*grpc.ClientConn
	// isLocal is set on connections handed out from the broker's local
	// connection; Close is a no-op for those.
	isLocal bool
	// remotes receives the observation recorded by Close. It is left unset
	// on local connections.
	remotes remotes.Remotes
	// peer is the remote manager this connection was dialed to. It is left
	// at its zero value on local connections.
	peer api.Peer
}
101
102// Peer returns the peer for this Conn.
103func (c *Conn) Peer() api.Peer {
104	return c.peer
105}
106
107// Close closes the client connection if it is a remote connection. It also
108// records a positive experience with the remote peer if success is true,
109// otherwise it records a negative experience. If a local connection is in use,
110// Close is a noop.
111func (c *Conn) Close(success bool) error {
112	if c.isLocal {
113		return nil
114	}
115
116	if success {
117		c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight)
118	} else {
119		c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight)
120	}
121
122	return c.ClientConn.Close()
123}
124