package plugin

import (
	"bufio"
	"context"
	"crypto/subtle"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"errors"
	"fmt"
	"hash"
	"io"
	"io/ioutil"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/go-hclog"
	"google.golang.org/grpc"
)

// Killed is set to 1 once CleanupClients has been called. This can be used
// by plugin RPC implementations to change error behavior, since you
// can expect network connection errors at this point. This should be
// read by using sync/atomic.
var Killed uint32 = 0

// managedClients is the slice of "managed" clients which are killed when
// calling CleanupClients. It is guarded by managedClientsLock.
var managedClients = make([]*Client, 0, 5)
var managedClientsLock sync.Mutex

// Error types
var (
	// ErrProcessNotFound is returned when a client is instantiated to
	// reattach to an existing process and it isn't found.
	ErrProcessNotFound = errors.New("Reattachment process not found")

	// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
	// the one provided in the SecureConfig.
	ErrChecksumsDoNotMatch = errors.New("checksums did not match")

	// ErrSecureConfigNoChecksum is returned when an empty checksum is
	// provided to the SecureConfig.
	ErrSecureConfigNoChecksum = errors.New("no checksum provided")

	// ErrSecureConfigNoHash is returned when a nil Hash object is provided to
	// the SecureConfig.
	ErrSecureConfigNoHash = errors.New("no hash implementation provided")

	// ErrSecureConfigAndReattach is returned when both Reattach and
	// SecureConfig are set.
	ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
)

// Client handles the lifecycle of a plugin application. It launches
// plugins, connects to them, dispenses interface implementations, and handles
// killing the process.
//
// Plugin hosts should use one Client for each plugin executable. To
// dispense a plugin type, use the `Client.Client` function, and then
// call `Dispense`. This awkward API is mostly historical but is used to split
// the client that deals with subprocess management and the client that
// does RPC management.
//
// See NewClient and ClientConfig for using a Client.
type Client struct {
	// config is the configuration handed to NewClient. Start fills in some
	// of its fields (VersionedPlugins, Plugins, TLSConfig).
	config *ClientConfig

	// exited is set once the goroutine waiting on the plugin process has
	// observed it exit. Read it through Exited.
	exited bool

	// l is held for the whole of Start and is taken by Client, Exited,
	// killed, Kill and ReattachConfig around their access to the fields
	// below.
	l sync.Mutex

	// address is the RPC address of the plugin. It is nil until Start has
	// succeeded.
	address net.Addr

	// process is the plugin process that Kill terminates. It is left nil
	// when reattaching in test mode and reset to nil at the end of Kill.
	process *os.Process

	// client is the protocol client created (once) by Client.
	client ClientProtocol

	// protocol is the RPC protocol announced by the plugin, or taken from
	// the ReattachConfig when reattaching.
	protocol Protocol

	logger hclog.Logger

	// doneCtx is cancelled (via ctxCancel) by the goroutine that waits for
	// the plugin process once that process has exited.
	doneCtx   context.Context
	ctxCancel context.CancelFunc

	// negotiatedVersion is the protocol version agreed on during Start.
	negotiatedVersion int

	// clientWaitGroup is used to manage the lifecycle of the plugin management
	// goroutines.
	clientWaitGroup sync.WaitGroup

	// stderrWaitGroup is used to prevent the command's Wait() function from
	// being called before we've finished reading from the stderr pipe.
	stderrWaitGroup sync.WaitGroup

	// processKilled is used for testing only, to flag when the process was
	// forcefully killed.
	processKilled bool
}

// NegotiatedVersion returns the protocol version negotiated with the server.
// This is only valid after Start() is called.
func (c *Client) NegotiatedVersion() int {
	return c.negotiatedVersion
}

