1package plugin 2 3import ( 4 "errors" 5 "fmt" 6 "io" 7 "log" 8 "net" 9 "net/rpc" 10 "sync" 11 12 "github.com/hashicorp/yamux" 13) 14 15// RPCServer listens for network connections and then dispenses interface 16// implementations over net/rpc. 17// 18// After setting the fields below, they shouldn't be read again directly 19// from the structure which may be reading/writing them concurrently. 20type RPCServer struct { 21 Plugins map[string]Plugin 22 23 // Stdout, Stderr are what this server will use instead of the 24 // normal stdin/out/err. This is because due to the multi-process nature 25 // of our plugin system, we can't use the normal process values so we 26 // make our own custom one we pipe across. 27 Stdout io.Reader 28 Stderr io.Reader 29 30 // DoneCh should be set to a non-nil channel that will be closed 31 // when the control requests the RPC server to end. 32 DoneCh chan<- struct{} 33 34 lock sync.Mutex 35} 36 37// ServerProtocol impl. 38func (s *RPCServer) Init() error { return nil } 39 40// ServerProtocol impl. 41func (s *RPCServer) Config() string { return "" } 42 43// ServerProtocol impl. 44func (s *RPCServer) Serve(lis net.Listener) { 45 for { 46 conn, err := lis.Accept() 47 if err != nil { 48 log.Printf("[ERR] plugin: plugin server: %s", err) 49 return 50 } 51 52 go s.ServeConn(conn) 53 } 54} 55 56// ServeConn runs a single connection. 57// 58// ServeConn blocks, serving the connection until the client hangs up. 59func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) { 60 // First create the yamux server to wrap this connection 61 mux, err := yamux.Server(conn, nil) 62 if err != nil { 63 conn.Close() 64 log.Printf("[ERR] plugin: error creating yamux server: %s", err) 65 return 66 } 67 68 // Accept the control connection 69 control, err := mux.Accept() 70 if err != nil { 71 mux.Close() 72 if err != io.EOF { 73 log.Printf("[ERR] plugin: error accepting control connection: %s", err) 74 } 75 76 return 77 } 78 79 // Connect the stdstreams (in, out, err) 80 stdstream := make([]net.Conn, 2) 81 for i, _ := range stdstream { 82 stdstream[i], err = mux.Accept() 83 if err != nil { 84 mux.Close() 85 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err) 86 return 87 } 88 } 89 90 // Copy std streams out to the proper place 91 go copyStream("stdout", stdstream[0], s.Stdout) 92 go copyStream("stderr", stdstream[1], s.Stderr) 93 94 // Create the broker and start it up 95 broker := newMuxBroker(mux) 96 go broker.Run() 97 98 // Use the control connection to build the dispenser and serve the 99 // connection. 100 server := rpc.NewServer() 101 server.RegisterName("Control", &controlServer{ 102 server: s, 103 }) 104 server.RegisterName("Dispenser", &dispenseServer{ 105 broker: broker, 106 plugins: s.Plugins, 107 }) 108 server.ServeConn(control) 109} 110 111// done is called internally by the control server to trigger the 112// doneCh to close which is listened to by the main process to cleanly 113// exit. 114func (s *RPCServer) done() { 115 s.lock.Lock() 116 defer s.lock.Unlock() 117 118 if s.DoneCh != nil { 119 close(s.DoneCh) 120 s.DoneCh = nil 121 } 122} 123 124// dispenseServer dispenses variousinterface implementations for Terraform. 125type controlServer struct { 126 server *RPCServer 127} 128 129// Ping can be called to verify the connection (and likely the binary) 130// is still alive to a plugin. 131func (c *controlServer) Ping( 132 null bool, response *struct{}) error { 133 *response = struct{}{} 134 return nil 135} 136 137func (c *controlServer) Quit( 138 null bool, response *struct{}) error { 139 // End the server 140 c.server.done() 141 142 // Always return true 143 *response = struct{}{} 144 145 return nil 146} 147 148// dispenseServer dispenses variousinterface implementations for Terraform. 149type dispenseServer struct { 150 broker *MuxBroker 151 plugins map[string]Plugin 152} 153 154func (d *dispenseServer) Dispense( 155 name string, response *uint32) error { 156 // Find the function to create this implementation 157 p, ok := d.plugins[name] 158 if !ok { 159 return fmt.Errorf("unknown plugin type: %s", name) 160 } 161 162 // Create the implementation first so we know if there is an error. 163 impl, err := p.Server(d.broker) 164 if err != nil { 165 // We turn the error into an errors error so that it works across RPC 166 return errors.New(err.Error()) 167 } 168 169 // Reserve an ID for our implementation 170 id := d.broker.NextId() 171 *response = id 172 173 // Run the rest in a goroutine since it can only happen once this RPC 174 // call returns. We wait for a connection for the plugin implementation 175 // and serve it. 176 go func() { 177 conn, err := d.broker.Accept(id) 178 if err != nil { 179 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err) 180 return 181 } 182 183 serve(conn, "Plugin", impl) 184 }() 185 186 return nil 187} 188 189func serve(conn io.ReadWriteCloser, name string, v interface{}) { 190 server := rpc.NewServer() 191 if err := server.RegisterName(name, v); err != nil { 192 log.Printf("[ERR] go-plugin: plugin dispense error: %s", err) 193 return 194 } 195 196 server.ServeConn(conn) 197} 198