// Copyright 2013 go-dockerclient authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package docker provides a client for the Docker remote API.
//
// See https://goo.gl/o2v3rk for more details on the remote API.
package docker

import (
	"bufio"
	"bytes"
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"path/filepath"
	"reflect"
	"runtime"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/docker/docker/pkg/homedir"
	"github.com/docker/docker/pkg/jsonmessage"
	"github.com/docker/docker/pkg/stdcopy"
)

const (
	// userAgent is the value sent in the User-Agent header of requests.
	userAgent = "go-dockerclient"

	// unixProtocol and namedPipeProtocol are the endpoint URL schemes that
	// are dialed through Client.Dialer instead of being treated as plain
	// HTTP(S) hosts.
	unixProtocol      = "unix"
	namedPipeProtocol = "npipe"
)

var (
	// ErrInvalidEndpoint is returned when the endpoint is not a valid HTTP URL.
	ErrInvalidEndpoint = errors.New("invalid endpoint")

	// ErrConnectionRefused is returned when the client cannot connect to the given endpoint.
	ErrConnectionRefused = errors.New("cannot connect to Docker endpoint")

	// ErrInactivityTimeout is returned when a streamable call has been inactive for some time.
	ErrInactivityTimeout = errors.New("inactivity time exceeded timeout")

	// Pre-parsed API versions. The parse errors are discarded because the
	// literals are well-formed. They are not referenced in this part of the
	// file; presumably used for feature gating elsewhere in the package.
	apiVersion112, _ = NewAPIVersion("1.12")
	apiVersion118, _ = NewAPIVersion("1.18")
	apiVersion119, _ = NewAPIVersion("1.19")
	apiVersion121, _ = NewAPIVersion("1.21")
	apiVersion124, _ = NewAPIVersion("1.24")
	apiVersion125, _ = NewAPIVersion("1.25")
	apiVersion135, _ = NewAPIVersion("1.35")
)

// APIVersion is an internal representation of a version of the Remote API.
// Each element is one dot-separated numeric component, most significant first.
type APIVersion []int

