1package plugin
2
3import (
4	"context"
5	"crypto/tls"
6	"errors"
7	"fmt"
8	"log"
9	"net"
10	"sync"
11	"sync/atomic"
12	"time"
13
14	"github.com/hashicorp/go-plugin/internal/plugin"
15
16	"github.com/oklog/run"
17	"google.golang.org/grpc"
18	"google.golang.org/grpc/credentials"
19)
20
21// streamer interface is used in the broker to send/receive connection
22// information.
23type streamer interface {
24	Send(*plugin.ConnInfo) error
25	Recv() (*plugin.ConnInfo, error)
26	Close()
27}
28
29// sendErr is used to pass errors back during a send.
30type sendErr struct {
31	i  *plugin.ConnInfo
32	ch chan error
33}
34
35// gRPCBrokerServer is used by the plugin to start a stream and to send
36// connection information to/from the plugin. Implements GRPCBrokerServer and
37// streamer interfaces.
38type gRPCBrokerServer struct {
39	// send is used to send connection info to the gRPC stream.
40	send chan *sendErr
41
42	// recv is used to receive connection info from the gRPC stream.
43	recv chan *plugin.ConnInfo
44
45	// quit closes down the stream.
46	quit chan struct{}
47
48	// o is used to ensure we close the quit channel only once.
49	o sync.Once
50}
51
52func newGRPCBrokerServer() *gRPCBrokerServer {
53	return &gRPCBrokerServer{
54		send: make(chan *sendErr),
55		recv: make(chan *plugin.ConnInfo),
56		quit: make(chan struct{}),
57	}
58}
59
60// StartStream implements the GRPCBrokerServer interface and will block until
61// the quit channel is closed or the context reports Done. The stream will pass
62// connection information to/from the client.
63func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
64	doneCh := stream.Context().Done()
65	defer s.Close()
66
67	// Proccess send stream
68	go func() {
69		for {
70			select {
71			case <-doneCh:
72				return
73			case <-s.quit:
74				return
75			case se := <-s.send:
76				err := stream.Send(se.i)
77				se.ch <- err
78			}
79		}
80	}()
81
82	// Process receive stream
83	for {
84		i, err := stream.Recv()
85		if err != nil {
86			return err
87		}
88		select {
89		case <-doneCh:
90			return nil
91		case <-s.quit:
92			return nil
93		case s.recv <- i:
94		}
95	}
96
97	return nil
98}
99
100// Send is used by the GRPCBroker to pass connection information into the stream
101// to the client.
102func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
103	ch := make(chan error)
104	defer close(ch)
105
106	select {
107	case <-s.quit:
108		return errors.New("broker closed")
109	case s.send <- &sendErr{
110		i:  i,
111		ch: ch,
112	}:
113	}
114
115	return <-ch
116}
117
118// Recv is used by the GRPCBroker to pass connection information that has been
119// sent from the client from the stream to the broker.
120func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
121	select {
122	case <-s.quit:
123		return nil, errors.New("broker closed")
124	case i := <-s.recv:
125		return i, nil
126	}
127}
128
129// Close closes the quit channel, shutting down the stream.
130func (s *gRPCBrokerServer) Close() {
131	s.o.Do(func() {
132		close(s.quit)
133	})
134}
135
136// gRPCBrokerClientImpl is used by the client to start a stream and to send
137// connection information to/from the client. Implements GRPCBrokerClient and
138// streamer interfaces.
139type gRPCBrokerClientImpl struct {
140	// client is the underlying GRPC client used to make calls to the server.
141	client plugin.GRPCBrokerClient
142
143	// send is used to send connection info to the gRPC stream.
144	send chan *sendErr
145
146	// recv is used to receive connection info from the gRPC stream.
147	recv chan *plugin.ConnInfo
148
149	// quit closes down the stream.
150	quit chan struct{}
151
152	// o is used to ensure we close the quit channel only once.
153	o sync.Once
154}
155
156func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
157	return &gRPCBrokerClientImpl{
158		client: plugin.NewGRPCBrokerClient(conn),
159		send:   make(chan *sendErr),
160		recv:   make(chan *plugin.ConnInfo),
161		quit:   make(chan struct{}),
162	}
163}
164
165// StartStream implements the GRPCBrokerClient interface and will block until
166// the quit channel is closed or the context reports Done. The stream will pass
167// connection information to/from the plugin.
168func (s *gRPCBrokerClientImpl) StartStream() error {
169	ctx, cancelFunc := context.WithCancel(context.Background())
170	defer cancelFunc()
171	defer s.Close()
172
173	stream, err := s.client.StartStream(ctx)
174	if err != nil {
175		return err
176	}
177	doneCh := stream.Context().Done()
178
179	go func() {
180		for {
181			select {
182			case <-doneCh:
183				return
184			case <-s.quit:
185				return
186			case se := <-s.send:
187				err := stream.Send(se.i)
188				se.ch <- err
189			}
190		}
191	}()
192
193	for {
194		i, err := stream.Recv()
195		if err != nil {
196			return err
197		}
198		select {
199		case <-doneCh:
200			return nil
201		case <-s.quit:
202			return nil
203		case s.recv <- i:
204		}
205	}
206
207	return nil
208}
209
210// Send is used by the GRPCBroker to pass connection information into the stream
211// to the plugin.
212func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
213	ch := make(chan error)
214	defer close(ch)
215
216	select {
217	case <-s.quit:
218		return errors.New("broker closed")
219	case s.send <- &sendErr{
220		i:  i,
221		ch: ch,
222	}:
223	}
224
225	return <-ch
226}
227
228// Recv is used by the GRPCBroker to pass connection information that has been
229// sent from the plugin to the broker.
230func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
231	select {
232	case <-s.quit:
233		return nil, errors.New("broker closed")
234	case i := <-s.recv:
235		return i, nil
236	}
237}
238
239// Close closes the quit channel, shutting down the stream.
240func (s *gRPCBrokerClientImpl) Close() {
241	s.o.Do(func() {
242		close(s.quit)
243	})
244}
245
246// GRPCBroker is responsible for brokering connections by unique ID.
247//
248// It is used by plugins to create multiple gRPC connections and data
249// streams between the plugin process and the host process.
250//
251// This allows a plugin to request a channel with a specific ID to connect to
252// or accept a connection from, and the broker handles the details of
253// holding these channels open while they're being negotiated.
254//
255// The Plugin interface has access to these for both Server and Client.
256// The broker can be used by either (optionally) to reserve and connect to
257// new streams. This is useful for complex args and return values,
258// or anything else you might need a data stream for.
259type GRPCBroker struct {
260	nextId   uint32
261	streamer streamer
262	streams  map[uint32]*gRPCBrokerPending
263	tls      *tls.Config
264	doneCh   chan struct{}
265	o        sync.Once
266
267	sync.Mutex
268}
269
270type gRPCBrokerPending struct {
271	ch     chan *plugin.ConnInfo
272	doneCh chan struct{}
273}
274
275func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
276	return &GRPCBroker{
277		streamer: s,
278		streams:  make(map[uint32]*gRPCBrokerPending),
279		tls:      tls,
280		doneCh:   make(chan struct{}),
281	}
282}
283
284// Accept accepts a connection by ID.
285//
286// This should not be called multiple times with the same ID at one time.
287func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
288	listener, err := serverListener()
289	if err != nil {
290		return nil, err
291	}
292
293	err = b.streamer.Send(&plugin.ConnInfo{
294		ServiceId: id,
295		Network:   listener.Addr().Network(),
296		Address:   listener.Addr().String(),
297	})
298	if err != nil {
299		return nil, err
300	}
301
302	return listener, nil
303}
304
305// AcceptAndServe is used to accept a specific stream ID and immediately
306// serve a gRPC server on that stream ID. This is used to easily serve
307// complex arguments. Each AcceptAndServe call opens a new listener socket and
308// sends the connection info down the stream to the dialer. Since a new
309// connection is opened every call, these calls should be used sparingly.
310// Multiple gRPC server implementations can be registered to a single
311// AcceptAndServe call.
312func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
313	listener, err := b.Accept(id)
314	if err != nil {
315		log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
316		return
317	}
318	defer listener.Close()
319
320	var opts []grpc.ServerOption
321	if b.tls != nil {
322		opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
323	}
324
325	server := s(opts)
326
327	// Here we use a run group to close this goroutine if the server is shutdown
328	// or the broker is shutdown.
329	var g run.Group
330	{
331		// Serve on the listener, if shutting down call GracefulStop.
332		g.Add(func() error {
333			return server.Serve(listener)
334		}, func(err error) {
335			server.GracefulStop()
336		})
337	}
338	{
339		// block on the closeCh or the doneCh. If we are shutting down close the
340		// closeCh.
341		closeCh := make(chan struct{})
342		g.Add(func() error {
343			select {
344			case <-b.doneCh:
345			case <-closeCh:
346			}
347			return nil
348		}, func(err error) {
349			close(closeCh)
350		})
351	}
352
353	// Block until we are done
354	g.Run()
355}
356
357// Close closes the stream and all servers.
358func (b *GRPCBroker) Close() error {
359	b.streamer.Close()
360	b.o.Do(func() {
361		close(b.doneCh)
362	})
363	return nil
364}
365
366// Dial opens a connection by ID.
367func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
368	var c *plugin.ConnInfo
369
370	// Open the stream
371	p := b.getStream(id)
372	select {
373	case c = <-p.ch:
374		close(p.doneCh)
375	case <-time.After(5 * time.Second):
376		return nil, fmt.Errorf("timeout waiting for connection info")
377	}
378
379	var addr net.Addr
380	switch c.Network {
381	case "tcp":
382		addr, err = net.ResolveTCPAddr("tcp", c.Address)
383	case "unix":
384		addr, err = net.ResolveUnixAddr("unix", c.Address)
385	default:
386		err = fmt.Errorf("Unknown address type: %s", c.Address)
387	}
388	if err != nil {
389		return nil, err
390	}
391
392	return dialGRPCConn(b.tls, netAddrDialer(addr))
393}
394
395// NextId returns a unique ID to use next.
396//
397// It is possible for very long-running plugin hosts to wrap this value,
398// though it would require a very large amount of calls. In practice
399// we've never seen it happen.
400func (m *GRPCBroker) NextId() uint32 {
401	return atomic.AddUint32(&m.nextId, 1)
402}
403
404// Run starts the brokering and should be executed in a goroutine, since it
405// blocks forever, or until the session closes.
406//
407// Uses of GRPCBroker never need to call this. It is called internally by
408// the plugin host/client.
409func (m *GRPCBroker) Run() {
410	for {
411		stream, err := m.streamer.Recv()
412		if err != nil {
413			// Once we receive an error, just exit
414			break
415		}
416
417		// Initialize the waiter
418		p := m.getStream(stream.ServiceId)
419		select {
420		case p.ch <- stream:
421		default:
422		}
423
424		go m.timeoutWait(stream.ServiceId, p)
425	}
426}
427
428func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
429	m.Lock()
430	defer m.Unlock()
431
432	p, ok := m.streams[id]
433	if ok {
434		return p
435	}
436
437	m.streams[id] = &gRPCBrokerPending{
438		ch:     make(chan *plugin.ConnInfo, 1),
439		doneCh: make(chan struct{}),
440	}
441	return m.streams[id]
442}
443
444func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
445	// Wait for the stream to either be picked up and connected, or
446	// for a timeout.
447	select {
448	case <-p.doneCh:
449	case <-time.After(5 * time.Second):
450	}
451
452	m.Lock()
453	defer m.Unlock()
454
455	// Delete the stream so no one else can grab it
456	delete(m.streams, id)
457}
458