1// Package connectionbroker is a layer on top of remotes that returns 2// a gRPC connection to a manager. The connection may be a local connection 3// using a local socket such as a UNIX socket. 4package connectionbroker 5 6import ( 7 "net" 8 "sync" 9 "time" 10 11 "github.com/docker/swarmkit/api" 12 "github.com/docker/swarmkit/remotes" 13 grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" 14 "google.golang.org/grpc" 15) 16 17// Broker is a simple connection broker. It can either return a fresh 18// connection to a remote manager selected with weighted randomization, or a 19// local gRPC connection to the local manager. 20type Broker struct { 21 mu sync.Mutex 22 remotes remotes.Remotes 23 localConn *grpc.ClientConn 24} 25 26// New creates a new connection broker. 27func New(remotes remotes.Remotes) *Broker { 28 return &Broker{ 29 remotes: remotes, 30 } 31} 32 33// SetLocalConn changes the local gRPC connection used by the connection broker. 34func (b *Broker) SetLocalConn(localConn *grpc.ClientConn) { 35 b.mu.Lock() 36 defer b.mu.Unlock() 37 38 b.localConn = localConn 39} 40 41// Select a manager from the set of available managers, and return a connection. 42func (b *Broker) Select(dialOpts ...grpc.DialOption) (*Conn, error) { 43 b.mu.Lock() 44 localConn := b.localConn 45 b.mu.Unlock() 46 47 if localConn != nil { 48 return &Conn{ 49 ClientConn: localConn, 50 isLocal: true, 51 }, nil 52 } 53 54 return b.SelectRemote(dialOpts...) 55} 56 57// SelectRemote chooses a manager from the remotes, and returns a TCP 58// connection. 59func (b *Broker) SelectRemote(dialOpts ...grpc.DialOption) (*Conn, error) { 60 peer, err := b.remotes.Select() 61 62 if err != nil { 63 return nil, err 64 } 65 66 // gRPC dialer connects to proxy first. Provide a custom dialer here avoid that. 67 // TODO(anshul) Add an option to configure this. 68 dialOpts = append(dialOpts, 69 grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor), 70 grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor), 71 grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { 72 return net.DialTimeout("tcp", addr, timeout) 73 })) 74 75 cc, err := grpc.Dial(peer.Addr, dialOpts...) 76 if err != nil { 77 b.remotes.ObserveIfExists(peer, -remotes.DefaultObservationWeight) 78 return nil, err 79 } 80 81 return &Conn{ 82 ClientConn: cc, 83 remotes: b.remotes, 84 peer: peer, 85 }, nil 86} 87 88// Remotes returns the remotes interface used by the broker, so the caller 89// can make observations or see weights directly. 90func (b *Broker) Remotes() remotes.Remotes { 91 return b.remotes 92} 93 94// Conn is a wrapper around a gRPC client connection. 95type Conn struct { 96 *grpc.ClientConn 97 isLocal bool 98 remotes remotes.Remotes 99 peer api.Peer 100} 101 102// Peer returns the peer for this Conn. 103func (c *Conn) Peer() api.Peer { 104 return c.peer 105} 106 107// Close closes the client connection if it is a remote connection. It also 108// records a positive experience with the remote peer if success is true, 109// otherwise it records a negative experience. If a local connection is in use, 110// Close is a noop. 111func (c *Conn) Close(success bool) error { 112 if c.isLocal { 113 return nil 114 } 115 116 if success { 117 c.remotes.ObserveIfExists(c.peer, remotes.DefaultObservationWeight) 118 } else { 119 c.remotes.ObserveIfExists(c.peer, -remotes.DefaultObservationWeight) 120 } 121 122 return c.ClientConn.Close() 123} 124