// NewAPIVersion returns an instance of APIVersion for the given string.
//
// The given string must contain at least one dot and consist of
// dot-separated integers, typically <major>.<minor> or
// <major>.<minor>.<patch>. Anything from the first "-" onwards is ignored
// before parsing. An error is returned when the string has no dot or when a
// component is not an integer.
72func NewAPIVersion(input string) (APIVersion, error) { 73 if !strings.Contains(input, ".") { 74 return nil, fmt.Errorf("unable to parse version %q", input) 75 } 76 raw := strings.Split(input, "-") 77 arr := strings.Split(raw[0], ".") 78 ret := make(APIVersion, len(arr)) 79 var err error 80 for i, val := range arr { 81 ret[i], err = strconv.Atoi(val) 82 if err != nil { 83 return nil, fmt.Errorf("unable to parse version %q: %q is not an integer", input, val) 84 } 85 } 86 return ret, nil 87} 88 89func (version APIVersion) String() string { 90 parts := make([]string, len(version)) 91 for i, val := range version { 92 parts[i] = strconv.Itoa(val) 93 } 94 return strings.Join(parts, ".") 95} 96 97// LessThan is a function for comparing APIVersion structs. 98func (version APIVersion) LessThan(other APIVersion) bool { 99 return version.compare(other) < 0 100} 101 102// LessThanOrEqualTo is a function for comparing APIVersion structs. 103func (version APIVersion) LessThanOrEqualTo(other APIVersion) bool { 104 return version.compare(other) <= 0 105} 106 107// GreaterThan is a function for comparing APIVersion structs. 108func (version APIVersion) GreaterThan(other APIVersion) bool { 109 return version.compare(other) > 0 110} 111 112// GreaterThanOrEqualTo is a function for comparing APIVersion structs. 113func (version APIVersion) GreaterThanOrEqualTo(other APIVersion) bool { 114 return version.compare(other) >= 0 115} 116 117func (version APIVersion) compare(other APIVersion) int { 118 for i, v := range version { 119 if i <= len(other)-1 { 120 otherVersion := other[i] 121 122 if v < otherVersion { 123 return -1 124 } else if v > otherVersion { 125 return 1 126 } 127 } 128 } 129 if len(version) > len(other) { 130 return 1 131 } 132 if len(version) < len(other) { 133 return -1 134 } 135 return 0 136} 137 138// Client is the basic type of this package. It provides methods for 139// interaction with the API. 
140type Client struct { 141 SkipServerVersionCheck bool 142 HTTPClient *http.Client 143 TLSConfig *tls.Config 144 Dialer Dialer 145 146 endpoint string 147 endpointURL *url.URL 148 eventMonitor *eventMonitoringState 149 requestedAPIVersion APIVersion 150 serverAPIVersion APIVersion 151 expectedAPIVersion APIVersion 152} 153 154// Dialer is an interface that allows network connections to be dialed 155// (net.Dialer fulfills this interface) and named pipes (a shim using 156// winio.DialPipe) 157type Dialer interface { 158 Dial(network, address string) (net.Conn, error) 159} 160 161// NewClient returns a Client instance ready for communication with the given 162// server endpoint. It will use the latest remote API version available in the 163// server. 164func NewClient(endpoint string) (*Client, error) { 165 client, err := NewVersionedClient(endpoint, "") 166 if err != nil { 167 return nil, err 168 } 169 client.SkipServerVersionCheck = true 170 return client, nil 171} 172 173// NewTLSClient returns a Client instance ready for TLS communications with the givens 174// server endpoint, key and certificates . It will use the latest remote API version 175// available in the server. 176func NewTLSClient(endpoint string, cert, key, ca string) (*Client, error) { 177 client, err := NewVersionedTLSClient(endpoint, cert, key, ca, "") 178 if err != nil { 179 return nil, err 180 } 181 client.SkipServerVersionCheck = true 182 return client, nil 183} 184 185// NewTLSClientFromBytes returns a Client instance ready for TLS communications with the givens 186// server endpoint, key and certificates (passed inline to the function as opposed to being 187// read from a local file). It will use the latest remote API version available in the server. 
188func NewTLSClientFromBytes(endpoint string, certPEMBlock, keyPEMBlock, caPEMCert []byte) (*Client, error) { 189 client, err := NewVersionedTLSClientFromBytes(endpoint, certPEMBlock, keyPEMBlock, caPEMCert, "") 190 if err != nil { 191 return nil, err 192 } 193 client.SkipServerVersionCheck = true 194 return client, nil 195} 196 197// NewVersionedClient returns a Client instance ready for communication with 198// the given server endpoint, using a specific remote API version. 199func NewVersionedClient(endpoint string, apiVersionString string) (*Client, error) { 200 u, err := parseEndpoint(endpoint, false) 201 if err != nil { 202 return nil, err 203 } 204 var requestedAPIVersion APIVersion 205 if strings.Contains(apiVersionString, ".") { 206 requestedAPIVersion, err = NewAPIVersion(apiVersionString) 207 if err != nil { 208 return nil, err 209 } 210 } 211 c := &Client{ 212 HTTPClient: defaultClient(), 213 Dialer: &net.Dialer{}, 214 endpoint: endpoint, 215 endpointURL: u, 216 eventMonitor: new(eventMonitoringState), 217 requestedAPIVersion: requestedAPIVersion, 218 } 219 c.initializeNativeClient(defaultTransport) 220 return c, nil 221} 222 223// WithTransport replaces underlying HTTP client of Docker Client by accepting 224// a function that returns pointer to a transport object. 225func (c *Client) WithTransport(trFunc func() *http.Transport) { 226 c.initializeNativeClient(trFunc) 227} 228 229// NewVersionnedTLSClient is like NewVersionedClient, but with ann extra n. 230// 231// Deprecated: Use NewVersionedTLSClient instead. 232func NewVersionnedTLSClient(endpoint string, cert, key, ca, apiVersionString string) (*Client, error) { 233 return NewVersionedTLSClient(endpoint, cert, key, ca, apiVersionString) 234} 235 236// NewVersionedTLSClient returns a Client instance ready for TLS communications with the givens 237// server endpoint, key and certificates, using a specific remote API version. 
238func NewVersionedTLSClient(endpoint string, cert, key, ca, apiVersionString string) (*Client, error) { 239 var certPEMBlock []byte 240 var keyPEMBlock []byte 241 var caPEMCert []byte 242 if _, err := os.Stat(cert); !os.IsNotExist(err) { 243 certPEMBlock, err = ioutil.ReadFile(cert) 244 if err != nil { 245 return nil, err 246 } 247 } 248 if _, err := os.Stat(key); !os.IsNotExist(err) { 249 keyPEMBlock, err = ioutil.ReadFile(key) 250 if err != nil { 251 return nil, err 252 } 253 } 254 if _, err := os.Stat(ca); !os.IsNotExist(err) { 255 caPEMCert, err = ioutil.ReadFile(ca) 256 if err != nil { 257 return nil, err 258 } 259 } 260 return NewVersionedTLSClientFromBytes(endpoint, certPEMBlock, keyPEMBlock, caPEMCert, apiVersionString) 261} 262 263// NewClientFromEnv returns a Client instance ready for communication created from 264// Docker's default logic for the environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, DOCKER_CERT_PATH, 265// and DOCKER_API_VERSION. 266// 267// See https://github.com/docker/docker/blob/1f963af697e8df3a78217f6fdbf67b8123a7db94/docker/docker.go#L68. 268// See https://github.com/docker/compose/blob/81707ef1ad94403789166d2fe042c8a718a4c748/compose/cli/docker_client.py#L7. 269// See https://github.com/moby/moby/blob/28d7dba41d0c0d9c7f0dafcc79d3c59f2b3f5dc3/client/options.go#L51 270func NewClientFromEnv() (*Client, error) { 271 apiVersionString := os.Getenv("DOCKER_API_VERSION") 272 client, err := NewVersionedClientFromEnv(apiVersionString) 273 if err != nil { 274 return nil, err 275 } 276 client.SkipServerVersionCheck = apiVersionString == "" 277 return client, nil 278} 279 280// NewVersionedClientFromEnv returns a Client instance ready for TLS communications created from 281// Docker's default logic for the environment variables DOCKER_HOST, DOCKER_TLS_VERIFY, and DOCKER_CERT_PATH, 282// and using a specific remote API version. 283// 284// See https://github.com/docker/docker/blob/1f963af697e8df3a78217f6fdbf67b8123a7db94/docker/docker.go#L68. 
285// See https://github.com/docker/compose/blob/81707ef1ad94403789166d2fe042c8a718a4c748/compose/cli/docker_client.py#L7. 286func NewVersionedClientFromEnv(apiVersionString string) (*Client, error) { 287 dockerEnv, err := getDockerEnv() 288 if err != nil { 289 return nil, err 290 } 291 dockerHost := dockerEnv.dockerHost 292 if dockerEnv.dockerTLSVerify { 293 parts := strings.SplitN(dockerEnv.dockerHost, "://", 2) 294 if len(parts) != 2 { 295 return nil, fmt.Errorf("could not split %s into two parts by ://", dockerHost) 296 } 297 cert := filepath.Join(dockerEnv.dockerCertPath, "cert.pem") 298 key := filepath.Join(dockerEnv.dockerCertPath, "key.pem") 299 ca := filepath.Join(dockerEnv.dockerCertPath, "ca.pem") 300 return NewVersionedTLSClient(dockerEnv.dockerHost, cert, key, ca, apiVersionString) 301 } 302 return NewVersionedClient(dockerEnv.dockerHost, apiVersionString) 303} 304 305// NewVersionedTLSClientFromBytes returns a Client instance ready for TLS communications with the givens 306// server endpoint, key and certificates (passed inline to the function as opposed to being 307// read from a local file), using a specific remote API version. 
308func NewVersionedTLSClientFromBytes(endpoint string, certPEMBlock, keyPEMBlock, caPEMCert []byte, apiVersionString string) (*Client, error) { 309 u, err := parseEndpoint(endpoint, true) 310 if err != nil { 311 return nil, err 312 } 313 var requestedAPIVersion APIVersion 314 if strings.Contains(apiVersionString, ".") { 315 requestedAPIVersion, err = NewAPIVersion(apiVersionString) 316 if err != nil { 317 return nil, err 318 } 319 } 320 tlsConfig := &tls.Config{} 321 if certPEMBlock != nil && keyPEMBlock != nil { 322 tlsCert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) 323 if err != nil { 324 return nil, err 325 } 326 tlsConfig.Certificates = []tls.Certificate{tlsCert} 327 } 328 if caPEMCert == nil { 329 tlsConfig.InsecureSkipVerify = true 330 } else { 331 caPool := x509.NewCertPool() 332 if !caPool.AppendCertsFromPEM(caPEMCert) { 333 return nil, errors.New("could not add RootCA pem") 334 } 335 tlsConfig.RootCAs = caPool 336 } 337 tr := defaultTransport() 338 tr.TLSClientConfig = tlsConfig 339 if err != nil { 340 return nil, err 341 } 342 c := &Client{ 343 HTTPClient: &http.Client{Transport: tr}, 344 TLSConfig: tlsConfig, 345 Dialer: &net.Dialer{}, 346 endpoint: endpoint, 347 endpointURL: u, 348 eventMonitor: new(eventMonitoringState), 349 requestedAPIVersion: requestedAPIVersion, 350 } 351 c.initializeNativeClient(defaultTransport) 352 return c, nil 353} 354 355// SetTimeout takes a timeout and applies it to the HTTPClient. It should not 356// be called concurrently with any other Client methods. 
357func (c *Client) SetTimeout(t time.Duration) { 358 if c.HTTPClient != nil { 359 c.HTTPClient.Timeout = t 360 } 361} 362 363func (c *Client) checkAPIVersion() error { 364 serverAPIVersionString, err := c.getServerAPIVersionString() 365 if err != nil { 366 return err 367 } 368 c.serverAPIVersion, err = NewAPIVersion(serverAPIVersionString) 369 if err != nil { 370 return err 371 } 372 if c.requestedAPIVersion == nil { 373 c.expectedAPIVersion = c.serverAPIVersion 374 } else { 375 c.expectedAPIVersion = c.requestedAPIVersion 376 } 377 return nil 378} 379 380// Endpoint returns the current endpoint. It's useful for getting the endpoint 381// when using functions that get this data from the environment (like 382// NewClientFromEnv. 383func (c *Client) Endpoint() string { 384 return c.endpoint 385} 386 387// Ping pings the docker server 388// 389// See https://goo.gl/wYfgY1 for more details. 390func (c *Client) Ping() error { 391 return c.PingWithContext(context.TODO()) 392} 393 394// PingWithContext pings the docker server 395// The context object can be used to cancel the ping request. 396// 397// See https://goo.gl/wYfgY1 for more details. 
398func (c *Client) PingWithContext(ctx context.Context) error { 399 path := "/_ping" 400 resp, err := c.do(http.MethodGet, path, doOptions{context: ctx}) 401 if err != nil { 402 return err 403 } 404 if resp.StatusCode != http.StatusOK { 405 return newError(resp) 406 } 407 resp.Body.Close() 408 return nil 409} 410 411func (c *Client) getServerAPIVersionString() (version string, err error) { 412 resp, err := c.do(http.MethodGet, "/version", doOptions{}) 413 if err != nil { 414 return "", err 415 } 416 defer resp.Body.Close() 417 if resp.StatusCode != http.StatusOK { 418 return "", fmt.Errorf("received unexpected status %d while trying to retrieve the server version", resp.StatusCode) 419 } 420 var versionResponse map[string]interface{} 421 if err := json.NewDecoder(resp.Body).Decode(&versionResponse); err != nil { 422 return "", err 423 } 424 if version, ok := (versionResponse["ApiVersion"]).(string); ok { 425 return version, nil 426 } 427 return "", nil 428} 429 430type doOptions struct { 431 data interface{} 432 forceJSON bool 433 headers map[string]string 434 context context.Context 435} 436 437func (c *Client) do(method, path string, doOptions doOptions) (*http.Response, error) { 438 var params io.Reader 439 if doOptions.data != nil || doOptions.forceJSON { 440 buf, err := json.Marshal(doOptions.data) 441 if err != nil { 442 return nil, err 443 } 444 params = bytes.NewBuffer(buf) 445 } 446 if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil { 447 err := c.checkAPIVersion() 448 if err != nil { 449 return nil, err 450 } 451 } 452 protocol := c.endpointURL.Scheme 453 var u string 454 switch protocol { 455 case unixProtocol, namedPipeProtocol: 456 u = c.getFakeNativeURL(path) 457 default: 458 u = c.getURL(path) 459 } 460 461 req, err := http.NewRequest(method, u, params) 462 if err != nil { 463 return nil, err 464 } 465 req.Header.Set("User-Agent", userAgent) 466 if doOptions.data != nil { 467 req.Header.Set("Content-Type", 
"application/json") 468 } else if method == http.MethodPost { 469 req.Header.Set("Content-Type", "plain/text") 470 } 471 472 for k, v := range doOptions.headers { 473 req.Header.Set(k, v) 474 } 475 476 ctx := doOptions.context 477 if ctx == nil { 478 ctx = context.Background() 479 } 480 481 resp, err := c.HTTPClient.Do(req.WithContext(ctx)) 482 if err != nil { 483 if strings.Contains(err.Error(), "connection refused") { 484 return nil, ErrConnectionRefused 485 } 486 487 return nil, chooseError(ctx, err) 488 } 489 if resp.StatusCode < 200 || resp.StatusCode >= 400 { 490 return nil, newError(resp) 491 } 492 return resp, nil 493} 494 495type streamOptions struct { 496 setRawTerminal bool 497 rawJSONStream bool 498 useJSONDecoder bool 499 headers map[string]string 500 in io.Reader 501 stdout io.Writer 502 stderr io.Writer 503 reqSent chan struct{} 504 // timeout is the initial connection timeout 505 timeout time.Duration 506 // Timeout with no data is received, it's reset every time new data 507 // arrives 508 inactivityTimeout time.Duration 509 context context.Context 510} 511 512func chooseError(ctx context.Context, err error) error { 513 select { 514 case <-ctx.Done(): 515 return ctx.Err() 516 default: 517 return err 518 } 519} 520 521func (c *Client) stream(method, path string, streamOptions streamOptions) error { 522 if (method == http.MethodPost || method == http.MethodPut) && streamOptions.in == nil { 523 streamOptions.in = bytes.NewReader(nil) 524 } 525 if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil { 526 err := c.checkAPIVersion() 527 if err != nil { 528 return err 529 } 530 } 531 return c.streamURL(method, c.getURL(path), streamOptions) 532} 533 534func (c *Client) streamURL(method, url string, streamOptions streamOptions) error { 535 if (method == http.MethodPost || method == http.MethodPut) && streamOptions.in == nil { 536 streamOptions.in = bytes.NewReader(nil) 537 } 538 if !c.SkipServerVersionCheck && 
c.expectedAPIVersion == nil { 539 err := c.checkAPIVersion() 540 if err != nil { 541 return err 542 } 543 } 544 req, err := http.NewRequest(method, url, streamOptions.in) 545 if err != nil { 546 return err 547 } 548 req.Header.Set("User-Agent", userAgent) 549 if method == http.MethodPost { 550 req.Header.Set("Content-Type", "plain/text") 551 } 552 for key, val := range streamOptions.headers { 553 req.Header.Set(key, val) 554 } 555 var resp *http.Response 556 protocol := c.endpointURL.Scheme 557 address := c.endpointURL.Path 558 if streamOptions.stdout == nil { 559 streamOptions.stdout = ioutil.Discard 560 } 561 if streamOptions.stderr == nil { 562 streamOptions.stderr = ioutil.Discard 563 } 564 565 // make a sub-context so that our active cancellation does not affect parent 566 ctx := streamOptions.context 567 if ctx == nil { 568 ctx = context.Background() 569 } 570 subCtx, cancelRequest := context.WithCancel(ctx) 571 defer cancelRequest() 572 573 if protocol == unixProtocol || protocol == namedPipeProtocol { 574 var dial net.Conn 575 dial, err = c.Dialer.Dial(protocol, address) 576 if err != nil { 577 return err 578 } 579 go func() { 580 <-subCtx.Done() 581 dial.Close() 582 }() 583 breader := bufio.NewReader(dial) 584 err = req.Write(dial) 585 if err != nil { 586 return chooseError(subCtx, err) 587 } 588 589 // ReadResponse may hang if server does not replay 590 if streamOptions.timeout > 0 { 591 dial.SetDeadline(time.Now().Add(streamOptions.timeout)) 592 } 593 594 if streamOptions.reqSent != nil { 595 close(streamOptions.reqSent) 596 } 597 if resp, err = http.ReadResponse(breader, req); err != nil { 598 // Cancel timeout for future I/O operations 599 if streamOptions.timeout > 0 { 600 dial.SetDeadline(time.Time{}) 601 } 602 if strings.Contains(err.Error(), "connection refused") { 603 return ErrConnectionRefused 604 } 605 606 return chooseError(subCtx, err) 607 } 608 defer resp.Body.Close() 609 } else { 610 if resp, err = c.HTTPClient.Do(req.WithContext(subCtx)); 
err != nil { 611 if strings.Contains(err.Error(), "connection refused") { 612 return ErrConnectionRefused 613 } 614 return chooseError(subCtx, err) 615 } 616 defer resp.Body.Close() 617 if streamOptions.reqSent != nil { 618 close(streamOptions.reqSent) 619 } 620 } 621 if resp.StatusCode < 200 || resp.StatusCode >= 400 { 622 return newError(resp) 623 } 624 var canceled uint32 625 if streamOptions.inactivityTimeout > 0 { 626 var ch chan<- struct{} 627 resp.Body, ch = handleInactivityTimeout(resp.Body, streamOptions.inactivityTimeout, cancelRequest, &canceled) 628 defer close(ch) 629 } 630 err = handleStreamResponse(resp, &streamOptions) 631 if err != nil { 632 if atomic.LoadUint32(&canceled) != 0 { 633 return ErrInactivityTimeout 634 } 635 return chooseError(subCtx, err) 636 } 637 return nil 638} 639 640func handleStreamResponse(resp *http.Response, streamOptions *streamOptions) error { 641 var err error 642 if !streamOptions.useJSONDecoder && resp.Header.Get("Content-Type") != "application/json" { 643 if streamOptions.setRawTerminal { 644 _, err = io.Copy(streamOptions.stdout, resp.Body) 645 } else { 646 _, err = stdcopy.StdCopy(streamOptions.stdout, streamOptions.stderr, resp.Body) 647 } 648 return err 649 } 650 // if we want to get raw json stream, just copy it back to output 651 // without decoding it 652 if streamOptions.rawJSONStream { 653 _, err = io.Copy(streamOptions.stdout, resp.Body) 654 return err 655 } 656 if st, ok := streamOptions.stdout.(stream); ok { 657 err = jsonmessage.DisplayJSONMessagesToStream(resp.Body, st, nil) 658 } else { 659 err = jsonmessage.DisplayJSONMessagesStream(resp.Body, streamOptions.stdout, 0, false, nil) 660 } 661 return err 662} 663 664type stream interface { 665 io.Writer 666 FD() uintptr 667 IsTerminal() bool 668} 669 670type proxyReader struct { 671 io.ReadCloser 672 calls uint64 673} 674 675func (p *proxyReader) callCount() uint64 { 676 return atomic.LoadUint64(&p.calls) 677} 678 679func (p *proxyReader) Read(data []byte) 
(int, error) { 680 atomic.AddUint64(&p.calls, 1) 681 return p.ReadCloser.Read(data) 682} 683 684func handleInactivityTimeout(reader io.ReadCloser, timeout time.Duration, cancelRequest func(), canceled *uint32) (io.ReadCloser, chan<- struct{}) { 685 done := make(chan struct{}) 686 proxyReader := &proxyReader{ReadCloser: reader} 687 go func() { 688 var lastCallCount uint64 689 for { 690 select { 691 case <-time.After(timeout): 692 case <-done: 693 return 694 } 695 curCallCount := proxyReader.callCount() 696 if curCallCount == lastCallCount { 697 atomic.AddUint32(canceled, 1) 698 cancelRequest() 699 return 700 } 701 lastCallCount = curCallCount 702 } 703 }() 704 return proxyReader, done 705} 706 707type hijackOptions struct { 708 success chan struct{} 709 setRawTerminal bool 710 in io.Reader 711 stdout io.Writer 712 stderr io.Writer 713 data interface{} 714} 715 716// CloseWaiter is an interface with methods for closing the underlying resource 717// and then waiting for it to finish processing. 
718type CloseWaiter interface { 719 io.Closer 720 Wait() error 721} 722 723type waiterFunc func() error 724 725func (w waiterFunc) Wait() error { return w() } 726 727type closerFunc func() error 728 729func (c closerFunc) Close() error { return c() } 730 731func (c *Client) hijack(method, path string, hijackOptions hijackOptions) (CloseWaiter, error) { 732 if path != "/version" && !c.SkipServerVersionCheck && c.expectedAPIVersion == nil { 733 err := c.checkAPIVersion() 734 if err != nil { 735 return nil, err 736 } 737 } 738 var params io.Reader 739 if hijackOptions.data != nil { 740 buf, err := json.Marshal(hijackOptions.data) 741 if err != nil { 742 return nil, err 743 } 744 params = bytes.NewBuffer(buf) 745 } 746 req, err := http.NewRequest(method, c.getURL(path), params) 747 if err != nil { 748 return nil, err 749 } 750 req.Header.Set("Content-Type", "application/json") 751 req.Header.Set("Connection", "Upgrade") 752 req.Header.Set("Upgrade", "tcp") 753 protocol := c.endpointURL.Scheme 754 address := c.endpointURL.Path 755 if protocol != unixProtocol && protocol != namedPipeProtocol { 756 protocol = "tcp" 757 address = c.endpointURL.Host 758 } 759 var dial net.Conn 760 if c.TLSConfig != nil && protocol != unixProtocol && protocol != namedPipeProtocol { 761 netDialer, ok := c.Dialer.(*net.Dialer) 762 if !ok { 763 return nil, ErrTLSNotSupported 764 } 765 dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig) 766 if err != nil { 767 return nil, err 768 } 769 } else { 770 dial, err = c.Dialer.Dial(protocol, address) 771 if err != nil { 772 return nil, err 773 } 774 } 775 776 errs := make(chan error, 1) 777 quit := make(chan struct{}) 778 go func() { 779 //nolint:staticcheck 780 clientconn := httputil.NewClientConn(dial, nil) 781 defer clientconn.Close() 782 //nolint:bodyclose 783 clientconn.Do(req) 784 if hijackOptions.success != nil { 785 hijackOptions.success <- struct{}{} 786 <-hijackOptions.success 787 } 788 rwc, br := clientconn.Hijack() 789 
defer rwc.Close() 790 791 errChanOut := make(chan error, 1) 792 errChanIn := make(chan error, 2) 793 if hijackOptions.stdout == nil && hijackOptions.stderr == nil { 794 close(errChanOut) 795 } else { 796 // Only copy if hijackOptions.stdout and/or hijackOptions.stderr is actually set. 797 // Otherwise, if the only stream you care about is stdin, your attach session 798 // will "hang" until the container terminates, even though you're not reading 799 // stdout/stderr 800 if hijackOptions.stdout == nil { 801 hijackOptions.stdout = ioutil.Discard 802 } 803 if hijackOptions.stderr == nil { 804 hijackOptions.stderr = ioutil.Discard 805 } 806 807 go func() { 808 defer func() { 809 if hijackOptions.in != nil { 810 if closer, ok := hijackOptions.in.(io.Closer); ok { 811 closer.Close() 812 } 813 errChanIn <- nil 814 } 815 }() 816 817 var err error 818 if hijackOptions.setRawTerminal { 819 _, err = io.Copy(hijackOptions.stdout, br) 820 } else { 821 _, err = stdcopy.StdCopy(hijackOptions.stdout, hijackOptions.stderr, br) 822 } 823 errChanOut <- err 824 }() 825 } 826 827 go func() { 828 var err error 829 if hijackOptions.in != nil { 830 _, err = io.Copy(rwc, hijackOptions.in) 831 } 832 errChanIn <- err 833 rwc.(interface { 834 CloseWrite() error 835 }).CloseWrite() 836 }() 837 838 var errIn error 839 select { 840 case errIn = <-errChanIn: 841 case <-quit: 842 } 843 844 var errOut error 845 select { 846 case errOut = <-errChanOut: 847 case <-quit: 848 } 849 850 if errIn != nil { 851 errs <- errIn 852 } else { 853 errs <- errOut 854 } 855 }() 856 857 return struct { 858 closerFunc 859 waiterFunc 860 }{ 861 closerFunc(func() error { close(quit); return nil }), 862 waiterFunc(func() error { return <-errs }), 863 }, nil 864} 865 866func (c *Client) getURL(path string) string { 867 urlStr := strings.TrimRight(c.endpointURL.String(), "/") 868 if c.endpointURL.Scheme == unixProtocol || c.endpointURL.Scheme == namedPipeProtocol { 869 urlStr = "" 870 } 871 if c.requestedAPIVersion != 
nil { 872 return fmt.Sprintf("%s/v%s%s", urlStr, c.requestedAPIVersion, path) 873 } 874 return fmt.Sprintf("%s%s", urlStr, path) 875} 876 877func (c *Client) getPath(basepath string, opts interface{}) (string, error) { 878 queryStr, requiredAPIVersion := queryStringVersion(opts) 879 return c.pathVersionCheck(basepath, queryStr, requiredAPIVersion) 880} 881 882func (c *Client) pathVersionCheck(basepath, queryStr string, requiredAPIVersion APIVersion) (string, error) { 883 urlStr := strings.TrimRight(c.endpointURL.String(), "/") 884 if c.endpointURL.Scheme == unixProtocol || c.endpointURL.Scheme == namedPipeProtocol { 885 urlStr = "" 886 } 887 if c.requestedAPIVersion != nil { 888 if c.requestedAPIVersion.GreaterThanOrEqualTo(requiredAPIVersion) { 889 return fmt.Sprintf("%s/v%s%s?%s", urlStr, c.requestedAPIVersion, basepath, queryStr), nil 890 } 891 return "", fmt.Errorf("API %s requires version %s, requested version %s is insufficient", 892 basepath, requiredAPIVersion, c.requestedAPIVersion) 893 } 894 if requiredAPIVersion != nil { 895 return fmt.Sprintf("%s/v%s%s?%s", urlStr, requiredAPIVersion, basepath, queryStr), nil 896 } 897 return fmt.Sprintf("%s%s?%s", urlStr, basepath, queryStr), nil 898} 899 900// getFakeNativeURL returns the URL needed to make an HTTP request over a UNIX 901// domain socket to the given path. 902func (c *Client) getFakeNativeURL(path string) string { 903 u := *c.endpointURL // Copy. 904 905 // Override URL so that net/http will not complain. 906 u.Scheme = "http" 907 u.Host = "unix.sock" // Doesn't matter what this is - it's not used. 
908 u.Path = "" 909 urlStr := strings.TrimRight(u.String(), "/") 910 if c.requestedAPIVersion != nil { 911 return fmt.Sprintf("%s/v%s%s", urlStr, c.requestedAPIVersion, path) 912 } 913 return fmt.Sprintf("%s%s", urlStr, path) 914} 915 916func queryStringVersion(opts interface{}) (string, APIVersion) { 917 if opts == nil { 918 return "", nil 919 } 920 value := reflect.ValueOf(opts) 921 if value.Kind() == reflect.Ptr { 922 value = value.Elem() 923 } 924 if value.Kind() != reflect.Struct { 925 return "", nil 926 } 927 var apiVersion APIVersion 928 items := url.Values(map[string][]string{}) 929 for i := 0; i < value.NumField(); i++ { 930 field := value.Type().Field(i) 931 if field.PkgPath != "" { 932 continue 933 } 934 key := field.Tag.Get("qs") 935 if key == "" { 936 key = strings.ToLower(field.Name) 937 } else if key == "-" { 938 continue 939 } 940 if addQueryStringValue(items, key, value.Field(i)) { 941 verstr := field.Tag.Get("ver") 942 if verstr != "" { 943 ver, _ := NewAPIVersion(verstr) 944 if apiVersion == nil { 945 apiVersion = ver 946 } else if ver.GreaterThan(apiVersion) { 947 apiVersion = ver 948 } 949 } 950 } 951 } 952 return items.Encode(), apiVersion 953} 954 955func queryString(opts interface{}) string { 956 s, _ := queryStringVersion(opts) 957 return s 958} 959 960func addQueryStringValue(items url.Values, key string, v reflect.Value) bool { 961 switch v.Kind() { 962 case reflect.Bool: 963 if v.Bool() { 964 items.Add(key, "1") 965 return true 966 } 967 case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: 968 if v.Int() > 0 { 969 items.Add(key, strconv.FormatInt(v.Int(), 10)) 970 return true 971 } 972 case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: 973 if v.Uint() > 0 { 974 items.Add(key, strconv.FormatUint(v.Uint(), 10)) 975 return true 976 } 977 case reflect.Float32, reflect.Float64: 978 if v.Float() > 0 { 979 items.Add(key, strconv.FormatFloat(v.Float(), 'f', -1, 64)) 980 return true 981 } 982 
case reflect.String: 983 if v.String() != "" { 984 items.Add(key, v.String()) 985 return true 986 } 987 case reflect.Ptr: 988 if !v.IsNil() { 989 if b, err := json.Marshal(v.Interface()); err == nil { 990 items.Add(key, string(b)) 991 return true 992 } 993 } 994 case reflect.Map: 995 if len(v.MapKeys()) > 0 { 996 if b, err := json.Marshal(v.Interface()); err == nil { 997 items.Add(key, string(b)) 998 return true 999 } 1000 } 1001 case reflect.Array, reflect.Slice: 1002 vLen := v.Len() 1003 var valuesAdded int 1004 if vLen > 0 { 1005 for i := 0; i < vLen; i++ { 1006 if addQueryStringValue(items, key, v.Index(i)) { 1007 valuesAdded++ 1008 } 1009 } 1010 } 1011 return valuesAdded > 0 1012 } 1013 return false 1014} 1015 1016// Error represents failures in the API. It represents a failure from the API. 1017type Error struct { 1018 Status int 1019 Message string 1020} 1021 1022func newError(resp *http.Response) *Error { 1023 type ErrMsg struct { 1024 Message string `json:"message"` 1025 } 1026 defer resp.Body.Close() 1027 data, err := ioutil.ReadAll(resp.Body) 1028 if err != nil { 1029 return &Error{Status: resp.StatusCode, Message: fmt.Sprintf("cannot read body, err: %v", err)} 1030 } 1031 var emsg ErrMsg 1032 err = json.Unmarshal(data, &emsg) 1033 if err != nil { 1034 return &Error{Status: resp.StatusCode, Message: string(data)} 1035 } 1036 return &Error{Status: resp.StatusCode, Message: emsg.Message} 1037} 1038 1039func (e *Error) Error() string { 1040 return fmt.Sprintf("API error (%d): %s", e.Status, e.Message) 1041} 1042 1043func parseEndpoint(endpoint string, tls bool) (*url.URL, error) { 1044 if endpoint != "" && !strings.Contains(endpoint, "://") { 1045 endpoint = "tcp://" + endpoint 1046 } 1047 u, err := url.Parse(endpoint) 1048 if err != nil { 1049 return nil, ErrInvalidEndpoint 1050 } 1051 if tls && u.Scheme != "unix" { 1052 u.Scheme = "https" 1053 } 1054 switch u.Scheme { 1055 case unixProtocol, namedPipeProtocol: 1056 return u, nil 1057 case "http", "https", 
"tcp": 1058 _, port, err := net.SplitHostPort(u.Host) 1059 if err != nil { 1060 if e, ok := err.(*net.AddrError); ok { 1061 if e.Err == "missing port in address" { 1062 return u, nil 1063 } 1064 } 1065 return nil, ErrInvalidEndpoint 1066 } 1067 number, err := strconv.ParseInt(port, 10, 64) 1068 if err == nil && number > 0 && number < 65536 { 1069 if u.Scheme == "tcp" { 1070 if tls { 1071 u.Scheme = "https" 1072 } else { 1073 u.Scheme = "http" 1074 } 1075 } 1076 return u, nil 1077 } 1078 return nil, ErrInvalidEndpoint 1079 default: 1080 return nil, ErrInvalidEndpoint 1081 } 1082} 1083 1084type dockerEnv struct { 1085 dockerHost string 1086 dockerTLSVerify bool 1087 dockerCertPath string 1088} 1089 1090func getDockerEnv() (*dockerEnv, error) { 1091 dockerHost := os.Getenv("DOCKER_HOST") 1092 var err error 1093 if dockerHost == "" { 1094 dockerHost = defaultHost 1095 } 1096 dockerTLSVerify := os.Getenv("DOCKER_TLS_VERIFY") != "" 1097 var dockerCertPath string 1098 if dockerTLSVerify { 1099 dockerCertPath = os.Getenv("DOCKER_CERT_PATH") 1100 if dockerCertPath == "" { 1101 home := homedir.Get() 1102 if home == "" { 1103 return nil, errors.New("environment variable HOME must be set if DOCKER_CERT_PATH is not set") 1104 } 1105 dockerCertPath = filepath.Join(home, ".docker") 1106 dockerCertPath, err = filepath.Abs(dockerCertPath) 1107 if err != nil { 1108 return nil, err 1109 } 1110 } 1111 } 1112 return &dockerEnv{ 1113 dockerHost: dockerHost, 1114 dockerTLSVerify: dockerTLSVerify, 1115 dockerCertPath: dockerCertPath, 1116 }, nil 1117} 1118 1119// defaultTransport returns a new http.Transport with similar default values to 1120// http.DefaultTransport, but with idle connections and keepalives disabled. 
func defaultTransport() *http.Transport {
	tr := defaultPooledTransport()
	tr.MaxIdleConnsPerHost = -1
	tr.DisableKeepAlives = true
	return tr
}

// defaultPooledTransport returns a new http.Transport with similar default
// values to http.DefaultTransport. Do not use this for transient transports as
// it can leak file descriptors over time. Only use this for transports that
// will be re-used for the same host(s).
func defaultPooledTransport() *http.Transport {
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}
	return &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		MaxIdleConnsPerHost:   runtime.GOMAXPROCS(0) + 1,
	}
}

// defaultClient returns a new http.Client with similar default values to
// http.Client, but with a non-shared Transport, idle connections disabled, and
// keepalives disabled.
func defaultClient() *http.Client {
	client := new(http.Client)
	client.Transport = defaultTransport()
	return client
}