1package plugin 2 3import ( 4 "context" 5 "crypto/tls" 6 "errors" 7 "fmt" 8 "log" 9 "net" 10 "sync" 11 "sync/atomic" 12 "time" 13 14 "github.com/hashicorp/go-plugin/internal/plugin" 15 16 "github.com/oklog/run" 17 "google.golang.org/grpc" 18 "google.golang.org/grpc/credentials" 19) 20 21// streamer interface is used in the broker to send/receive connection 22// information. 23type streamer interface { 24 Send(*plugin.ConnInfo) error 25 Recv() (*plugin.ConnInfo, error) 26 Close() 27} 28 29// sendErr is used to pass errors back during a send. 30type sendErr struct { 31 i *plugin.ConnInfo 32 ch chan error 33} 34 35// gRPCBrokerServer is used by the plugin to start a stream and to send 36// connection information to/from the plugin. Implements GRPCBrokerServer and 37// streamer interfaces. 38type gRPCBrokerServer struct { 39 // send is used to send connection info to the gRPC stream. 40 send chan *sendErr 41 42 // recv is used to receive connection info from the gRPC stream. 43 recv chan *plugin.ConnInfo 44 45 // quit closes down the stream. 46 quit chan struct{} 47 48 // o is used to ensure we close the quit channel only once. 49 o sync.Once 50} 51 52func newGRPCBrokerServer() *gRPCBrokerServer { 53 return &gRPCBrokerServer{ 54 send: make(chan *sendErr), 55 recv: make(chan *plugin.ConnInfo), 56 quit: make(chan struct{}), 57 } 58} 59 60// StartStream implements the GRPCBrokerServer interface and will block until 61// the quit channel is closed or the context reports Done. The stream will pass 62// connection information to/from the client. 63func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error { 64 doneCh := stream.Context().Done() 65 defer s.Close() 66 67 // Proccess send stream 68 go func() { 69 for { 70 select { 71 case <-doneCh: 72 return 73 case <-s.quit: 74 return 75 case se := <-s.send: 76 err := stream.Send(se.i) 77 se.ch <- err 78 } 79 } 80 }() 81 82 // Process receive stream 83 for { 84 i, err := stream.Recv() 85 if err != nil { 86 return err 87 } 88 select { 89 case <-doneCh: 90 return nil 91 case <-s.quit: 92 return nil 93 case s.recv <- i: 94 } 95 } 96 97 return nil 98} 99 100// Send is used by the GRPCBroker to pass connection information into the stream 101// to the client. 102func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error { 103 ch := make(chan error) 104 defer close(ch) 105 106 select { 107 case <-s.quit: 108 return errors.New("broker closed") 109 case s.send <- &sendErr{ 110 i: i, 111 ch: ch, 112 }: 113 } 114 115 return <-ch 116} 117 118// Recv is used by the GRPCBroker to pass connection information that has been 119// sent from the client from the stream to the broker. 120func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) { 121 select { 122 case <-s.quit: 123 return nil, errors.New("broker closed") 124 case i := <-s.recv: 125 return i, nil 126 } 127} 128 129// Close closes the quit channel, shutting down the stream. 130func (s *gRPCBrokerServer) Close() { 131 s.o.Do(func() { 132 close(s.quit) 133 }) 134} 135 136// gRPCBrokerClientImpl is used by the client to start a stream and to send 137// connection information to/from the client. Implements GRPCBrokerClient and 138// streamer interfaces. 139type gRPCBrokerClientImpl struct { 140 // client is the underlying GRPC client used to make calls to the server. 141 client plugin.GRPCBrokerClient 142 143 // send is used to send connection info to the gRPC stream. 144 send chan *sendErr 145 146 // recv is used to receive connection info from the gRPC stream. 147 recv chan *plugin.ConnInfo 148 149 // quit closes down the stream. 150 quit chan struct{} 151 152 // o is used to ensure we close the quit channel only once. 153 o sync.Once 154} 155 156func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl { 157 return &gRPCBrokerClientImpl{ 158 client: plugin.NewGRPCBrokerClient(conn), 159 send: make(chan *sendErr), 160 recv: make(chan *plugin.ConnInfo), 161 quit: make(chan struct{}), 162 } 163} 164 165// StartStream implements the GRPCBrokerClient interface and will block until 166// the quit channel is closed or the context reports Done. The stream will pass 167// connection information to/from the plugin. 168func (s *gRPCBrokerClientImpl) StartStream() error { 169 ctx, cancelFunc := context.WithCancel(context.Background()) 170 defer cancelFunc() 171 defer s.Close() 172 173 stream, err := s.client.StartStream(ctx) 174 if err != nil { 175 return err 176 } 177 doneCh := stream.Context().Done() 178 179 go func() { 180 for { 181 select { 182 case <-doneCh: 183 return 184 case <-s.quit: 185 return 186 case se := <-s.send: 187 err := stream.Send(se.i) 188 se.ch <- err 189 } 190 } 191 }() 192 193 for { 194 i, err := stream.Recv() 195 if err != nil { 196 return err 197 } 198 select { 199 case <-doneCh: 200 return nil 201 case <-s.quit: 202 return nil 203 case s.recv <- i: 204 } 205 } 206 207 return nil 208} 209 210// Send is used by the GRPCBroker to pass connection information into the stream 211// to the plugin. 212func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error { 213 ch := make(chan error) 214 defer close(ch) 215 216 select { 217 case <-s.quit: 218 return errors.New("broker closed") 219 case s.send <- &sendErr{ 220 i: i, 221 ch: ch, 222 }: 223 } 224 225 return <-ch 226} 227 228// Recv is used by the GRPCBroker to pass connection information that has been 229// sent from the plugin to the broker. 230func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) { 231 select { 232 case <-s.quit: 233 return nil, errors.New("broker closed") 234 case i := <-s.recv: 235 return i, nil 236 } 237} 238 239// Close closes the quit channel, shutting down the stream. 240func (s *gRPCBrokerClientImpl) Close() { 241 s.o.Do(func() { 242 close(s.quit) 243 }) 244} 245 246// GRPCBroker is responsible for brokering connections by unique ID. 247// 248// It is used by plugins to create multiple gRPC connections and data 249// streams between the plugin process and the host process. 250// 251// This allows a plugin to request a channel with a specific ID to connect to 252// or accept a connection from, and the broker handles the details of 253// holding these channels open while they're being negotiated. 254// 255// The Plugin interface has access to these for both Server and Client. 256// The broker can be used by either (optionally) to reserve and connect to 257// new streams. This is useful for complex args and return values, 258// or anything else you might need a data stream for. 259type GRPCBroker struct { 260 nextId uint32 261 streamer streamer 262 streams map[uint32]*gRPCBrokerPending 263 tls *tls.Config 264 doneCh chan struct{} 265 o sync.Once 266 267 sync.Mutex 268} 269 270type gRPCBrokerPending struct { 271 ch chan *plugin.ConnInfo 272 doneCh chan struct{} 273} 274 275func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker { 276 return &GRPCBroker{ 277 streamer: s, 278 streams: make(map[uint32]*gRPCBrokerPending), 279 tls: tls, 280 doneCh: make(chan struct{}), 281 } 282} 283 284// Accept accepts a connection by ID. 285// 286// This should not be called multiple times with the same ID at one time. 287func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) { 288 listener, err := serverListener() 289 if err != nil { 290 return nil, err 291 } 292 293 err = b.streamer.Send(&plugin.ConnInfo{ 294 ServiceId: id, 295 Network: listener.Addr().Network(), 296 Address: listener.Addr().String(), 297 }) 298 if err != nil { 299 return nil, err 300 } 301 302 return listener, nil 303} 304 305// AcceptAndServe is used to accept a specific stream ID and immediately 306// serve a gRPC server on that stream ID. This is used to easily serve 307// complex arguments. Each AcceptAndServe call opens a new listener socket and 308// sends the connection info down the stream to the dialer. Since a new 309// connection is opened every call, these calls should be used sparingly. 310// Multiple gRPC server implementations can be registered to a single 311// AcceptAndServe call. 312func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) { 313 listener, err := b.Accept(id) 314 if err != nil { 315 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err) 316 return 317 } 318 defer listener.Close() 319 320 var opts []grpc.ServerOption 321 if b.tls != nil { 322 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))} 323 } 324 325 server := s(opts) 326 327 // Here we use a run group to close this goroutine if the server is shutdown 328 // or the broker is shutdown. 329 var g run.Group 330 { 331 // Serve on the listener, if shutting down call GracefulStop. 332 g.Add(func() error { 333 return server.Serve(listener) 334 }, func(err error) { 335 server.GracefulStop() 336 }) 337 } 338 { 339 // block on the closeCh or the doneCh. If we are shutting down close the 340 // closeCh. 341 closeCh := make(chan struct{}) 342 g.Add(func() error { 343 select { 344 case <-b.doneCh: 345 case <-closeCh: 346 } 347 return nil 348 }, func(err error) { 349 close(closeCh) 350 }) 351 } 352 353 // Block until we are done 354 g.Run() 355} 356 357// Close closes the stream and all servers. 358func (b *GRPCBroker) Close() error { 359 b.streamer.Close() 360 b.o.Do(func() { 361 close(b.doneCh) 362 }) 363 return nil 364} 365 366// Dial opens a connection by ID. 367func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) { 368 var c *plugin.ConnInfo 369 370 // Open the stream 371 p := b.getStream(id) 372 select { 373 case c = <-p.ch: 374 close(p.doneCh) 375 case <-time.After(5 * time.Second): 376 return nil, fmt.Errorf("timeout waiting for connection info") 377 } 378 379 var addr net.Addr 380 switch c.Network { 381 case "tcp": 382 addr, err = net.ResolveTCPAddr("tcp", c.Address) 383 case "unix": 384 addr, err = net.ResolveUnixAddr("unix", c.Address) 385 default: 386 err = fmt.Errorf("Unknown address type: %s", c.Address) 387 } 388 if err != nil { 389 return nil, err 390 } 391 392 return dialGRPCConn(b.tls, netAddrDialer(addr)) 393} 394 395// NextId returns a unique ID to use next. 396// 397// It is possible for very long-running plugin hosts to wrap this value, 398// though it would require a very large amount of calls. In practice 399// we've never seen it happen. 400func (m *GRPCBroker) NextId() uint32 { 401 return atomic.AddUint32(&m.nextId, 1) 402} 403 404// Run starts the brokering and should be executed in a goroutine, since it 405// blocks forever, or until the session closes. 406// 407// Uses of GRPCBroker never need to call this. It is called internally by 408// the plugin host/client. 409func (m *GRPCBroker) Run() { 410 for { 411 stream, err := m.streamer.Recv() 412 if err != nil { 413 // Once we receive an error, just exit 414 break 415 } 416 417 // Initialize the waiter 418 p := m.getStream(stream.ServiceId) 419 select { 420 case p.ch <- stream: 421 default: 422 } 423 424 go m.timeoutWait(stream.ServiceId, p) 425 } 426} 427 428func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending { 429 m.Lock() 430 defer m.Unlock() 431 432 p, ok := m.streams[id] 433 if ok { 434 return p 435 } 436 437 m.streams[id] = &gRPCBrokerPending{ 438 ch: make(chan *plugin.ConnInfo, 1), 439 doneCh: make(chan struct{}), 440 } 441 return m.streams[id] 442} 443 444func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) { 445 // Wait for the stream to either be picked up and connected, or 446 // for a timeout. 447 select { 448 case <-p.doneCh: 449 case <-time.After(5 * time.Second): 450 } 451 452 m.Lock() 453 defer m.Unlock() 454 455 // Delete the stream so no one else can grab it 456 delete(m.streams, id) 457} 458