// ClientConfig is the configuration used to initialize a new
// plugin client. After being used to initialize a plugin client,
// that configuration must not be modified again.
type ClientConfig struct {
	// HandshakeConfig is the configuration that must match servers.
	HandshakeConfig

	// Plugins are the plugins that can be consumed.
	// The implied version of this PluginSet is the Handshake.ProtocolVersion.
	Plugins PluginSet

	// VersionedPlugins is a map of PluginSets for specific protocol versions.
	// These can be used to negotiate a compatible version between client and
	// server. If this is set, Handshake.ProtocolVersion is not required.
	VersionedPlugins map[int]PluginSet

	// One of the following must be set, but not both.
	//
	// Cmd is the unstarted subprocess for starting the plugin. If this is
	// set, then the Client starts the plugin process on its own and connects
	// to it.
	//
	// Reattach is configuration for reattaching to an existing plugin process
	// that is already running. This isn't common.
	Cmd      *exec.Cmd
	Reattach *ReattachConfig

	// SecureConfig is configuration for verifying the integrity of the
	// executable. It can not be used with Reattach.
	SecureConfig *SecureConfig

	// TLSConfig is used to enable TLS on the RPC client.
	TLSConfig *tls.Config

	// Managed represents if the client should be managed by the
	// plugin package or not. If true, then by calling CleanupClients,
	// it will automatically be cleaned up. Otherwise, the client
	// user is fully responsible for making sure to Kill all plugin
	// clients. By default the client is _not_ managed.
	Managed bool

	// The minimum and maximum port to use for communicating with
	// the subprocess. If neither is set, NewClient defaults them to 10,000
	// and 25,000 respectively.
	MinPort, MaxPort uint

	// StartTimeout is the timeout to wait for the plugin to say it
	// has started successfully. If zero, NewClient defaults it to one minute.
	StartTimeout time.Duration

	// If non-nil, then the stderr of the client will be written to here
	// (as well as the log). This is the original os.Stderr of the subprocess.
	// This isn't the output of synced stderr. If nil, NewClient sets it to
	// ioutil.Discard.
	Stderr io.Writer

	// SyncStdout, SyncStderr can be set to override the
	// respective os.Std* values in the plugin. Care should be taken to
	// avoid races here. If these are nil, then this will be set to
	// ioutil.Discard.
	SyncStdout io.Writer
	SyncStderr io.Writer

	// AllowedProtocols is a list of allowed protocols. If this isn't set,
	// then only netrpc is allowed. This is so that older go-plugin systems
	// can show friendly errors if they see a plugin with an unknown
	// protocol.
	//
	// By setting this, you can cause an error immediately on plugin start
	// if an unsupported protocol is used with a good error message.
	//
	// If this isn't set at all (nil value), then only net/rpc is accepted.
	// This is done for legacy reasons. You must explicitly opt-in to
	// new protocols.
	AllowedProtocols []Protocol

	// Logger is the logger that the client will use. If none is provided,
	// NewClient creates an hclog logger named "plugin" at Trace level that
	// writes to hclog.DefaultOutput.
	Logger hclog.Logger

	// AutoMTLS has the client and server automatically negotiate mTLS for
	// transport authentication. This ensures that only the original client will
	// be allowed to connect to the server, and all other connections will be
	// rejected. The client will also refuse to connect to any server that isn't
	// the original instance started by the client.
	//
	// In this mode of operation, the client generates a one-time use tls
	// certificate, sends the public x.509 certificate to the new server, and
	// the server generates a one-time use tls certificate, and sends the public
	// x.509 certificate back to the client. These are used to authenticate all
	// rpc connections between the client and server.
	//
	// Setting AutoMTLS to true implies that the server must support the
	// protocol, and correctly negotiate the tls certificates, or a connection
	// failure will result.
	//
	// The client should not set TLSConfig, nor should the server set a
	// TLSProvider, because AutoMTLS implies that a new certificate and tls
	// configuration will be generated at startup.
	//
	// You cannot Reattach to a server with this option enabled.
	AutoMTLS bool

	// GRPCDialOptions allows plugin users to pass custom grpc.DialOption
	// to create gRPC connections. This only affects plugins using the gRPC
	// protocol.
	GRPCDialOptions []grpc.DialOption
}

// ReattachConfig is used to configure a client to reattach to an
// already-running plugin process. You can retrieve this information by
// calling ReattachConfig on Client.
type ReattachConfig struct {
	Protocol        Protocol
	ProtocolVersion int
	Addr            net.Addr
	Pid             int

	// Test is set to true if this is reattaching to a plugin in "test mode"
	// (see ServeConfig.Test). In this mode, client.Kill will NOT kill the
	// process and instead will rely on the plugin to terminate itself. This
	// should not be used in non-test environments.
	Test bool
}

