1package plugin
2
3import (
4	"errors"
5	"fmt"
6	"io"
7	"log"
8	"net"
9	"net/rpc"
10	"sync"
11
12	"github.com/hashicorp/yamux"
13)
14
15// RPCServer listens for network connections and then dispenses interface
16// implementations over net/rpc.
17//
18// After setting the fields below, they shouldn't be read again directly
19// from the structure which may be reading/writing them concurrently.
20type RPCServer struct {
21	Plugins map[string]Plugin
22
23	// Stdout, Stderr are what this server will use instead of the
24	// normal stdin/out/err. This is because due to the multi-process nature
25	// of our plugin system, we can't use the normal process values so we
26	// make our own custom one we pipe across.
27	Stdout io.Reader
28	Stderr io.Reader
29
30	// DoneCh should be set to a non-nil channel that will be closed
31	// when the control requests the RPC server to end.
32	DoneCh chan<- struct{}
33
34	lock sync.Mutex
35}
36
37// ServerProtocol impl.
38func (s *RPCServer) Init() error { return nil }
39
40// ServerProtocol impl.
41func (s *RPCServer) Config() string { return "" }
42
43// ServerProtocol impl.
44func (s *RPCServer) Serve(lis net.Listener) {
45	for {
46		conn, err := lis.Accept()
47		if err != nil {
48			log.Printf("[ERR] plugin: plugin server: %s", err)
49			return
50		}
51
52		go s.ServeConn(conn)
53	}
54}
55
56// ServeConn runs a single connection.
57//
58// ServeConn blocks, serving the connection until the client hangs up.
59func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
60	// First create the yamux server to wrap this connection
61	mux, err := yamux.Server(conn, nil)
62	if err != nil {
63		conn.Close()
64		log.Printf("[ERR] plugin: error creating yamux server: %s", err)
65		return
66	}
67
68	// Accept the control connection
69	control, err := mux.Accept()
70	if err != nil {
71		mux.Close()
72		if err != io.EOF {
73			log.Printf("[ERR] plugin: error accepting control connection: %s", err)
74		}
75
76		return
77	}
78
79	// Connect the stdstreams (in, out, err)
80	stdstream := make([]net.Conn, 2)
81	for i, _ := range stdstream {
82		stdstream[i], err = mux.Accept()
83		if err != nil {
84			mux.Close()
85			log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
86			return
87		}
88	}
89
90	// Copy std streams out to the proper place
91	go copyStream("stdout", stdstream[0], s.Stdout)
92	go copyStream("stderr", stdstream[1], s.Stderr)
93
94	// Create the broker and start it up
95	broker := newMuxBroker(mux)
96	go broker.Run()
97
98	// Use the control connection to build the dispenser and serve the
99	// connection.
100	server := rpc.NewServer()
101	server.RegisterName("Control", &controlServer{
102		server: s,
103	})
104	server.RegisterName("Dispenser", &dispenseServer{
105		broker:  broker,
106		plugins: s.Plugins,
107	})
108	server.ServeConn(control)
109}
110
111// done is called internally by the control server to trigger the
112// doneCh to close which is listened to by the main process to cleanly
113// exit.
114func (s *RPCServer) done() {
115	s.lock.Lock()
116	defer s.lock.Unlock()
117
118	if s.DoneCh != nil {
119		close(s.DoneCh)
120		s.DoneCh = nil
121	}
122}
123
// controlServer provides the control RPC methods (Ping and Quit) for an
// RPCServer.
type controlServer struct {
	// server is the RPCServer whose done method Quit invokes.
	server *RPCServer
}
128
129// Ping can be called to verify the connection (and likely the binary)
130// is still alive to a plugin.
131func (c *controlServer) Ping(
132	null bool, response *struct{}) error {
133	*response = struct{}{}
134	return nil
135}
136
137func (c *controlServer) Quit(
138	null bool, response *struct{}) error {
139	// End the server
140	c.server.done()
141
142	// Always return true
143	*response = struct{}{}
144
145	return nil
146}
147
// dispenseServer dispenses plugin interface implementations by name over
// RPC.
type dispenseServer struct {
	// broker is passed to each Plugin's Server method and is used to
	// reserve an ID and accept the connection for a dispensed plugin.
	broker  *MuxBroker
	// plugins maps a plugin name to the Plugin that creates its
	// implementation.
	plugins map[string]Plugin
}
153
154func (d *dispenseServer) Dispense(
155	name string, response *uint32) error {
156	// Find the function to create this implementation
157	p, ok := d.plugins[name]
158	if !ok {
159		return fmt.Errorf("unknown plugin type: %s", name)
160	}
161
162	// Create the implementation first so we know if there is an error.
163	impl, err := p.Server(d.broker)
164	if err != nil {
165		// We turn the error into an errors error so that it works across RPC
166		return errors.New(err.Error())
167	}
168
169	// Reserve an ID for our implementation
170	id := d.broker.NextId()
171	*response = id
172
173	// Run the rest in a goroutine since it can only happen once this RPC
174	// call returns. We wait for a connection for the plugin implementation
175	// and serve it.
176	go func() {
177		conn, err := d.broker.Accept(id)
178		if err != nil {
179			log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
180			return
181		}
182
183		serve(conn, "Plugin", impl)
184	}()
185
186	return nil
187}
188
// serve registers v under name on a fresh RPC server and serves conn with
// it, blocking until the client hangs up.
//
// If v cannot be registered (for example because it has no methods
// suitable for net/rpc), the error is logged and conn is closed so the
// connection is not leaked and the remote side is not left waiting.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
		// Nothing will ever serve this connection, so release it here.
		// The close error is ignored: this is best-effort cleanup on a
		// path that has already failed.
		conn.Close()
		return
	}

	server.ServeConn(conn)
}
198