package plugin

import (
	"bufio"
	"context"
	"crypto/subtle"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"errors"
	"fmt"
	"hash"
	"io"
	"io/ioutil"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	hclog "github.com/hashicorp/go-hclog"
)

// Killed is set to 1 once CleanupClients has been called. This can be used
// by plugin RPC implementations to change error behavior since you
// can expect network connection errors at this point. This should be
// read by using sync/atomic.
var Killed uint32 = 0

// managedClients is the slice of "managed" clients which are cleaned up when
// calling CleanupClients. Access is guarded by managedClientsLock.
var managedClients = make([]*Client, 0, 5)
var managedClientsLock sync.Mutex

// Error types
var (
	// ErrProcessNotFound is returned when a client is instantiated to
	// reattach to an existing process and it isn't found.
	ErrProcessNotFound = errors.New("Reattachment process not found")

	// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
	// the one provided in the SecureConfig.
	ErrChecksumsDoNotMatch = errors.New("checksums did not match")

	// ErrSecureConfigNoChecksum is returned when an empty checksum is
	// provided to the SecureConfig.
	ErrSecureConfigNoChecksum = errors.New("no checksum provided")

	// ErrSecureConfigNoHash is returned when a nil Hash object is provided
	// to the SecureConfig.
	ErrSecureConfigNoHash = errors.New("no hash implementation provided")

	// ErrSecureConfigAndReattach is returned when both Reattach and
	// SecureConfig are set.
	ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
)

// Client handles the lifecycle of a plugin application. It launches
// plugins, connects to them, dispenses interface implementations, and handles
// killing the process.
//
// Plugin hosts should use one Client for each plugin executable.
// To dispense a plugin type, use the `Client.Client` function, and then
// call `Dispense`. This awkward API is mostly historical but is used to split
// the client that deals with subprocess management and the client that
// does RPC management.
//
// The mutex l is held for the duration of Start and Client, and around
// accesses to exited, process, address and processKilled. doneCtx is
// cancelled (via ctxCancel) by the goroutine that waits for the plugin
// process to exit.
//
// See NewClient and ClientConfig for using a Client.
type Client struct {
	config            *ClientConfig
	exited            bool
	l                 sync.Mutex
	address           net.Addr
	process           *os.Process
	client            ClientProtocol
	protocol          Protocol
	logger            hclog.Logger
	doneCtx           context.Context
	ctxCancel         context.CancelFunc
	negotiatedVersion int

	// clientWaitGroup is used to manage the lifecycle of the plugin management
	// goroutines.
	clientWaitGroup sync.WaitGroup

	// processKilled is used for testing only, to flag when the process was
	// forcefully killed.
	processKilled bool
}

// NegotiatedVersion returns the protocol version negotiated with the server.
// This is only valid after Start() is called.
//
// NOTE(review): the field is read here without taking c.l, whereas Start
// writes it while holding the lock — confirm callers never race with Start.
func (c *Client) NegotiatedVersion() int {
	return c.negotiatedVersion
}