// SecureConfig is used to configure a client to verify the integrity of an
// executable before running. It does this by verifying the checksum is
// expected. Hash is used to specify the hashing method to use when checksumming
// the file. The configuration is verified by the client by calling the
// SecureConfig.Check() function.
//
// The host process should ensure the checksum was provided by a trusted and
// authoritative source. The binary should be installed in such a way that it
// can not be modified by an unauthorized user between the time of this check
// and the time of execution.
type SecureConfig struct {
	Checksum []byte
	Hash     hash.Hash
}

// Check takes the filepath to an executable and returns true if the checksum of
// the file matches the checksum provided in the SecureConfig.
247func (s *SecureConfig) Check(filePath string) (bool, error) { 248 if len(s.Checksum) == 0 { 249 return false, ErrSecureConfigNoChecksum 250 } 251 252 if s.Hash == nil { 253 return false, ErrSecureConfigNoHash 254 } 255 256 file, err := os.Open(filePath) 257 if err != nil { 258 return false, err 259 } 260 defer file.Close() 261 262 _, err = io.Copy(s.Hash, file) 263 if err != nil { 264 return false, err 265 } 266 267 sum := s.Hash.Sum(nil) 268 269 return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil 270} 271 272// This makes sure all the managed subprocesses are killed and properly 273// logged. This should be called before the parent process running the 274// plugins exits. 275// 276// This must only be called _once_. 277func CleanupClients() { 278 // Set the killed to true so that we don't get unexpected panics 279 atomic.StoreUint32(&Killed, 1) 280 281 // Kill all the managed clients in parallel and use a WaitGroup 282 // to wait for them all to finish up. 283 var wg sync.WaitGroup 284 managedClientsLock.Lock() 285 for _, client := range managedClients { 286 wg.Add(1) 287 288 go func(client *Client) { 289 client.Kill() 290 wg.Done() 291 }(client) 292 } 293 managedClientsLock.Unlock() 294 295 wg.Wait() 296} 297 298// Creates a new plugin client which manages the lifecycle of an external 299// plugin and gets the address for the RPC connection. 300// 301// The client must be cleaned up at some point by calling Kill(). If 302// the client is a managed client (created with NewManagedClient) you 303// can just call CleanupClients at the end of your program and they will 304// be properly cleaned. 
305func NewClient(config *ClientConfig) (c *Client) { 306 if config.MinPort == 0 && config.MaxPort == 0 { 307 config.MinPort = 10000 308 config.MaxPort = 25000 309 } 310 311 if config.StartTimeout == 0 { 312 config.StartTimeout = 1 * time.Minute 313 } 314 315 if config.Stderr == nil { 316 config.Stderr = ioutil.Discard 317 } 318 319 if config.SyncStdout == nil { 320 config.SyncStdout = ioutil.Discard 321 } 322 if config.SyncStderr == nil { 323 config.SyncStderr = ioutil.Discard 324 } 325 326 if config.AllowedProtocols == nil { 327 config.AllowedProtocols = []Protocol{ProtocolNetRPC} 328 } 329 330 if config.Logger == nil { 331 config.Logger = hclog.New(&hclog.LoggerOptions{ 332 Output: hclog.DefaultOutput, 333 Level: hclog.Trace, 334 Name: "plugin", 335 }) 336 } 337 338 c = &Client{ 339 config: config, 340 logger: config.Logger, 341 } 342 if config.Managed { 343 managedClientsLock.Lock() 344 managedClients = append(managedClients, c) 345 managedClientsLock.Unlock() 346 } 347 348 return 349} 350 351// Client returns the protocol client for this connection. 352// 353// Subsequent calls to this will return the same client. 354func (c *Client) Client() (ClientProtocol, error) { 355 _, err := c.Start() 356 if err != nil { 357 return nil, err 358 } 359 360 c.l.Lock() 361 defer c.l.Unlock() 362 363 if c.client != nil { 364 return c.client, nil 365 } 366 367 switch c.protocol { 368 case ProtocolNetRPC: 369 c.client, err = newRPCClient(c) 370 371 case ProtocolGRPC: 372 c.client, err = newGRPCClient(c.doneCtx, c) 373 374 default: 375 return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) 376 } 377 378 if err != nil { 379 c.client = nil 380 return nil, err 381 } 382 383 return c.client, nil 384} 385 386// Tells whether or not the underlying process has exited. 
387func (c *Client) Exited() bool { 388 c.l.Lock() 389 defer c.l.Unlock() 390 return c.exited 391} 392 393// killed is used in tests to check if a process failed to exit gracefully, and 394// needed to be killed. 395func (c *Client) killed() bool { 396 c.l.Lock() 397 defer c.l.Unlock() 398 return c.processKilled 399} 400 401// End the executing subprocess (if it is running) and perform any cleanup 402// tasks necessary such as capturing any remaining logs and so on. 403// 404// This method blocks until the process successfully exits. 405// 406// This method can safely be called multiple times. 407func (c *Client) Kill() { 408 // Grab a lock to read some private fields. 409 c.l.Lock() 410 process := c.process 411 addr := c.address 412 c.l.Unlock() 413 414 // If there is no process, there is nothing to kill. 415 if process == nil { 416 return 417 } 418 419 defer func() { 420 // Wait for the all client goroutines to finish. 421 c.clientWaitGroup.Wait() 422 423 // Make sure there is no reference to the old process after it has been 424 // killed. 425 c.l.Lock() 426 c.process = nil 427 c.l.Unlock() 428 }() 429 430 // We need to check for address here. It is possible that the plugin 431 // started (process != nil) but has no address (addr == nil) if the 432 // plugin failed at startup. If we do have an address, we need to close 433 // the plugin net connections. 434 graceful := false 435 if addr != nil { 436 // Close the client to cleanly exit the process. 437 client, err := c.Client() 438 if err == nil { 439 err = client.Close() 440 441 // If there is no error, then we attempt to wait for a graceful 442 // exit. If there was an error, we assume that graceful cleanup 443 // won't happen and just force kill. 444 graceful = err == nil 445 if err != nil { 446 // If there was an error just log it. We're going to force 447 // kill in a moment anyways. 
448 c.logger.Warn("error closing client during Kill", "err", err) 449 } 450 } else { 451 c.logger.Error("client", "error", err) 452 } 453 } 454 455 // If we're attempting a graceful exit, then we wait for a short period 456 // of time to allow that to happen. To wait for this we just wait on the 457 // doneCh which would be closed if the process exits. 458 if graceful { 459 select { 460 case <-c.doneCtx.Done(): 461 c.logger.Debug("plugin exited") 462 return 463 case <-time.After(2 * time.Second): 464 } 465 } 466 467 // If graceful exiting failed, just kill it 468 c.logger.Warn("plugin failed to exit gracefully") 469 process.Kill() 470 471 c.l.Lock() 472 c.processKilled = true 473 c.l.Unlock() 474} 475 476// Starts the underlying subprocess, communicating with it to negotiate 477// a port for RPC connections, and returning the address to connect via RPC. 478// 479// This method is safe to call multiple times. Subsequent calls have no effect. 480// Once a client has been started once, it cannot be started again, even if 481// it was killed. 482func (c *Client) Start() (addr net.Addr, err error) { 483 c.l.Lock() 484 defer c.l.Unlock() 485 486 if c.address != nil { 487 return c.address, nil 488 } 489 490 // If one of cmd or reattach isn't set, then it is an error. We wrap 491 // this in a {} for scoping reasons, and hopeful that the escape 492 // analysis will pop the stack here. 493 { 494 cmdSet := c.config.Cmd != nil 495 attachSet := c.config.Reattach != nil 496 secureSet := c.config.SecureConfig != nil 497 if cmdSet == attachSet { 498 return nil, fmt.Errorf("Only one of Cmd or Reattach must be set") 499 } 500 501 if secureSet && attachSet { 502 return nil, ErrSecureConfigAndReattach 503 } 504 } 505 506 if c.config.Reattach != nil { 507 return c.reattach() 508 } 509 510 if c.config.VersionedPlugins == nil { 511 c.config.VersionedPlugins = make(map[int]PluginSet) 512 } 513 514 // handle all plugins as versioned, using the handshake config as the default. 
515 version := int(c.config.ProtocolVersion) 516 517 // Make sure we're not overwriting a real version 0. If ProtocolVersion was 518 // non-zero, then we have to just assume the user made sure that 519 // VersionedPlugins doesn't conflict. 520 if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { 521 c.config.VersionedPlugins[version] = c.config.Plugins 522 } 523 524 var versionStrings []string 525 for v := range c.config.VersionedPlugins { 526 versionStrings = append(versionStrings, strconv.Itoa(v)) 527 } 528 529 env := []string{ 530 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), 531 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), 532 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), 533 fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), 534 } 535 536 cmd := c.config.Cmd 537 cmd.Env = append(cmd.Env, os.Environ()...) 538 cmd.Env = append(cmd.Env, env...) 539 cmd.Stdin = os.Stdin 540 541 cmdStdout, err := cmd.StdoutPipe() 542 if err != nil { 543 return nil, err 544 } 545 cmdStderr, err := cmd.StderrPipe() 546 if err != nil { 547 return nil, err 548 } 549 550 if c.config.SecureConfig != nil { 551 if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { 552 return nil, fmt.Errorf("error verifying checksum: %s", err) 553 } else if !ok { 554 return nil, ErrChecksumsDoNotMatch 555 } 556 } 557 558 // Setup a temporary certificate for client/server mtls, and send the public 559 // certificate to the plugin. 
560 if c.config.AutoMTLS { 561 c.logger.Info("configuring client automatic mTLS") 562 certPEM, keyPEM, err := generateCert() 563 if err != nil { 564 c.logger.Error("failed to generate client certificate", "error", err) 565 return nil, err 566 } 567 cert, err := tls.X509KeyPair(certPEM, keyPEM) 568 if err != nil { 569 c.logger.Error("failed to parse client certificate", "error", err) 570 return nil, err 571 } 572 573 cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM)) 574 575 c.config.TLSConfig = &tls.Config{ 576 Certificates: []tls.Certificate{cert}, 577 ServerName: "localhost", 578 } 579 } 580 581 c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) 582 err = cmd.Start() 583 if err != nil { 584 return 585 } 586 587 // Set the process 588 c.process = cmd.Process 589 c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid) 590 591 // Make sure the command is properly cleaned up if there is an error 592 defer func() { 593 r := recover() 594 595 if err != nil || r != nil { 596 cmd.Process.Kill() 597 } 598 599 if r != nil { 600 panic(r) 601 } 602 }() 603 604 // Create a context for when we kill 605 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) 606 607 // Start goroutine that logs the stderr 608 c.clientWaitGroup.Add(1) 609 c.stderrWaitGroup.Add(1) 610 // logStderr calls Done() 611 go c.logStderr(cmdStderr) 612 613 c.clientWaitGroup.Add(1) 614 go func() { 615 // ensure the context is cancelled when we're done 616 defer c.ctxCancel() 617 618 defer c.clientWaitGroup.Done() 619 620 // get the cmd info early, since the process information will be removed 621 // in Kill. 622 pid := c.process.Pid 623 path := cmd.Path 624 625 // wait to finish reading from stderr since the stderr pipe reader 626 // will be closed by the subsequent call to cmd.Wait(). 627 c.stderrWaitGroup.Wait() 628 629 // Wait for the command to end. 
630 err := cmd.Wait() 631 632 debugMsgArgs := []interface{}{ 633 "path", path, 634 "pid", pid, 635 } 636 if err != nil { 637 debugMsgArgs = append(debugMsgArgs, 638 []interface{}{"error", err.Error()}...) 639 } 640 641 // Log and make sure to flush the logs write away 642 c.logger.Debug("plugin process exited", debugMsgArgs...) 643 os.Stderr.Sync() 644 645 // Set that we exited, which takes a lock 646 c.l.Lock() 647 defer c.l.Unlock() 648 c.exited = true 649 }() 650 651 // Start a goroutine that is going to be reading the lines 652 // out of stdout 653 linesCh := make(chan string) 654 c.clientWaitGroup.Add(1) 655 go func() { 656 defer c.clientWaitGroup.Done() 657 defer close(linesCh) 658 659 scanner := bufio.NewScanner(cmdStdout) 660 for scanner.Scan() { 661 linesCh <- scanner.Text() 662 } 663 }() 664 665 // Make sure after we exit we read the lines from stdout forever 666 // so they don't block since it is a pipe. 667 // The scanner goroutine above will close this, but track it with a wait 668 // group for completeness. 669 c.clientWaitGroup.Add(1) 670 defer func() { 671 go func() { 672 defer c.clientWaitGroup.Done() 673 for range linesCh { 674 } 675 }() 676 }() 677 678 // Some channels for the next step 679 timeout := time.After(c.config.StartTimeout) 680 681 // Start looking for the address 682 c.logger.Debug("waiting for RPC address", "path", cmd.Path) 683 select { 684 case <-timeout: 685 err = errors.New("timeout while waiting for plugin to start") 686 case <-c.doneCtx.Done(): 687 err = errors.New("plugin exited before we could connect") 688 case line := <-linesCh: 689 // Trim the line and split by "|" in order to get the parts of 690 // the output. 
691 line = strings.TrimSpace(line) 692 parts := strings.SplitN(line, "|", 6) 693 if len(parts) < 4 { 694 err = fmt.Errorf( 695 "Unrecognized remote plugin message: %s\n\n"+ 696 "This usually means that the plugin is either invalid or simply\n"+ 697 "needs to be recompiled to support the latest protocol.", line) 698 return 699 } 700 701 // Check the core protocol. Wrapped in a {} for scoping. 702 { 703 var coreProtocol int 704 coreProtocol, err = strconv.Atoi(parts[0]) 705 if err != nil { 706 err = fmt.Errorf("Error parsing core protocol version: %s", err) 707 return 708 } 709 710 if coreProtocol != CoreProtocolVersion { 711 err = fmt.Errorf("Incompatible core API version with plugin. "+ 712 "Plugin version: %s, Core version: %d\n\n"+ 713 "To fix this, the plugin usually only needs to be recompiled.\n"+ 714 "Please report this to the plugin author.", parts[0], CoreProtocolVersion) 715 return 716 } 717 } 718 719 // Test the API version 720 version, pluginSet, err := c.checkProtoVersion(parts[1]) 721 if err != nil { 722 return addr, err 723 } 724 725 // set the Plugins value to the compatible set, so the version 726 // doesn't need to be passed through to the ClientProtocol 727 // implementation. 728 c.config.Plugins = pluginSet 729 c.negotiatedVersion = version 730 c.logger.Debug("using plugin", "version", version) 731 732 switch parts[2] { 733 case "tcp": 734 addr, err = net.ResolveTCPAddr("tcp", parts[3]) 735 case "unix": 736 addr, err = net.ResolveUnixAddr("unix", parts[3]) 737 default: 738 err = fmt.Errorf("Unknown address type: %s", parts[3]) 739 } 740 741 // If we have a server type, then record that. We default to net/rpc 742 // for backwards compatibility. 743 c.protocol = ProtocolNetRPC 744 if len(parts) >= 5 { 745 c.protocol = Protocol(parts[4]) 746 } 747 748 found := false 749 for _, p := range c.config.AllowedProtocols { 750 if p == c.protocol { 751 found = true 752 break 753 } 754 } 755 if !found { 756 err = fmt.Errorf("Unsupported plugin protocol %q. 
Supported: %v", 757 c.protocol, c.config.AllowedProtocols) 758 return addr, err 759 } 760 761 // See if we have a TLS certificate from the server. 762 // Checking if the length is > 50 rules out catching the unused "extra" 763 // data returned from some older implementations. 764 if len(parts) >= 6 && len(parts[5]) > 50 { 765 err := c.loadServerCert(parts[5]) 766 if err != nil { 767 return nil, fmt.Errorf("error parsing server cert: %s", err) 768 } 769 } 770 } 771 772 c.address = addr 773 return 774} 775 776// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the 777// server, and load it as the RootCA for the client TLSConfig. 778func (c *Client) loadServerCert(cert string) error { 779 certPool := x509.NewCertPool() 780 781 asn1, err := base64.RawStdEncoding.DecodeString(cert) 782 if err != nil { 783 return err 784 } 785 786 x509Cert, err := x509.ParseCertificate([]byte(asn1)) 787 if err != nil { 788 return err 789 } 790 791 certPool.AddCert(x509Cert) 792 793 c.config.TLSConfig.RootCAs = certPool 794 return nil 795} 796 797func (c *Client) reattach() (net.Addr, error) { 798 // Verify the process still exists. If not, then it is an error 799 p, err := os.FindProcess(c.config.Reattach.Pid) 800 if err != nil { 801 // On Unix systems, FindProcess never returns an error. 802 // On Windows, for non-existent pids it returns: 803 // os.SyscallError - 'OpenProcess: the paremter is incorrect' 804 return nil, ErrProcessNotFound 805 } 806 807 // Attempt to connect to the addr since on Unix systems FindProcess 808 // doesn't actually return an error if it can't find the process. 
809 conn, err := net.Dial( 810 c.config.Reattach.Addr.Network(), 811 c.config.Reattach.Addr.String()) 812 if err != nil { 813 p.Kill() 814 return nil, ErrProcessNotFound 815 } 816 conn.Close() 817 818 // Create a context for when we kill 819 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) 820 821 c.clientWaitGroup.Add(1) 822 // Goroutine to mark exit status 823 go func(pid int) { 824 defer c.clientWaitGroup.Done() 825 826 // ensure the context is cancelled when we're done 827 defer c.ctxCancel() 828 829 // Wait for the process to die 830 pidWait(pid) 831 832 // Log so we can see it 833 c.logger.Debug("reattached plugin process exited") 834 835 // Mark it 836 c.l.Lock() 837 defer c.l.Unlock() 838 c.exited = true 839 }(p.Pid) 840 841 // Set the address and protocol 842 c.address = c.config.Reattach.Addr 843 c.protocol = c.config.Reattach.Protocol 844 if c.protocol == "" { 845 // Default the protocol to net/rpc for backwards compatibility 846 c.protocol = ProtocolNetRPC 847 } 848 849 if c.config.Reattach.Test { 850 c.negotiatedVersion = c.config.Reattach.ProtocolVersion 851 } 852 853 // If we're in test mode, we do NOT set the process. This avoids the 854 // process being killed (the only purpose we have for c.process), since 855 // in test mode the process is responsible for exiting on its own. 856 if !c.config.Reattach.Test { 857 c.process = p 858 } 859 860 return c.address, nil 861} 862 863// checkProtoVersion returns the negotiated version and PluginSet. 864// This returns an error if the server returned an incompatible protocol 865// version, or an invalid handshake response. 
866func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { 867 serverVersion, err := strconv.Atoi(protoVersion) 868 if err != nil { 869 return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) 870 } 871 872 // record these for the error message 873 var clientVersions []int 874 875 // all versions, including the legacy ProtocolVersion have been added to 876 // the versions set 877 for version, plugins := range c.config.VersionedPlugins { 878 clientVersions = append(clientVersions, version) 879 880 if serverVersion != version { 881 continue 882 } 883 return version, plugins, nil 884 } 885 886 return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+ 887 "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) 888} 889 890// ReattachConfig returns the information that must be provided to NewClient 891// to reattach to the plugin process that this client started. This is 892// useful for plugins that detach from their parent process. 893// 894// If this returns nil then the process hasn't been started yet. Please 895// call Start or Client before calling this. 896func (c *Client) ReattachConfig() *ReattachConfig { 897 c.l.Lock() 898 defer c.l.Unlock() 899 900 if c.address == nil { 901 return nil 902 } 903 904 if c.config.Cmd != nil && c.config.Cmd.Process == nil { 905 return nil 906 } 907 908 // If we connected via reattach, just return the information as-is 909 if c.config.Reattach != nil { 910 return c.config.Reattach 911 } 912 913 return &ReattachConfig{ 914 Protocol: c.protocol, 915 Addr: c.address, 916 Pid: c.config.Cmd.Process.Pid, 917 } 918} 919 920// Protocol returns the protocol of server on the remote end. This will 921// start the plugin process if it isn't already started. Errors from 922// starting the plugin are surpressed and ProtocolInvalid is returned. It 923// is recommended you call Start explicitly before calling Protocol to ensure 924// no errors occur. 
925func (c *Client) Protocol() Protocol { 926 _, err := c.Start() 927 if err != nil { 928 return ProtocolInvalid 929 } 930 931 return c.protocol 932} 933 934func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) { 935 return func(_ string, _ time.Duration) (net.Conn, error) { 936 // Connect to the client 937 conn, err := net.Dial(addr.Network(), addr.String()) 938 if err != nil { 939 return nil, err 940 } 941 if tcpConn, ok := conn.(*net.TCPConn); ok { 942 // Make sure to set keep alive so that the connection doesn't die 943 tcpConn.SetKeepAlive(true) 944 } 945 946 return conn, nil 947 } 948} 949 950// dialer is compatible with grpc.WithDialer and creates the connection 951// to the plugin. 952func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { 953 conn, err := netAddrDialer(c.address)("", timeout) 954 if err != nil { 955 return nil, err 956 } 957 958 // If we have a TLS config we wrap our connection. We only do this 959 // for net/rpc since gRPC uses its own mechanism for TLS. 960 if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil { 961 conn = tls.Client(conn, c.config.TLSConfig) 962 } 963 964 return conn, nil 965} 966 967var stdErrBufferSize = 64 * 1024 968 969func (c *Client) logStderr(r io.Reader) { 970 defer c.clientWaitGroup.Done() 971 defer c.stderrWaitGroup.Done() 972 l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) 973 974 reader := bufio.NewReaderSize(r, stdErrBufferSize) 975 // continuation indicates the previous line was a prefix 976 continuation := false 977 978 for { 979 line, isPrefix, err := reader.ReadLine() 980 switch { 981 case err == io.EOF: 982 return 983 case err != nil: 984 l.Error("reading plugin stderr", "error", err) 985 return 986 } 987 988 c.config.Stderr.Write(line) 989 990 // The line was longer than our max token size, so it's likely 991 // incomplete and won't unmarshal. 
992 if isPrefix || continuation { 993 l.Debug(string(line)) 994 995 // if we're finishing a continued line, add the newline back in 996 if !isPrefix { 997 c.config.Stderr.Write([]byte{'\n'}) 998 } 999 1000 continuation = isPrefix 1001 continue 1002 } 1003 1004 c.config.Stderr.Write([]byte{'\n'}) 1005 1006 entry, err := parseJSON(line) 1007 // If output is not JSON format, print directly to Debug 1008 if err != nil { 1009 // Attempt to infer the desired log level from the commonly used 1010 // string prefixes 1011 switch line := string(line); { 1012 case strings.HasPrefix(line, "[TRACE]"): 1013 l.Trace(line) 1014 case strings.HasPrefix(line, "[DEBUG]"): 1015 l.Debug(line) 1016 case strings.HasPrefix(line, "[INFO]"): 1017 l.Info(line) 1018 case strings.HasPrefix(line, "[WARN]"): 1019 l.Warn(line) 1020 case strings.HasPrefix(line, "[ERROR]"): 1021 l.Error(line) 1022 default: 1023 l.Debug(line) 1024 } 1025 } else { 1026 out := flattenKVPairs(entry.KVPairs) 1027 1028 out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) 1029 switch hclog.LevelFromString(entry.Level) { 1030 case hclog.Trace: 1031 l.Trace(entry.Message, out...) 1032 case hclog.Debug: 1033 l.Debug(entry.Message, out...) 1034 case hclog.Info: 1035 l.Info(entry.Message, out...) 1036 case hclog.Warn: 1037 l.Warn(entry.Message, out...) 1038 case hclog.Error: 1039 l.Error(entry.Message, out...) 1040 default: 1041 // if there was no log level, it's likely this is unexpected 1042 // json from something other than hclog, and we should output 1043 // it verbatim. 1044 l.Debug(string(line)) 1045 } 1046 } 1047 } 1048} 1049