1package plugin
2
3import (
4	"bufio"
5	"context"
6	"crypto/subtle"
7	"crypto/tls"
8	"crypto/x509"
9	"encoding/base64"
10	"errors"
11	"fmt"
12	"hash"
13	"io"
14	"io/ioutil"
15	"net"
16	"os"
17	"os/exec"
18	"path/filepath"
19	"strconv"
20	"strings"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	hclog "github.com/hashicorp/go-hclog"
26)
27
28// If this is 1, then we've called CleanupClients. This can be used
29// by plugin RPC implementations to change error behavior since you
30// can expected network connection errors at this point. This should be
31// read by using sync/atomic.
32var Killed uint32 = 0
33
34// This is a slice of the "managed" clients which are cleaned up when
35// calling Cleanup
36var managedClients = make([]*Client, 0, 5)
37var managedClientsLock sync.Mutex
38
39// Error types
40var (
41	// ErrProcessNotFound is returned when a client is instantiated to
42	// reattach to an existing process and it isn't found.
43	ErrProcessNotFound = errors.New("Reattachment process not found")
44
45	// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
46	// the one provided in the SecureConfig.
47	ErrChecksumsDoNotMatch = errors.New("checksums did not match")
48
49	// ErrSecureNoChecksum is returned when an empty checksum is provided to the
50	// SecureConfig.
51	ErrSecureConfigNoChecksum = errors.New("no checksum provided")
52
53	// ErrSecureNoHash is returned when a nil Hash object is provided to the
54	// SecureConfig.
55	ErrSecureConfigNoHash = errors.New("no hash implementation provided")
56
57	// ErrSecureConfigAndReattach is returned when both Reattach and
58	// SecureConfig are set.
59	ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
60)
61
62// Client handles the lifecycle of a plugin application. It launches
63// plugins, connects to them, dispenses interface implementations, and handles
64// killing the process.
65//
66// Plugin hosts should use one Client for each plugin executable. To
67// dispense a plugin type, use the `Client.Client` function, and then
68// cal `Dispense`. This awkward API is mostly historical but is used to split
69// the client that deals with subprocess management and the client that
70// does RPC management.
71//
72// See NewClient and ClientConfig for using a Client.
73type Client struct {
74	config            *ClientConfig
75	exited            bool
76	l                 sync.Mutex
77	address           net.Addr
78	process           *os.Process
79	client            ClientProtocol
80	protocol          Protocol
81	logger            hclog.Logger
82	doneCtx           context.Context
83	ctxCancel         context.CancelFunc
84	negotiatedVersion int
85
86	// clientWaitGroup is used to manage the lifecycle of the plugin management
87	// goroutines.
88	clientWaitGroup sync.WaitGroup
89
90	// processKilled is used for testing only, to flag when the process was
91	// forcefully killed.
92	processKilled bool
93}
94
95// NegotiatedVersion returns the protocol version negotiated with the server.
96// This is only valid after Start() is called.
97func (c *Client) NegotiatedVersion() int {
98	return c.negotiatedVersion
99}
100
101// ClientConfig is the configuration used to initialize a new
102// plugin client. After being used to initialize a plugin client,
103// that configuration must not be modified again.
104type ClientConfig struct {
105	// HandshakeConfig is the configuration that must match servers.
106	HandshakeConfig
107
108	// Plugins are the plugins that can be consumed.
109	// The implied version of this PluginSet is the Handshake.ProtocolVersion.
110	Plugins PluginSet
111
112	// VersionedPlugins is a map of PluginSets for specific protocol versions.
113	// These can be used to negotiate a compatible version between client and
114	// server. If this is set, Handshake.ProtocolVersion is not required.
115	VersionedPlugins map[int]PluginSet
116
117	// One of the following must be set, but not both.
118	//
119	// Cmd is the unstarted subprocess for starting the plugin. If this is
120	// set, then the Client starts the plugin process on its own and connects
121	// to it.
122	//
123	// Reattach is configuration for reattaching to an existing plugin process
124	// that is already running. This isn't common.
125	Cmd      *exec.Cmd
126	Reattach *ReattachConfig
127
128	// SecureConfig is configuration for verifying the integrity of the
129	// executable. It can not be used with Reattach.
130	SecureConfig *SecureConfig
131
132	// TLSConfig is used to enable TLS on the RPC client.
133	TLSConfig *tls.Config
134
135	// Managed represents if the client should be managed by the
136	// plugin package or not. If true, then by calling CleanupClients,
137	// it will automatically be cleaned up. Otherwise, the client
138	// user is fully responsible for making sure to Kill all plugin
139	// clients. By default the client is _not_ managed.
140	Managed bool
141
142	// The minimum and maximum port to use for communicating with
143	// the subprocess. If not set, this defaults to 10,000 and 25,000
144	// respectively.
145	MinPort, MaxPort uint
146
147	// StartTimeout is the timeout to wait for the plugin to say it
148	// has started successfully.
149	StartTimeout time.Duration
150
151	// If non-nil, then the stderr of the client will be written to here
152	// (as well as the log). This is the original os.Stderr of the subprocess.
153	// This isn't the output of synced stderr.
154	Stderr io.Writer
155
156	// SyncStdout, SyncStderr can be set to override the
157	// respective os.Std* values in the plugin. Care should be taken to
158	// avoid races here. If these are nil, then this will automatically be
159	// hooked up to os.Stdin, Stdout, and Stderr, respectively.
160	//
161	// If the default values (nil) are used, then this package will not
162	// sync any of these streams.
163	SyncStdout io.Writer
164	SyncStderr io.Writer
165
166	// AllowedProtocols is a list of allowed protocols. If this isn't set,
167	// then only netrpc is allowed. This is so that older go-plugin systems
168	// can show friendly errors if they see a plugin with an unknown
169	// protocol.
170	//
171	// By setting this, you can cause an error immediately on plugin start
172	// if an unsupported protocol is used with a good error message.
173	//
174	// If this isn't set at all (nil value), then only net/rpc is accepted.
175	// This is done for legacy reasons. You must explicitly opt-in to
176	// new protocols.
177	AllowedProtocols []Protocol
178
179	// Logger is the logger that the client will used. If none is provided,
180	// it will default to hclog's default logger.
181	Logger hclog.Logger
182
183	// AutoMTLS has the client and server automatically negotiate mTLS for
184	// transport authentication. This ensures that only the original client will
185	// be allowed to connect to the server, and all other connections will be
186	// rejected. The client will also refuse to connect to any server that isn't
187	// the original instance started by the client.
188	//
189	// In this mode of operation, the client generates a one-time use tls
190	// certificate, sends the public x.509 certificate to the new server, and
191	// the server generates a one-time use tls certificate, and sends the public
192	// x.509 certificate back to the client. These are used to authenticate all
193	// rpc connections between the client and server.
194	//
195	// Setting AutoMTLS to true implies that the server must support the
196	// protocol, and correctly negotiate the tls certificates, or a connection
197	// failure will result.
198	//
199	// The client should not set TLSConfig, nor should the server set a
200	// TLSProvider, because AutoMTLS implies that a new certificate and tls
201	// configuration will be generated at startup.
202	//
203	// You cannot Reattach to a server with this option enabled.
204	AutoMTLS bool
205}
206
207// ReattachConfig is used to configure a client to reattach to an
208// already-running plugin process. You can retrieve this information by
209// calling ReattachConfig on Client.
210type ReattachConfig struct {
211	Protocol Protocol
212	Addr     net.Addr
213	Pid      int
214}
215
216// SecureConfig is used to configure a client to verify the integrity of an
217// executable before running. It does this by verifying the checksum is
218// expected. Hash is used to specify the hashing method to use when checksumming
219// the file.  The configuration is verified by the client by calling the
220// SecureConfig.Check() function.
221//
222// The host process should ensure the checksum was provided by a trusted and
223// authoritative source. The binary should be installed in such a way that it
224// can not be modified by an unauthorized user between the time of this check
225// and the time of execution.
226type SecureConfig struct {
227	Checksum []byte
228	Hash     hash.Hash
229}
230
231// Check takes the filepath to an executable and returns true if the checksum of
232// the file matches the checksum provided in the SecureConfig.
233func (s *SecureConfig) Check(filePath string) (bool, error) {
234	if len(s.Checksum) == 0 {
235		return false, ErrSecureConfigNoChecksum
236	}
237
238	if s.Hash == nil {
239		return false, ErrSecureConfigNoHash
240	}
241
242	file, err := os.Open(filePath)
243	if err != nil {
244		return false, err
245	}
246	defer file.Close()
247
248	_, err = io.Copy(s.Hash, file)
249	if err != nil {
250		return false, err
251	}
252
253	sum := s.Hash.Sum(nil)
254
255	return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil
256}
257
258// This makes sure all the managed subprocesses are killed and properly
259// logged. This should be called before the parent process running the
260// plugins exits.
261//
262// This must only be called _once_.
263func CleanupClients() {
264	// Set the killed to true so that we don't get unexpected panics
265	atomic.StoreUint32(&Killed, 1)
266
267	// Kill all the managed clients in parallel and use a WaitGroup
268	// to wait for them all to finish up.
269	var wg sync.WaitGroup
270	managedClientsLock.Lock()
271	for _, client := range managedClients {
272		wg.Add(1)
273
274		go func(client *Client) {
275			client.Kill()
276			wg.Done()
277		}(client)
278	}
279	managedClientsLock.Unlock()
280
281	wg.Wait()
282}
283
284// Creates a new plugin client which manages the lifecycle of an external
285// plugin and gets the address for the RPC connection.
286//
287// The client must be cleaned up at some point by calling Kill(). If
288// the client is a managed client (created with NewManagedClient) you
289// can just call CleanupClients at the end of your program and they will
290// be properly cleaned.
291func NewClient(config *ClientConfig) (c *Client) {
292	if config.MinPort == 0 && config.MaxPort == 0 {
293		config.MinPort = 10000
294		config.MaxPort = 25000
295	}
296
297	if config.StartTimeout == 0 {
298		config.StartTimeout = 1 * time.Minute
299	}
300
301	if config.Stderr == nil {
302		config.Stderr = ioutil.Discard
303	}
304
305	if config.SyncStdout == nil {
306		config.SyncStdout = ioutil.Discard
307	}
308	if config.SyncStderr == nil {
309		config.SyncStderr = ioutil.Discard
310	}
311
312	if config.AllowedProtocols == nil {
313		config.AllowedProtocols = []Protocol{ProtocolNetRPC}
314	}
315
316	if config.Logger == nil {
317		config.Logger = hclog.New(&hclog.LoggerOptions{
318			Output: hclog.DefaultOutput,
319			Level:  hclog.Trace,
320			Name:   "plugin",
321		})
322	}
323
324	c = &Client{
325		config: config,
326		logger: config.Logger,
327	}
328	if config.Managed {
329		managedClientsLock.Lock()
330		managedClients = append(managedClients, c)
331		managedClientsLock.Unlock()
332	}
333
334	return
335}
336
337// Client returns the protocol client for this connection.
338//
339// Subsequent calls to this will return the same client.
340func (c *Client) Client() (ClientProtocol, error) {
341	_, err := c.Start()
342	if err != nil {
343		return nil, err
344	}
345
346	c.l.Lock()
347	defer c.l.Unlock()
348
349	if c.client != nil {
350		return c.client, nil
351	}
352
353	switch c.protocol {
354	case ProtocolNetRPC:
355		c.client, err = newRPCClient(c)
356
357	case ProtocolGRPC:
358		c.client, err = newGRPCClient(c.doneCtx, c)
359
360	default:
361		return nil, fmt.Errorf("unknown server protocol: %s", c.protocol)
362	}
363
364	if err != nil {
365		c.client = nil
366		return nil, err
367	}
368
369	return c.client, nil
370}
371
372// Tells whether or not the underlying process has exited.
373func (c *Client) Exited() bool {
374	c.l.Lock()
375	defer c.l.Unlock()
376	return c.exited
377}
378
379// killed is used in tests to check if a process failed to exit gracefully, and
380// needed to be killed.
381func (c *Client) killed() bool {
382	c.l.Lock()
383	defer c.l.Unlock()
384	return c.processKilled
385}
386
387// End the executing subprocess (if it is running) and perform any cleanup
388// tasks necessary such as capturing any remaining logs and so on.
389//
390// This method blocks until the process successfully exits.
391//
392// This method can safely be called multiple times.
393func (c *Client) Kill() {
394	// Grab a lock to read some private fields.
395	c.l.Lock()
396	process := c.process
397	addr := c.address
398	c.l.Unlock()
399
400	// If there is no process, there is nothing to kill.
401	if process == nil {
402		return
403	}
404
405	defer func() {
406		// Wait for the all client goroutines to finish.
407		c.clientWaitGroup.Wait()
408
409		// Make sure there is no reference to the old process after it has been
410		// killed.
411		c.l.Lock()
412		c.process = nil
413		c.l.Unlock()
414	}()
415
416	// We need to check for address here. It is possible that the plugin
417	// started (process != nil) but has no address (addr == nil) if the
418	// plugin failed at startup. If we do have an address, we need to close
419	// the plugin net connections.
420	graceful := false
421	if addr != nil {
422		// Close the client to cleanly exit the process.
423		client, err := c.Client()
424		if err == nil {
425			err = client.Close()
426
427			// If there is no error, then we attempt to wait for a graceful
428			// exit. If there was an error, we assume that graceful cleanup
429			// won't happen and just force kill.
430			graceful = err == nil
431			if err != nil {
432				// If there was an error just log it. We're going to force
433				// kill in a moment anyways.
434				c.logger.Warn("error closing client during Kill", "err", err)
435			}
436		} else {
437			c.logger.Error("client", "error", err)
438		}
439	}
440
441	// If we're attempting a graceful exit, then we wait for a short period
442	// of time to allow that to happen. To wait for this we just wait on the
443	// doneCh which would be closed if the process exits.
444	if graceful {
445		select {
446		case <-c.doneCtx.Done():
447			c.logger.Debug("plugin exited")
448			return
449		case <-time.After(2 * time.Second):
450		}
451	}
452
453	// If graceful exiting failed, just kill it
454	c.logger.Warn("plugin failed to exit gracefully")
455	process.Kill()
456
457	c.l.Lock()
458	c.processKilled = true
459	c.l.Unlock()
460}
461
462// Starts the underlying subprocess, communicating with it to negotiate
463// a port for RPC connections, and returning the address to connect via RPC.
464//
465// This method is safe to call multiple times. Subsequent calls have no effect.
466// Once a client has been started once, it cannot be started again, even if
467// it was killed.
468func (c *Client) Start() (addr net.Addr, err error) {
469	c.l.Lock()
470	defer c.l.Unlock()
471
472	if c.address != nil {
473		return c.address, nil
474	}
475
476	// If one of cmd or reattach isn't set, then it is an error. We wrap
477	// this in a {} for scoping reasons, and hopeful that the escape
478	// analysis will pop the stack here.
479	{
480		cmdSet := c.config.Cmd != nil
481		attachSet := c.config.Reattach != nil
482		secureSet := c.config.SecureConfig != nil
483		if cmdSet == attachSet {
484			return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
485		}
486
487		if secureSet && attachSet {
488			return nil, ErrSecureConfigAndReattach
489		}
490	}
491
492	if c.config.Reattach != nil {
493		return c.reattach()
494	}
495
496	if c.config.VersionedPlugins == nil {
497		c.config.VersionedPlugins = make(map[int]PluginSet)
498	}
499
500	// handle all plugins as versioned, using the handshake config as the default.
501	version := int(c.config.ProtocolVersion)
502
503	// Make sure we're not overwriting a real version 0. If ProtocolVersion was
504	// non-zero, then we have to just assume the user made sure that
505	// VersionedPlugins doesn't conflict.
506	if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
507		c.config.VersionedPlugins[version] = c.config.Plugins
508	}
509
510	var versionStrings []string
511	for v := range c.config.VersionedPlugins {
512		versionStrings = append(versionStrings, strconv.Itoa(v))
513	}
514
515	env := []string{
516		fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
517		fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
518		fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
519		fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
520	}
521
522	cmd := c.config.Cmd
523	cmd.Env = append(cmd.Env, os.Environ()...)
524	cmd.Env = append(cmd.Env, env...)
525	cmd.Stdin = os.Stdin
526
527	cmdStdout, err := cmd.StdoutPipe()
528	if err != nil {
529		return nil, err
530	}
531	cmdStderr, err := cmd.StderrPipe()
532	if err != nil {
533		return nil, err
534	}
535
536	if c.config.SecureConfig != nil {
537		if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
538			return nil, fmt.Errorf("error verifying checksum: %s", err)
539		} else if !ok {
540			return nil, ErrChecksumsDoNotMatch
541		}
542	}
543
544	// Setup a temporary certificate for client/server mtls, and send the public
545	// certificate to the plugin.
546	if c.config.AutoMTLS {
547		c.logger.Info("configuring client automatic mTLS")
548		certPEM, keyPEM, err := generateCert()
549		if err != nil {
550			c.logger.Error("failed to generate client certificate", "error", err)
551			return nil, err
552		}
553		cert, err := tls.X509KeyPair(certPEM, keyPEM)
554		if err != nil {
555			c.logger.Error("failed to parse client certificate", "error", err)
556			return nil, err
557		}
558
559		cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
560
561		c.config.TLSConfig = &tls.Config{
562			Certificates: []tls.Certificate{cert},
563			ServerName:   "localhost",
564		}
565	}
566
567	c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
568	err = cmd.Start()
569	if err != nil {
570		return
571	}
572
573	// Set the process
574	c.process = cmd.Process
575	c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
576
577	// Make sure the command is properly cleaned up if there is an error
578	defer func() {
579		r := recover()
580
581		if err != nil || r != nil {
582			cmd.Process.Kill()
583		}
584
585		if r != nil {
586			panic(r)
587		}
588	}()
589
590	// Create a context for when we kill
591	c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
592
593	c.clientWaitGroup.Add(1)
594	go func() {
595		// ensure the context is cancelled when we're done
596		defer c.ctxCancel()
597
598		defer c.clientWaitGroup.Done()
599
600		// get the cmd info early, since the process information will be removed
601		// in Kill.
602		pid := c.process.Pid
603		path := cmd.Path
604
605		// Wait for the command to end.
606		err := cmd.Wait()
607
608		debugMsgArgs := []interface{}{
609			"path", path,
610			"pid", pid,
611		}
612		if err != nil {
613			debugMsgArgs = append(debugMsgArgs,
614				[]interface{}{"error", err.Error()}...)
615		}
616
617		// Log and make sure to flush the logs write away
618		c.logger.Debug("plugin process exited", debugMsgArgs...)
619		os.Stderr.Sync()
620
621		// Set that we exited, which takes a lock
622		c.l.Lock()
623		defer c.l.Unlock()
624		c.exited = true
625	}()
626
627	// Start goroutine that logs the stderr
628	c.clientWaitGroup.Add(1)
629	// logStderr calls Done()
630	go c.logStderr(cmdStderr)
631
632	// Start a goroutine that is going to be reading the lines
633	// out of stdout
634	linesCh := make(chan string)
635	c.clientWaitGroup.Add(1)
636	go func() {
637		defer c.clientWaitGroup.Done()
638		defer close(linesCh)
639
640		scanner := bufio.NewScanner(cmdStdout)
641		for scanner.Scan() {
642			linesCh <- scanner.Text()
643		}
644	}()
645
646	// Make sure after we exit we read the lines from stdout forever
647	// so they don't block since it is a pipe.
648	// The scanner goroutine above will close this, but track it with a wait
649	// group for completeness.
650	c.clientWaitGroup.Add(1)
651	defer func() {
652		go func() {
653			defer c.clientWaitGroup.Done()
654			for range linesCh {
655			}
656		}()
657	}()
658
659	// Some channels for the next step
660	timeout := time.After(c.config.StartTimeout)
661
662	// Start looking for the address
663	c.logger.Debug("waiting for RPC address", "path", cmd.Path)
664	select {
665	case <-timeout:
666		err = errors.New("timeout while waiting for plugin to start")
667	case <-c.doneCtx.Done():
668		err = errors.New("plugin exited before we could connect")
669	case line := <-linesCh:
670		// Trim the line and split by "|" in order to get the parts of
671		// the output.
672		line = strings.TrimSpace(line)
673		parts := strings.SplitN(line, "|", 6)
674		if len(parts) < 4 {
675			err = fmt.Errorf(
676				"Unrecognized remote plugin message: %s\n\n"+
677					"This usually means that the plugin is either invalid or simply\n"+
678					"needs to be recompiled to support the latest protocol.", line)
679			return
680		}
681
682		// Check the core protocol. Wrapped in a {} for scoping.
683		{
684			var coreProtocol int64
685			coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
686			if err != nil {
687				err = fmt.Errorf("Error parsing core protocol version: %s", err)
688				return
689			}
690
691			if int(coreProtocol) != CoreProtocolVersion {
692				err = fmt.Errorf("Incompatible core API version with plugin. "+
693					"Plugin version: %s, Core version: %d\n\n"+
694					"To fix this, the plugin usually only needs to be recompiled.\n"+
695					"Please report this to the plugin author.", parts[0], CoreProtocolVersion)
696				return
697			}
698		}
699
700		// Test the API version
701		version, pluginSet, err := c.checkProtoVersion(parts[1])
702		if err != nil {
703			return addr, err
704		}
705
706		// set the Plugins value to the compatible set, so the version
707		// doesn't need to be passed through to the ClientProtocol
708		// implementation.
709		c.config.Plugins = pluginSet
710		c.negotiatedVersion = version
711		c.logger.Debug("using plugin", "version", version)
712
713		switch parts[2] {
714		case "tcp":
715			addr, err = net.ResolveTCPAddr("tcp", parts[3])
716		case "unix":
717			addr, err = net.ResolveUnixAddr("unix", parts[3])
718		default:
719			err = fmt.Errorf("Unknown address type: %s", parts[3])
720		}
721
722		// If we have a server type, then record that. We default to net/rpc
723		// for backwards compatibility.
724		c.protocol = ProtocolNetRPC
725		if len(parts) >= 5 {
726			c.protocol = Protocol(parts[4])
727		}
728
729		found := false
730		for _, p := range c.config.AllowedProtocols {
731			if p == c.protocol {
732				found = true
733				break
734			}
735		}
736		if !found {
737			err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
738				c.protocol, c.config.AllowedProtocols)
739			return addr, err
740		}
741
742		// See if we have a TLS certificate from the server.
743		// Checking if the length is > 50 rules out catching the unused "extra"
744		// data returned from some older implementations.
745		if len(parts) >= 6 && len(parts[5]) > 50 {
746			err := c.loadServerCert(parts[5])
747			if err != nil {
748				return nil, fmt.Errorf("error parsing server cert: %s", err)
749			}
750		}
751	}
752
753	c.address = addr
754	return
755}
756
757// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
758// server, and load it as the RootCA for the client TLSConfig.
759func (c *Client) loadServerCert(cert string) error {
760	certPool := x509.NewCertPool()
761
762	asn1, err := base64.RawStdEncoding.DecodeString(cert)
763	if err != nil {
764		return err
765	}
766
767	x509Cert, err := x509.ParseCertificate([]byte(asn1))
768	if err != nil {
769		return err
770	}
771
772	certPool.AddCert(x509Cert)
773
774	c.config.TLSConfig.RootCAs = certPool
775	return nil
776}
777
778func (c *Client) reattach() (net.Addr, error) {
779	// Verify the process still exists. If not, then it is an error
780	p, err := os.FindProcess(c.config.Reattach.Pid)
781	if err != nil {
782		return nil, err
783	}
784
785	// Attempt to connect to the addr since on Unix systems FindProcess
786	// doesn't actually return an error if it can't find the process.
787	conn, err := net.Dial(
788		c.config.Reattach.Addr.Network(),
789		c.config.Reattach.Addr.String())
790	if err != nil {
791		p.Kill()
792		return nil, ErrProcessNotFound
793	}
794	conn.Close()
795
796	// Create a context for when we kill
797	c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
798
799	c.clientWaitGroup.Add(1)
800	// Goroutine to mark exit status
801	go func(pid int) {
802		defer c.clientWaitGroup.Done()
803
804		// ensure the context is cancelled when we're done
805		defer c.ctxCancel()
806
807		// Wait for the process to die
808		pidWait(pid)
809
810		// Log so we can see it
811		c.logger.Debug("reattached plugin process exited")
812
813		// Mark it
814		c.l.Lock()
815		defer c.l.Unlock()
816		c.exited = true
817	}(p.Pid)
818
819	// Set the address and process
820	c.address = c.config.Reattach.Addr
821	c.process = p
822	c.protocol = c.config.Reattach.Protocol
823	if c.protocol == "" {
824		// Default the protocol to net/rpc for backwards compatibility
825		c.protocol = ProtocolNetRPC
826	}
827
828	return c.address, nil
829}
830
831// checkProtoVersion returns the negotiated version and PluginSet.
832// This returns an error if the server returned an incompatible protocol
833// version, or an invalid handshake response.
834func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
835	serverVersion, err := strconv.Atoi(protoVersion)
836	if err != nil {
837		return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
838	}
839
840	// record these for the error message
841	var clientVersions []int
842
843	// all versions, including the legacy ProtocolVersion have been added to
844	// the versions set
845	for version, plugins := range c.config.VersionedPlugins {
846		clientVersions = append(clientVersions, version)
847
848		if serverVersion != version {
849			continue
850		}
851		return version, plugins, nil
852	}
853
854	return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
855		"Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
856}
857
858// ReattachConfig returns the information that must be provided to NewClient
859// to reattach to the plugin process that this client started. This is
860// useful for plugins that detach from their parent process.
861//
862// If this returns nil then the process hasn't been started yet. Please
863// call Start or Client before calling this.
864func (c *Client) ReattachConfig() *ReattachConfig {
865	c.l.Lock()
866	defer c.l.Unlock()
867
868	if c.address == nil {
869		return nil
870	}
871
872	if c.config.Cmd != nil && c.config.Cmd.Process == nil {
873		return nil
874	}
875
876	// If we connected via reattach, just return the information as-is
877	if c.config.Reattach != nil {
878		return c.config.Reattach
879	}
880
881	return &ReattachConfig{
882		Protocol: c.protocol,
883		Addr:     c.address,
884		Pid:      c.config.Cmd.Process.Pid,
885	}
886}
887
888// Protocol returns the protocol of server on the remote end. This will
889// start the plugin process if it isn't already started. Errors from
890// starting the plugin are surpressed and ProtocolInvalid is returned. It
891// is recommended you call Start explicitly before calling Protocol to ensure
892// no errors occur.
893func (c *Client) Protocol() Protocol {
894	_, err := c.Start()
895	if err != nil {
896		return ProtocolInvalid
897	}
898
899	return c.protocol
900}
901
902func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
903	return func(_ string, _ time.Duration) (net.Conn, error) {
904		// Connect to the client
905		conn, err := net.Dial(addr.Network(), addr.String())
906		if err != nil {
907			return nil, err
908		}
909		if tcpConn, ok := conn.(*net.TCPConn); ok {
910			// Make sure to set keep alive so that the connection doesn't die
911			tcpConn.SetKeepAlive(true)
912		}
913
914		return conn, nil
915	}
916}
917
918// dialer is compatible with grpc.WithDialer and creates the connection
919// to the plugin.
920func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
921	conn, err := netAddrDialer(c.address)("", timeout)
922	if err != nil {
923		return nil, err
924	}
925
926	// If we have a TLS config we wrap our connection. We only do this
927	// for net/rpc since gRPC uses its own mechanism for TLS.
928	if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil {
929		conn = tls.Client(conn, c.config.TLSConfig)
930	}
931
932	return conn, nil
933}
934
935var stdErrBufferSize = 64 * 1024
936
937func (c *Client) logStderr(r io.Reader) {
938	defer c.clientWaitGroup.Done()
939	l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
940
941	reader := bufio.NewReaderSize(r, stdErrBufferSize)
942	// continuation indicates the previous line was a prefix
943	continuation := false
944
945	for {
946		line, isPrefix, err := reader.ReadLine()
947		switch {
948		case err == io.EOF:
949			return
950		case err != nil:
951			l.Error("reading plugin stderr", "error", err)
952			return
953		}
954
955		c.config.Stderr.Write(line)
956
957		// The line was longer than our max token size, so it's likely
958		// incomplete and won't unmarshal.
959		if isPrefix || continuation {
960			l.Debug(string(line))
961
962			// if we're finishing a continued line, add the newline back in
963			if !isPrefix {
964				c.config.Stderr.Write([]byte{'\n'})
965			}
966
967			continuation = isPrefix
968			continue
969		}
970
971		c.config.Stderr.Write([]byte{'\n'})
972
973		entry, err := parseJSON(line)
974		// If output is not JSON format, print directly to Debug
975		if err != nil {
976			// Attempt to infer the desired log level from the commonly used
977			// string prefixes
978			switch line := string(line); {
979			case strings.HasPrefix(line, "[TRACE]"):
980				l.Trace(line)
981			case strings.HasPrefix(line, "[DEBUG]"):
982				l.Debug(line)
983			case strings.HasPrefix(line, "[INFO]"):
984				l.Info(line)
985			case strings.HasPrefix(line, "[WARN]"):
986				l.Warn(line)
987			case strings.HasPrefix(line, "[ERROR]"):
988				l.Error(line)
989			default:
990				l.Debug(line)
991			}
992		} else {
993			out := flattenKVPairs(entry.KVPairs)
994
995			out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
996			switch hclog.LevelFromString(entry.Level) {
997			case hclog.Trace:
998				l.Trace(entry.Message, out...)
999			case hclog.Debug:
1000				l.Debug(entry.Message, out...)
1001			case hclog.Info:
1002				l.Info(entry.Message, out...)
1003			case hclog.Warn:
1004				l.Warn(entry.Message, out...)
1005			case hclog.Error:
1006				l.Error(entry.Message, out...)
1007			default:
1008				// if there was no log level, it's likely this is unexpected
1009				// json from something other than hclog, and we should output
1010				// it verbatim.
1011				l.Debug(string(line))
1012			}
1013		}
1014	}
1015}
1016