// ClientConfig is the configuration used to initialize a new
// plugin client. After being used to initialize a plugin client,
// that configuration must not be modified again.
//
// NewClient fills in defaults by writing to the config it is given, and
// Start may replace Plugins, VersionedPlugins and TLSConfig.
type ClientConfig struct {
	// HandshakeConfig is the configuration that must match servers.
	HandshakeConfig

	// Plugins are the plugins that can be consumed.
	// The implied version of this PluginSet is the Handshake.ProtocolVersion.
	Plugins PluginSet

	// VersionedPlugins is a map of PluginSets for specific protocol versions.
	// These can be used to negotiate a compatible version between client and
	// server. If this is set, Handshake.ProtocolVersion is not required.
	VersionedPlugins map[int]PluginSet

	// One of the following must be set, but not both.
	//
	// Cmd is the unstarted subprocess for starting the plugin. If this is
	// set, then the Client starts the plugin process on its own and connects
	// to it.
	//
	// Reattach is configuration for reattaching to an existing plugin process
	// that is already running. This isn't common.
	Cmd      *exec.Cmd
	Reattach *ReattachConfig

	// SecureConfig is configuration for verifying the integrity of the
	// executable. It can not be used with Reattach.
	SecureConfig *SecureConfig

	// TLSConfig is used to enable TLS on the RPC client.
	TLSConfig *tls.Config

	// Managed represents if the client should be managed by the
	// plugin package or not. If true, then by calling CleanupClients,
	// it will automatically be cleaned up. Otherwise, the client
	// user is fully responsible for making sure to Kill all plugin
	// clients. By default the client is _not_ managed.
	Managed bool

	// The minimum and maximum port to use for communicating with
	// the subprocess. If both are left at zero, NewClient defaults them to
	// 10,000 and 25,000 respectively.
	MinPort, MaxPort uint

	// StartTimeout is the timeout to wait for the plugin to say it
	// has started successfully. If zero, NewClient defaults it to one minute.
	StartTimeout time.Duration

	// If non-nil, then the stderr of the client will be written to here
	// (as well as the log). This is the original os.Stderr of the subprocess.
	// This isn't the output of synced stderr. If nil, NewClient sets it to
	// ioutil.Discard.
	Stderr io.Writer

	// SyncStdout, SyncStderr can be set to override the
	// respective os.Std* values in the plugin. Care should be taken to
	// avoid races here.
	//
	// If the default values (nil) are used, NewClient sets them to
	// ioutil.Discard, i.e. this package will not sync any of these streams.
	SyncStdout io.Writer
	SyncStderr io.Writer

	// AllowedProtocols is a list of allowed protocols. If this isn't set,
	// then only netrpc is allowed. This is so that older go-plugin systems
	// can show friendly errors if they see a plugin with an unknown
	// protocol.
	//
	// By setting this, you can cause an error immediately on plugin start
	// if an unsupported protocol is used with a good error message.
	//
	// If this isn't set at all (nil value), then only net/rpc is accepted.
	// This is done for legacy reasons. You must explicitly opt-in to
	// new protocols.
	AllowedProtocols []Protocol

	// Logger is the logger that the client will use. If none is provided,
	// NewClient creates an hclog logger named "plugin" that writes to
	// hclog.DefaultOutput at Trace level.
	Logger hclog.Logger

	// AutoMTLS has the client and server automatically negotiate mTLS for
	// transport authentication. This ensures that only the original client will
	// be allowed to connect to the server, and all other connections will be
	// rejected. The client will also refuse to connect to any server that isn't
	// the original instance started by the client.
	//
	// In this mode of operation, the client generates a one-time use tls
	// certificate, sends the public x.509 certificate to the new server, and
	// the server generates a one-time use tls certificate, and sends the public
	// x.509 certificate back to the client. These are used to authenticate all
	// rpc connections between the client and server.
	//
	// Setting AutoMTLS to true implies that the server must support the
	// protocol, and correctly negotiate the tls certificates, or a connection
	// failure will result.
	//
	// The client should not set TLSConfig, nor should the server set a
	// TLSProvider, because AutoMTLS implies that a new certificate and tls
	// configuration will be generated at startup.
	//
	// You cannot Reattach to a server with this option enabled.
	AutoMTLS bool
}

// ReattachConfig is used to configure a client to reattach to an
// already-running plugin process.
You can retrieve this information by 209// calling ReattachConfig on Client. 210type ReattachConfig struct { 211 Protocol Protocol 212 Addr net.Addr 213 Pid int 214} 215 216// SecureConfig is used to configure a client to verify the integrity of an 217// executable before running. It does this by verifying the checksum is 218// expected. Hash is used to specify the hashing method to use when checksumming 219// the file. The configuration is verified by the client by calling the 220// SecureConfig.Check() function. 221// 222// The host process should ensure the checksum was provided by a trusted and 223// authoritative source. The binary should be installed in such a way that it 224// can not be modified by an unauthorized user between the time of this check 225// and the time of execution. 226type SecureConfig struct { 227 Checksum []byte 228 Hash hash.Hash 229} 230 231// Check takes the filepath to an executable and returns true if the checksum of 232// the file matches the checksum provided in the SecureConfig. 233func (s *SecureConfig) Check(filePath string) (bool, error) { 234 if len(s.Checksum) == 0 { 235 return false, ErrSecureConfigNoChecksum 236 } 237 238 if s.Hash == nil { 239 return false, ErrSecureConfigNoHash 240 } 241 242 file, err := os.Open(filePath) 243 if err != nil { 244 return false, err 245 } 246 defer file.Close() 247 248 _, err = io.Copy(s.Hash, file) 249 if err != nil { 250 return false, err 251 } 252 253 sum := s.Hash.Sum(nil) 254 255 return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil 256} 257 258// This makes sure all the managed subprocesses are killed and properly 259// logged. This should be called before the parent process running the 260// plugins exits. 261// 262// This must only be called _once_. 
263func CleanupClients() { 264 // Set the killed to true so that we don't get unexpected panics 265 atomic.StoreUint32(&Killed, 1) 266 267 // Kill all the managed clients in parallel and use a WaitGroup 268 // to wait for them all to finish up. 269 var wg sync.WaitGroup 270 managedClientsLock.Lock() 271 for _, client := range managedClients { 272 wg.Add(1) 273 274 go func(client *Client) { 275 client.Kill() 276 wg.Done() 277 }(client) 278 } 279 managedClientsLock.Unlock() 280 281 wg.Wait() 282} 283 284// Creates a new plugin client which manages the lifecycle of an external 285// plugin and gets the address for the RPC connection. 286// 287// The client must be cleaned up at some point by calling Kill(). If 288// the client is a managed client (created with NewManagedClient) you 289// can just call CleanupClients at the end of your program and they will 290// be properly cleaned. 291func NewClient(config *ClientConfig) (c *Client) { 292 if config.MinPort == 0 && config.MaxPort == 0 { 293 config.MinPort = 10000 294 config.MaxPort = 25000 295 } 296 297 if config.StartTimeout == 0 { 298 config.StartTimeout = 1 * time.Minute 299 } 300 301 if config.Stderr == nil { 302 config.Stderr = ioutil.Discard 303 } 304 305 if config.SyncStdout == nil { 306 config.SyncStdout = ioutil.Discard 307 } 308 if config.SyncStderr == nil { 309 config.SyncStderr = ioutil.Discard 310 } 311 312 if config.AllowedProtocols == nil { 313 config.AllowedProtocols = []Protocol{ProtocolNetRPC} 314 } 315 316 if config.Logger == nil { 317 config.Logger = hclog.New(&hclog.LoggerOptions{ 318 Output: hclog.DefaultOutput, 319 Level: hclog.Trace, 320 Name: "plugin", 321 }) 322 } 323 324 c = &Client{ 325 config: config, 326 logger: config.Logger, 327 } 328 if config.Managed { 329 managedClientsLock.Lock() 330 managedClients = append(managedClients, c) 331 managedClientsLock.Unlock() 332 } 333 334 return 335} 336 337// Client returns the protocol client for this connection. 
338// 339// Subsequent calls to this will return the same client. 340func (c *Client) Client() (ClientProtocol, error) { 341 _, err := c.Start() 342 if err != nil { 343 return nil, err 344 } 345 346 c.l.Lock() 347 defer c.l.Unlock() 348 349 if c.client != nil { 350 return c.client, nil 351 } 352 353 switch c.protocol { 354 case ProtocolNetRPC: 355 c.client, err = newRPCClient(c) 356 357 case ProtocolGRPC: 358 c.client, err = newGRPCClient(c.doneCtx, c) 359 360 default: 361 return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) 362 } 363 364 if err != nil { 365 c.client = nil 366 return nil, err 367 } 368 369 return c.client, nil 370} 371 372// Tells whether or not the underlying process has exited. 373func (c *Client) Exited() bool { 374 c.l.Lock() 375 defer c.l.Unlock() 376 return c.exited 377} 378 379// killed is used in tests to check if a process failed to exit gracefully, and 380// needed to be killed. 381func (c *Client) killed() bool { 382 c.l.Lock() 383 defer c.l.Unlock() 384 return c.processKilled 385} 386 387// End the executing subprocess (if it is running) and perform any cleanup 388// tasks necessary such as capturing any remaining logs and so on. 389// 390// This method blocks until the process successfully exits. 391// 392// This method can safely be called multiple times. 393func (c *Client) Kill() { 394 // Grab a lock to read some private fields. 395 c.l.Lock() 396 process := c.process 397 addr := c.address 398 c.l.Unlock() 399 400 // If there is no process, there is nothing to kill. 401 if process == nil { 402 return 403 } 404 405 defer func() { 406 // Wait for the all client goroutines to finish. 407 c.clientWaitGroup.Wait() 408 409 // Make sure there is no reference to the old process after it has been 410 // killed. 411 c.l.Lock() 412 c.process = nil 413 c.l.Unlock() 414 }() 415 416 // We need to check for address here. 
It is possible that the plugin 417 // started (process != nil) but has no address (addr == nil) if the 418 // plugin failed at startup. If we do have an address, we need to close 419 // the plugin net connections. 420 graceful := false 421 if addr != nil { 422 // Close the client to cleanly exit the process. 423 client, err := c.Client() 424 if err == nil { 425 err = client.Close() 426 427 // If there is no error, then we attempt to wait for a graceful 428 // exit. If there was an error, we assume that graceful cleanup 429 // won't happen and just force kill. 430 graceful = err == nil 431 if err != nil { 432 // If there was an error just log it. We're going to force 433 // kill in a moment anyways. 434 c.logger.Warn("error closing client during Kill", "err", err) 435 } 436 } else { 437 c.logger.Error("client", "error", err) 438 } 439 } 440 441 // If we're attempting a graceful exit, then we wait for a short period 442 // of time to allow that to happen. To wait for this we just wait on the 443 // doneCh which would be closed if the process exits. 444 if graceful { 445 select { 446 case <-c.doneCtx.Done(): 447 c.logger.Debug("plugin exited") 448 return 449 case <-time.After(2 * time.Second): 450 } 451 } 452 453 // If graceful exiting failed, just kill it 454 c.logger.Warn("plugin failed to exit gracefully") 455 process.Kill() 456 457 c.l.Lock() 458 c.processKilled = true 459 c.l.Unlock() 460} 461 462// Starts the underlying subprocess, communicating with it to negotiate 463// a port for RPC connections, and returning the address to connect via RPC. 464// 465// This method is safe to call multiple times. Subsequent calls have no effect. 466// Once a client has been started once, it cannot be started again, even if 467// it was killed. 468func (c *Client) Start() (addr net.Addr, err error) { 469 c.l.Lock() 470 defer c.l.Unlock() 471 472 if c.address != nil { 473 return c.address, nil 474 } 475 476 // If one of cmd or reattach isn't set, then it is an error. 
We wrap 477 // this in a {} for scoping reasons, and hopeful that the escape 478 // analysis will pop the stack here. 479 { 480 cmdSet := c.config.Cmd != nil 481 attachSet := c.config.Reattach != nil 482 secureSet := c.config.SecureConfig != nil 483 if cmdSet == attachSet { 484 return nil, fmt.Errorf("Only one of Cmd or Reattach must be set") 485 } 486 487 if secureSet && attachSet { 488 return nil, ErrSecureConfigAndReattach 489 } 490 } 491 492 if c.config.Reattach != nil { 493 return c.reattach() 494 } 495 496 if c.config.VersionedPlugins == nil { 497 c.config.VersionedPlugins = make(map[int]PluginSet) 498 } 499 500 // handle all plugins as versioned, using the handshake config as the default. 501 version := int(c.config.ProtocolVersion) 502 503 // Make sure we're not overwriting a real version 0. If ProtocolVersion was 504 // non-zero, then we have to just assume the user made sure that 505 // VersionedPlugins doesn't conflict. 506 if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { 507 c.config.VersionedPlugins[version] = c.config.Plugins 508 } 509 510 var versionStrings []string 511 for v := range c.config.VersionedPlugins { 512 versionStrings = append(versionStrings, strconv.Itoa(v)) 513 } 514 515 env := []string{ 516 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), 517 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), 518 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), 519 fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), 520 } 521 522 cmd := c.config.Cmd 523 cmd.Env = append(cmd.Env, os.Environ()...) 524 cmd.Env = append(cmd.Env, env...) 
525 cmd.Stdin = os.Stdin 526 527 cmdStdout, err := cmd.StdoutPipe() 528 if err != nil { 529 return nil, err 530 } 531 cmdStderr, err := cmd.StderrPipe() 532 if err != nil { 533 return nil, err 534 } 535 536 if c.config.SecureConfig != nil { 537 if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { 538 return nil, fmt.Errorf("error verifying checksum: %s", err) 539 } else if !ok { 540 return nil, ErrChecksumsDoNotMatch 541 } 542 } 543 544 // Setup a temporary certificate for client/server mtls, and send the public 545 // certificate to the plugin. 546 if c.config.AutoMTLS { 547 c.logger.Info("configuring client automatic mTLS") 548 certPEM, keyPEM, err := generateCert() 549 if err != nil { 550 c.logger.Error("failed to generate client certificate", "error", err) 551 return nil, err 552 } 553 cert, err := tls.X509KeyPair(certPEM, keyPEM) 554 if err != nil { 555 c.logger.Error("failed to parse client certificate", "error", err) 556 return nil, err 557 } 558 559 cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM)) 560 561 c.config.TLSConfig = &tls.Config{ 562 Certificates: []tls.Certificate{cert}, 563 ServerName: "localhost", 564 } 565 } 566 567 c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) 568 err = cmd.Start() 569 if err != nil { 570 return 571 } 572 573 // Set the process 574 c.process = cmd.Process 575 c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid) 576 577 // Make sure the command is properly cleaned up if there is an error 578 defer func() { 579 r := recover() 580 581 if err != nil || r != nil { 582 cmd.Process.Kill() 583 } 584 585 if r != nil { 586 panic(r) 587 } 588 }() 589 590 // Create a context for when we kill 591 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) 592 593 c.clientWaitGroup.Add(1) 594 go func() { 595 // ensure the context is cancelled when we're done 596 defer c.ctxCancel() 597 598 defer c.clientWaitGroup.Done() 599 600 // get the cmd info early, 
since the process information will be removed 601 // in Kill. 602 pid := c.process.Pid 603 path := cmd.Path 604 605 // Wait for the command to end. 606 err := cmd.Wait() 607 608 debugMsgArgs := []interface{}{ 609 "path", path, 610 "pid", pid, 611 } 612 if err != nil { 613 debugMsgArgs = append(debugMsgArgs, 614 []interface{}{"error", err.Error()}...) 615 } 616 617 // Log and make sure to flush the logs write away 618 c.logger.Debug("plugin process exited", debugMsgArgs...) 619 os.Stderr.Sync() 620 621 // Set that we exited, which takes a lock 622 c.l.Lock() 623 defer c.l.Unlock() 624 c.exited = true 625 }() 626 627 // Start goroutine that logs the stderr 628 c.clientWaitGroup.Add(1) 629 // logStderr calls Done() 630 go c.logStderr(cmdStderr) 631 632 // Start a goroutine that is going to be reading the lines 633 // out of stdout 634 linesCh := make(chan string) 635 c.clientWaitGroup.Add(1) 636 go func() { 637 defer c.clientWaitGroup.Done() 638 defer close(linesCh) 639 640 scanner := bufio.NewScanner(cmdStdout) 641 for scanner.Scan() { 642 linesCh <- scanner.Text() 643 } 644 }() 645 646 // Make sure after we exit we read the lines from stdout forever 647 // so they don't block since it is a pipe. 648 // The scanner goroutine above will close this, but track it with a wait 649 // group for completeness. 650 c.clientWaitGroup.Add(1) 651 defer func() { 652 go func() { 653 defer c.clientWaitGroup.Done() 654 for range linesCh { 655 } 656 }() 657 }() 658 659 // Some channels for the next step 660 timeout := time.After(c.config.StartTimeout) 661 662 // Start looking for the address 663 c.logger.Debug("waiting for RPC address", "path", cmd.Path) 664 select { 665 case <-timeout: 666 err = errors.New("timeout while waiting for plugin to start") 667 case <-c.doneCtx.Done(): 668 err = errors.New("plugin exited before we could connect") 669 case line := <-linesCh: 670 // Trim the line and split by "|" in order to get the parts of 671 // the output. 
672 line = strings.TrimSpace(line) 673 parts := strings.SplitN(line, "|", 6) 674 if len(parts) < 4 { 675 err = fmt.Errorf( 676 "Unrecognized remote plugin message: %s\n\n"+ 677 "This usually means that the plugin is either invalid or simply\n"+ 678 "needs to be recompiled to support the latest protocol.", line) 679 return 680 } 681 682 // Check the core protocol. Wrapped in a {} for scoping. 683 { 684 var coreProtocol int64 685 coreProtocol, err = strconv.ParseInt(parts[0], 10, 0) 686 if err != nil { 687 err = fmt.Errorf("Error parsing core protocol version: %s", err) 688 return 689 } 690 691 if int(coreProtocol) != CoreProtocolVersion { 692 err = fmt.Errorf("Incompatible core API version with plugin. "+ 693 "Plugin version: %s, Core version: %d\n\n"+ 694 "To fix this, the plugin usually only needs to be recompiled.\n"+ 695 "Please report this to the plugin author.", parts[0], CoreProtocolVersion) 696 return 697 } 698 } 699 700 // Test the API version 701 version, pluginSet, err := c.checkProtoVersion(parts[1]) 702 if err != nil { 703 return addr, err 704 } 705 706 // set the Plugins value to the compatible set, so the version 707 // doesn't need to be passed through to the ClientProtocol 708 // implementation. 709 c.config.Plugins = pluginSet 710 c.negotiatedVersion = version 711 c.logger.Debug("using plugin", "version", version) 712 713 switch parts[2] { 714 case "tcp": 715 addr, err = net.ResolveTCPAddr("tcp", parts[3]) 716 case "unix": 717 addr, err = net.ResolveUnixAddr("unix", parts[3]) 718 default: 719 err = fmt.Errorf("Unknown address type: %s", parts[3]) 720 } 721 722 // If we have a server type, then record that. We default to net/rpc 723 // for backwards compatibility. 
724 c.protocol = ProtocolNetRPC 725 if len(parts) >= 5 { 726 c.protocol = Protocol(parts[4]) 727 } 728 729 found := false 730 for _, p := range c.config.AllowedProtocols { 731 if p == c.protocol { 732 found = true 733 break 734 } 735 } 736 if !found { 737 err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", 738 c.protocol, c.config.AllowedProtocols) 739 return addr, err 740 } 741 742 // See if we have a TLS certificate from the server. 743 // Checking if the length is > 50 rules out catching the unused "extra" 744 // data returned from some older implementations. 745 if len(parts) >= 6 && len(parts[5]) > 50 { 746 err := c.loadServerCert(parts[5]) 747 if err != nil { 748 return nil, fmt.Errorf("error parsing server cert: %s", err) 749 } 750 } 751 } 752 753 c.address = addr 754 return 755} 756 757// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the 758// server, and load it as the RootCA for the client TLSConfig. 759func (c *Client) loadServerCert(cert string) error { 760 certPool := x509.NewCertPool() 761 762 asn1, err := base64.RawStdEncoding.DecodeString(cert) 763 if err != nil { 764 return err 765 } 766 767 x509Cert, err := x509.ParseCertificate([]byte(asn1)) 768 if err != nil { 769 return err 770 } 771 772 certPool.AddCert(x509Cert) 773 774 c.config.TLSConfig.RootCAs = certPool 775 return nil 776} 777 778func (c *Client) reattach() (net.Addr, error) { 779 // Verify the process still exists. If not, then it is an error 780 p, err := os.FindProcess(c.config.Reattach.Pid) 781 if err != nil { 782 return nil, err 783 } 784 785 // Attempt to connect to the addr since on Unix systems FindProcess 786 // doesn't actually return an error if it can't find the process. 
787 conn, err := net.Dial( 788 c.config.Reattach.Addr.Network(), 789 c.config.Reattach.Addr.String()) 790 if err != nil { 791 p.Kill() 792 return nil, ErrProcessNotFound 793 } 794 conn.Close() 795 796 // Create a context for when we kill 797 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) 798 799 c.clientWaitGroup.Add(1) 800 // Goroutine to mark exit status 801 go func(pid int) { 802 defer c.clientWaitGroup.Done() 803 804 // ensure the context is cancelled when we're done 805 defer c.ctxCancel() 806 807 // Wait for the process to die 808 pidWait(pid) 809 810 // Log so we can see it 811 c.logger.Debug("reattached plugin process exited") 812 813 // Mark it 814 c.l.Lock() 815 defer c.l.Unlock() 816 c.exited = true 817 }(p.Pid) 818 819 // Set the address and process 820 c.address = c.config.Reattach.Addr 821 c.process = p 822 c.protocol = c.config.Reattach.Protocol 823 if c.protocol == "" { 824 // Default the protocol to net/rpc for backwards compatibility 825 c.protocol = ProtocolNetRPC 826 } 827 828 return c.address, nil 829} 830 831// checkProtoVersion returns the negotiated version and PluginSet. 832// This returns an error if the server returned an incompatible protocol 833// version, or an invalid handshake response. 834func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { 835 serverVersion, err := strconv.Atoi(protoVersion) 836 if err != nil { 837 return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) 838 } 839 840 // record these for the error message 841 var clientVersions []int 842 843 // all versions, including the legacy ProtocolVersion have been added to 844 // the versions set 845 for version, plugins := range c.config.VersionedPlugins { 846 clientVersions = append(clientVersions, version) 847 848 if serverVersion != version { 849 continue 850 } 851 return version, plugins, nil 852 } 853 854 return 0, nil, fmt.Errorf("Incompatible API version with plugin. 
"+ 855 "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) 856} 857 858// ReattachConfig returns the information that must be provided to NewClient 859// to reattach to the plugin process that this client started. This is 860// useful for plugins that detach from their parent process. 861// 862// If this returns nil then the process hasn't been started yet. Please 863// call Start or Client before calling this. 864func (c *Client) ReattachConfig() *ReattachConfig { 865 c.l.Lock() 866 defer c.l.Unlock() 867 868 if c.address == nil { 869 return nil 870 } 871 872 if c.config.Cmd != nil && c.config.Cmd.Process == nil { 873 return nil 874 } 875 876 // If we connected via reattach, just return the information as-is 877 if c.config.Reattach != nil { 878 return c.config.Reattach 879 } 880 881 return &ReattachConfig{ 882 Protocol: c.protocol, 883 Addr: c.address, 884 Pid: c.config.Cmd.Process.Pid, 885 } 886} 887 888// Protocol returns the protocol of server on the remote end. This will 889// start the plugin process if it isn't already started. Errors from 890// starting the plugin are surpressed and ProtocolInvalid is returned. It 891// is recommended you call Start explicitly before calling Protocol to ensure 892// no errors occur. 893func (c *Client) Protocol() Protocol { 894 _, err := c.Start() 895 if err != nil { 896 return ProtocolInvalid 897 } 898 899 return c.protocol 900} 901 902func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) { 903 return func(_ string, _ time.Duration) (net.Conn, error) { 904 // Connect to the client 905 conn, err := net.Dial(addr.Network(), addr.String()) 906 if err != nil { 907 return nil, err 908 } 909 if tcpConn, ok := conn.(*net.TCPConn); ok { 910 // Make sure to set keep alive so that the connection doesn't die 911 tcpConn.SetKeepAlive(true) 912 } 913 914 return conn, nil 915 } 916} 917 918// dialer is compatible with grpc.WithDialer and creates the connection 919// to the plugin. 
920func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { 921 conn, err := netAddrDialer(c.address)("", timeout) 922 if err != nil { 923 return nil, err 924 } 925 926 // If we have a TLS config we wrap our connection. We only do this 927 // for net/rpc since gRPC uses its own mechanism for TLS. 928 if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil { 929 conn = tls.Client(conn, c.config.TLSConfig) 930 } 931 932 return conn, nil 933} 934 935var stdErrBufferSize = 64 * 1024 936 937func (c *Client) logStderr(r io.Reader) { 938 defer c.clientWaitGroup.Done() 939 l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) 940 941 reader := bufio.NewReaderSize(r, stdErrBufferSize) 942 // continuation indicates the previous line was a prefix 943 continuation := false 944 945 for { 946 line, isPrefix, err := reader.ReadLine() 947 switch { 948 case err == io.EOF: 949 return 950 case err != nil: 951 l.Error("reading plugin stderr", "error", err) 952 return 953 } 954 955 c.config.Stderr.Write(line) 956 957 // The line was longer than our max token size, so it's likely 958 // incomplete and won't unmarshal. 
959 if isPrefix || continuation { 960 l.Debug(string(line)) 961 962 // if we're finishing a continued line, add the newline back in 963 if !isPrefix { 964 c.config.Stderr.Write([]byte{'\n'}) 965 } 966 967 continuation = isPrefix 968 continue 969 } 970 971 c.config.Stderr.Write([]byte{'\n'}) 972 973 entry, err := parseJSON(line) 974 // If output is not JSON format, print directly to Debug 975 if err != nil { 976 // Attempt to infer the desired log level from the commonly used 977 // string prefixes 978 switch line := string(line); { 979 case strings.HasPrefix(line, "[TRACE]"): 980 l.Trace(line) 981 case strings.HasPrefix(line, "[DEBUG]"): 982 l.Debug(line) 983 case strings.HasPrefix(line, "[INFO]"): 984 l.Info(line) 985 case strings.HasPrefix(line, "[WARN]"): 986 l.Warn(line) 987 case strings.HasPrefix(line, "[ERROR]"): 988 l.Error(line) 989 default: 990 l.Debug(line) 991 } 992 } else { 993 out := flattenKVPairs(entry.KVPairs) 994 995 out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) 996 switch hclog.LevelFromString(entry.Level) { 997 case hclog.Trace: 998 l.Trace(entry.Message, out...) 999 case hclog.Debug: 1000 l.Debug(entry.Message, out...) 1001 case hclog.Info: 1002 l.Info(entry.Message, out...) 1003 case hclog.Warn: 1004 l.Warn(entry.Message, out...) 1005 case hclog.Error: 1006 l.Error(entry.Message, out...) 1007 default: 1008 // if there was no log level, it's likely this is unexpected 1009 // json from something other than hclog, and we should output 1010 // it verbatim. 1011 l.Debug(string(line)) 1012 } 1013 } 1014 } 1015} 1016