1package api 2 3import ( 4 "bytes" 5 "compress/gzip" 6 "crypto/tls" 7 "encoding/json" 8 "fmt" 9 "io" 10 "net" 11 "net/http" 12 "net/url" 13 "os" 14 "strconv" 15 "strings" 16 "time" 17 18 "github.com/gorilla/websocket" 19 cleanhttp "github.com/hashicorp/go-cleanhttp" 20 rootcerts "github.com/hashicorp/go-rootcerts" 21) 22 23var ( 24 // ClientConnTimeout is the timeout applied when attempting to contact a 25 // client directly before switching to a connection through the Nomad 26 // server. 27 ClientConnTimeout = 1 * time.Second 28) 29 30// QueryOptions are used to parametrize a query 31type QueryOptions struct { 32 // Providing a datacenter overwrites the region provided 33 // by the Config 34 Region string 35 36 // Namespace is the target namespace for the query. 37 Namespace string 38 39 // AllowStale allows any Nomad server (non-leader) to service 40 // a read. This allows for lower latency and higher throughput 41 AllowStale bool 42 43 // WaitIndex is used to enable a blocking query. Waits 44 // until the timeout or the next index is reached 45 WaitIndex uint64 46 47 // WaitTime is used to bound the duration of a wait. 48 // Defaults to that of the Config, but can be overridden. 49 WaitTime time.Duration 50 51 // If set, used as prefix for resource list searches 52 Prefix string 53 54 // Set HTTP parameters on the query. 55 Params map[string]string 56 57 // AuthToken is the secret ID of an ACL token 58 AuthToken string 59} 60 61// WriteOptions are used to parametrize a write 62type WriteOptions struct { 63 // Providing a datacenter overwrites the region provided 64 // by the Config 65 Region string 66 67 // Namespace is the target namespace for the write. 68 Namespace string 69 70 // AuthToken is the secret ID of an ACL token 71 AuthToken string 72} 73 74// QueryMeta is used to return meta data about a query 75type QueryMeta struct { 76 // LastIndex. This can be used as a WaitIndex to perform 77 // a blocking query 78 LastIndex uint64 79 80 // Time of last contact from the leader for the 81 // server servicing the request 82 LastContact time.Duration 83 84 // Is there a known leader 85 KnownLeader bool 86 87 // How long did the request take 88 RequestTime time.Duration 89} 90 91// WriteMeta is used to return meta data about a write 92type WriteMeta struct { 93 // LastIndex. This can be used as a WaitIndex to perform 94 // a blocking query 95 LastIndex uint64 96 97 // How long did the request take 98 RequestTime time.Duration 99} 100 101// HttpBasicAuth is used to authenticate http client with HTTP Basic Authentication 102type HttpBasicAuth struct { 103 // Username to use for HTTP Basic Authentication 104 Username string 105 106 // Password to use for HTTP Basic Authentication 107 Password string 108} 109 110// Config is used to configure the creation of a client 111type Config struct { 112 // Address is the address of the Nomad agent 113 Address string 114 115 // Region to use. If not provided, the default agent region is used. 116 Region string 117 118 // SecretID to use. This can be overwritten per request. 119 SecretID string 120 121 // Namespace to use. If not provided the default namespace is used. 122 Namespace string 123 124 // HttpClient is the client to use. Default will be used if not provided. 125 // 126 // If set, it expected to be configured for tls already, and TLSConfig is ignored. 127 // You may use ConfigureTLS() function to aid with initialization. 128 HttpClient *http.Client 129 130 // HttpAuth is the auth info to use for http access. 131 HttpAuth *HttpBasicAuth 132 133 // WaitTime limits how long a Watch will block. If not provided, 134 // the agent default values will be used. 135 WaitTime time.Duration 136 137 // TLSConfig provides the various TLS related configurations for the http 138 // client. 139 // 140 // TLSConfig is ignored if HttpClient is set. 141 TLSConfig *TLSConfig 142} 143 144// ClientConfig copies the configuration with a new client address, region, and 145// whether the client has TLS enabled. 146func (c *Config) ClientConfig(region, address string, tlsEnabled bool) *Config { 147 scheme := "http" 148 if tlsEnabled { 149 scheme = "https" 150 } 151 config := &Config{ 152 Address: fmt.Sprintf("%s://%s", scheme, address), 153 Region: region, 154 Namespace: c.Namespace, 155 HttpClient: c.HttpClient, 156 SecretID: c.SecretID, 157 HttpAuth: c.HttpAuth, 158 WaitTime: c.WaitTime, 159 TLSConfig: c.TLSConfig.Copy(), 160 } 161 162 // Update the tls server name for connecting to a client 163 if tlsEnabled && config.TLSConfig != nil { 164 config.TLSConfig.TLSServerName = fmt.Sprintf("client.%s.nomad", region) 165 } 166 167 return config 168} 169 170// TLSConfig contains the parameters needed to configure TLS on the HTTP client 171// used to communicate with Nomad. 172type TLSConfig struct { 173 // CACert is the path to a PEM-encoded CA cert file to use to verify the 174 // Nomad server SSL certificate. 175 CACert string 176 177 // CAPath is the path to a directory of PEM-encoded CA cert files to verify 178 // the Nomad server SSL certificate. 179 CAPath string 180 181 // CACertPem is the PEM-encoded CA cert to use to verify the Nomad server 182 // SSL certificate. 183 CACertPEM []byte 184 185 // ClientCert is the path to the certificate for Nomad communication 186 ClientCert string 187 188 // ClientCertPEM is the PEM-encoded certificate for Nomad communication 189 ClientCertPEM []byte 190 191 // ClientKey is the path to the private key for Nomad communication 192 ClientKey string 193 194 // ClientKeyPEM is the PEM-encoded private key for Nomad communication 195 ClientKeyPEM []byte 196 197 // TLSServerName, if set, is used to set the SNI host when connecting via 198 // TLS. 199 TLSServerName string 200 201 // Insecure enables or disables SSL verification 202 Insecure bool 203} 204 205func (t *TLSConfig) Copy() *TLSConfig { 206 if t == nil { 207 return nil 208 } 209 210 nt := new(TLSConfig) 211 *nt = *t 212 return nt 213} 214 215func defaultHttpClient() *http.Client { 216 httpClient := cleanhttp.DefaultClient() 217 transport := httpClient.Transport.(*http.Transport) 218 transport.TLSHandshakeTimeout = 10 * time.Second 219 transport.TLSClientConfig = &tls.Config{ 220 MinVersion: tls.VersionTLS12, 221 } 222 223 return httpClient 224} 225 226// DefaultConfig returns a default configuration for the client 227func DefaultConfig() *Config { 228 config := &Config{ 229 Address: "http://127.0.0.1:4646", 230 TLSConfig: &TLSConfig{}, 231 } 232 if addr := os.Getenv("NOMAD_ADDR"); addr != "" { 233 config.Address = addr 234 } 235 if v := os.Getenv("NOMAD_REGION"); v != "" { 236 config.Region = v 237 } 238 if v := os.Getenv("NOMAD_NAMESPACE"); v != "" { 239 config.Namespace = v 240 } 241 if auth := os.Getenv("NOMAD_HTTP_AUTH"); auth != "" { 242 var username, password string 243 if strings.Contains(auth, ":") { 244 split := strings.SplitN(auth, ":", 2) 245 username = split[0] 246 password = split[1] 247 } else { 248 username = auth 249 } 250 251 config.HttpAuth = &HttpBasicAuth{ 252 Username: username, 253 Password: password, 254 } 255 } 256 257 // Read TLS specific env vars 258 if v := os.Getenv("NOMAD_CACERT"); v != "" { 259 config.TLSConfig.CACert = v 260 } 261 if v := os.Getenv("NOMAD_CAPATH"); v != "" { 262 config.TLSConfig.CAPath = v 263 } 264 if v := os.Getenv("NOMAD_CLIENT_CERT"); v != "" { 265 config.TLSConfig.ClientCert = v 266 } 267 if v := os.Getenv("NOMAD_CLIENT_KEY"); v != "" { 268 config.TLSConfig.ClientKey = v 269 } 270 if v := os.Getenv("NOMAD_TLS_SERVER_NAME"); v != "" { 271 config.TLSConfig.TLSServerName = v 272 } 273 if v := os.Getenv("NOMAD_SKIP_VERIFY"); v != "" { 274 if insecure, err := strconv.ParseBool(v); err == nil { 275 config.TLSConfig.Insecure = insecure 276 } 277 } 278 if v := os.Getenv("NOMAD_TOKEN"); v != "" { 279 config.SecretID = v 280 } 281 return config 282} 283 284// cloneWithTimeout returns a cloned httpClient with set timeout if positive; 285// otherwise, returns the same client 286func cloneWithTimeout(httpClient *http.Client, t time.Duration) (*http.Client, error) { 287 if httpClient == nil { 288 return nil, fmt.Errorf("nil HTTP client") 289 } else if httpClient.Transport == nil { 290 return nil, fmt.Errorf("nil HTTP client transport") 291 } 292 293 if t.Nanoseconds() < 0 { 294 return httpClient, nil 295 } 296 297 tr, ok := httpClient.Transport.(*http.Transport) 298 if !ok { 299 return nil, fmt.Errorf("unexpected HTTP transport: %T", httpClient.Transport) 300 } 301 302 // copy all public fields, to avoid copying transient state and locks 303 ntr := &http.Transport{ 304 Proxy: tr.Proxy, 305 DialContext: tr.DialContext, 306 Dial: tr.Dial, 307 DialTLS: tr.DialTLS, 308 TLSClientConfig: tr.TLSClientConfig, 309 TLSHandshakeTimeout: tr.TLSHandshakeTimeout, 310 DisableKeepAlives: tr.DisableKeepAlives, 311 DisableCompression: tr.DisableCompression, 312 MaxIdleConns: tr.MaxIdleConns, 313 MaxIdleConnsPerHost: tr.MaxIdleConnsPerHost, 314 MaxConnsPerHost: tr.MaxConnsPerHost, 315 IdleConnTimeout: tr.IdleConnTimeout, 316 ResponseHeaderTimeout: tr.ResponseHeaderTimeout, 317 ExpectContinueTimeout: tr.ExpectContinueTimeout, 318 TLSNextProto: tr.TLSNextProto, 319 ProxyConnectHeader: tr.ProxyConnectHeader, 320 MaxResponseHeaderBytes: tr.MaxResponseHeaderBytes, 321 } 322 323 // apply timeout 324 ntr.DialContext = (&net.Dialer{ 325 Timeout: t, 326 KeepAlive: 30 * time.Second, 327 }).DialContext 328 329 // clone http client with new transport 330 nc := *httpClient 331 nc.Transport = ntr 332 return &nc, nil 333} 334 335// ConfigureTLS applies a set of TLS configurations to the the HTTP client. 336func ConfigureTLS(httpClient *http.Client, tlsConfig *TLSConfig) error { 337 if tlsConfig == nil { 338 return nil 339 } 340 if httpClient == nil { 341 return fmt.Errorf("config HTTP Client must be set") 342 } 343 344 var clientCert tls.Certificate 345 foundClientCert := false 346 if tlsConfig.ClientCert != "" || tlsConfig.ClientKey != "" { 347 if tlsConfig.ClientCert != "" && tlsConfig.ClientKey != "" { 348 var err error 349 clientCert, err = tls.LoadX509KeyPair(tlsConfig.ClientCert, tlsConfig.ClientKey) 350 if err != nil { 351 return err 352 } 353 foundClientCert = true 354 } else { 355 return fmt.Errorf("Both client cert and client key must be provided") 356 } 357 } else if len(tlsConfig.ClientCertPEM) != 0 || len(tlsConfig.ClientKeyPEM) != 0 { 358 if len(tlsConfig.ClientCertPEM) != 0 && len(tlsConfig.ClientKeyPEM) != 0 { 359 var err error 360 clientCert, err = tls.X509KeyPair(tlsConfig.ClientCertPEM, tlsConfig.ClientKeyPEM) 361 if err != nil { 362 return err 363 } 364 foundClientCert = true 365 } else { 366 return fmt.Errorf("Both client cert and client key must be provided") 367 } 368 } 369 370 clientTLSConfig := httpClient.Transport.(*http.Transport).TLSClientConfig 371 rootConfig := &rootcerts.Config{ 372 CAFile: tlsConfig.CACert, 373 CAPath: tlsConfig.CAPath, 374 CACertificate: tlsConfig.CACertPEM, 375 } 376 if err := rootcerts.ConfigureTLS(clientTLSConfig, rootConfig); err != nil { 377 return err 378 } 379 380 clientTLSConfig.InsecureSkipVerify = tlsConfig.Insecure 381 382 if foundClientCert { 383 clientTLSConfig.Certificates = []tls.Certificate{clientCert} 384 } 385 if tlsConfig.TLSServerName != "" { 386 clientTLSConfig.ServerName = tlsConfig.TLSServerName 387 } 388 389 return nil 390} 391 392// Client provides a client to the Nomad API 393type Client struct { 394 httpClient *http.Client 395 config Config 396} 397 398// NewClient returns a new client 399func NewClient(config *Config) (*Client, error) { 400 // bootstrap the config 401 defConfig := DefaultConfig() 402 403 if config.Address == "" { 404 config.Address = defConfig.Address 405 } else if _, err := url.Parse(config.Address); err != nil { 406 return nil, fmt.Errorf("invalid address '%s': %v", config.Address, err) 407 } 408 409 httpClient := config.HttpClient 410 if httpClient == nil { 411 httpClient = defaultHttpClient() 412 if err := ConfigureTLS(httpClient, config.TLSConfig); err != nil { 413 return nil, err 414 } 415 } 416 417 client := &Client{ 418 config: *config, 419 httpClient: httpClient, 420 } 421 return client, nil 422} 423 424// Address return the address of the Nomad agent 425func (c *Client) Address() string { 426 return c.config.Address 427} 428 429// SetRegion sets the region to forward API requests to. 430func (c *Client) SetRegion(region string) { 431 c.config.Region = region 432} 433 434// SetNamespace sets the namespace to forward API requests to. 435func (c *Client) SetNamespace(namespace string) { 436 c.config.Namespace = namespace 437} 438 439// GetNodeClient returns a new Client that will dial the specified node. If the 440// QueryOptions is set, its region will be used. 441func (c *Client) GetNodeClient(nodeID string, q *QueryOptions) (*Client, error) { 442 return c.getNodeClientImpl(nodeID, -1, q, c.Nodes().Info) 443} 444 445// GetNodeClientWithTimeout returns a new Client that will dial the specified 446// node using the specified timeout. If the QueryOptions is set, its region will 447// be used. 448func (c *Client) GetNodeClientWithTimeout( 449 nodeID string, timeout time.Duration, q *QueryOptions) (*Client, error) { 450 return c.getNodeClientImpl(nodeID, timeout, q, c.Nodes().Info) 451} 452 453// nodeLookup is the definition of a function used to lookup a node. This is 454// largely used to mock the lookup in tests. 455type nodeLookup func(nodeID string, q *QueryOptions) (*Node, *QueryMeta, error) 456 457// getNodeClientImpl is the implementation of creating a API client for 458// contacting a node. It takes a function to lookup the node such that it can be 459// mocked during tests. 460func (c *Client) getNodeClientImpl(nodeID string, timeout time.Duration, q *QueryOptions, lookup nodeLookup) (*Client, error) { 461 node, _, err := lookup(nodeID, q) 462 if err != nil { 463 return nil, err 464 } 465 if node.Status == "down" { 466 return nil, NodeDownErr 467 } 468 if node.HTTPAddr == "" { 469 return nil, fmt.Errorf("http addr of node %q (%s) is not advertised", node.Name, nodeID) 470 } 471 472 var region string 473 switch { 474 case q != nil && q.Region != "": 475 // Prefer the region set in the query parameter 476 region = q.Region 477 case c.config.Region != "": 478 // If the client is configured for a particular region use that 479 region = c.config.Region 480 default: 481 // No region information is given so use GlobalRegion as the default. 482 region = GlobalRegion 483 } 484 485 // Get an API client for the node 486 conf := c.config.ClientConfig(region, node.HTTPAddr, node.TLSEnabled) 487 488 // set timeout - preserve old behavior where errors are ignored and use untimed one 489 httpClient, err := cloneWithTimeout(c.httpClient, timeout) 490 // on error, fallback to using current http client 491 if err != nil { 492 httpClient = c.httpClient 493 } 494 conf.HttpClient = httpClient 495 496 return NewClient(conf) 497} 498 499// SetSecretID sets the ACL token secret for API requests. 500func (c *Client) SetSecretID(secretID string) { 501 c.config.SecretID = secretID 502} 503 504// request is used to help build up a request 505type request struct { 506 config *Config 507 method string 508 url *url.URL 509 params url.Values 510 token string 511 body io.Reader 512 obj interface{} 513} 514 515// setQueryOptions is used to annotate the request with 516// additional query options 517func (r *request) setQueryOptions(q *QueryOptions) { 518 if q == nil { 519 return 520 } 521 if q.Region != "" { 522 r.params.Set("region", q.Region) 523 } 524 if q.Namespace != "" { 525 r.params.Set("namespace", q.Namespace) 526 } 527 if q.AuthToken != "" { 528 r.token = q.AuthToken 529 } 530 if q.AllowStale { 531 r.params.Set("stale", "") 532 } 533 if q.WaitIndex != 0 { 534 r.params.Set("index", strconv.FormatUint(q.WaitIndex, 10)) 535 } 536 if q.WaitTime != 0 { 537 r.params.Set("wait", durToMsec(q.WaitTime)) 538 } 539 if q.Prefix != "" { 540 r.params.Set("prefix", q.Prefix) 541 } 542 for k, v := range q.Params { 543 r.params.Set(k, v) 544 } 545} 546 547// durToMsec converts a duration to a millisecond specified string 548func durToMsec(dur time.Duration) string { 549 return fmt.Sprintf("%dms", dur/time.Millisecond) 550} 551 552// setWriteOptions is used to annotate the request with 553// additional write options 554func (r *request) setWriteOptions(q *WriteOptions) { 555 if q == nil { 556 return 557 } 558 if q.Region != "" { 559 r.params.Set("region", q.Region) 560 } 561 if q.Namespace != "" { 562 r.params.Set("namespace", q.Namespace) 563 } 564 if q.AuthToken != "" { 565 r.token = q.AuthToken 566 } 567} 568 569// toHTTP converts the request to an HTTP request 570func (r *request) toHTTP() (*http.Request, error) { 571 // Encode the query parameters 572 r.url.RawQuery = r.params.Encode() 573 574 // Check if we should encode the body 575 if r.body == nil && r.obj != nil { 576 if b, err := encodeBody(r.obj); err != nil { 577 return nil, err 578 } else { 579 r.body = b 580 } 581 } 582 583 // Create the HTTP request 584 req, err := http.NewRequest(r.method, r.url.RequestURI(), r.body) 585 if err != nil { 586 return nil, err 587 } 588 589 // Optionally configure HTTP basic authentication 590 if r.url.User != nil { 591 username := r.url.User.Username() 592 password, _ := r.url.User.Password() 593 req.SetBasicAuth(username, password) 594 } else if r.config.HttpAuth != nil { 595 req.SetBasicAuth(r.config.HttpAuth.Username, r.config.HttpAuth.Password) 596 } 597 598 req.Header.Add("Accept-Encoding", "gzip") 599 if r.token != "" { 600 req.Header.Set("X-Nomad-Token", r.token) 601 } 602 603 req.URL.Host = r.url.Host 604 req.URL.Scheme = r.url.Scheme 605 req.Host = r.url.Host 606 return req, nil 607} 608 609// newRequest is used to create a new request 610func (c *Client) newRequest(method, path string) (*request, error) { 611 base, _ := url.Parse(c.config.Address) 612 u, err := url.Parse(path) 613 if err != nil { 614 return nil, err 615 } 616 r := &request{ 617 config: &c.config, 618 method: method, 619 url: &url.URL{ 620 Scheme: base.Scheme, 621 User: base.User, 622 Host: base.Host, 623 Path: u.Path, 624 RawPath: u.RawPath, 625 }, 626 params: make(map[string][]string), 627 } 628 if c.config.Region != "" { 629 r.params.Set("region", c.config.Region) 630 } 631 if c.config.Namespace != "" { 632 r.params.Set("namespace", c.config.Namespace) 633 } 634 if c.config.WaitTime != 0 { 635 r.params.Set("wait", durToMsec(r.config.WaitTime)) 636 } 637 if c.config.SecretID != "" { 638 r.token = r.config.SecretID 639 } 640 641 // Add in the query parameters, if any 642 for key, values := range u.Query() { 643 for _, value := range values { 644 r.params.Add(key, value) 645 } 646 } 647 648 return r, nil 649} 650 651// multiCloser is to wrap a ReadCloser such that when close is called, multiple 652// Closes occur. 653type multiCloser struct { 654 reader io.Reader 655 inorderClose []io.Closer 656} 657 658func (m *multiCloser) Close() error { 659 for _, c := range m.inorderClose { 660 if err := c.Close(); err != nil { 661 return err 662 } 663 } 664 return nil 665} 666 667func (m *multiCloser) Read(p []byte) (int, error) { 668 return m.reader.Read(p) 669} 670 671// doRequest runs a request with our client 672func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { 673 req, err := r.toHTTP() 674 if err != nil { 675 return 0, nil, err 676 } 677 start := time.Now() 678 resp, err := c.httpClient.Do(req) 679 diff := time.Now().Sub(start) 680 681 // If the response is compressed, we swap the body's reader. 682 if resp != nil && resp.Header != nil { 683 var reader io.ReadCloser 684 switch resp.Header.Get("Content-Encoding") { 685 case "gzip": 686 greader, err := gzip.NewReader(resp.Body) 687 if err != nil { 688 return 0, nil, err 689 } 690 691 // The gzip reader doesn't close the wrapped reader so we use 692 // multiCloser. 693 reader = &multiCloser{ 694 reader: greader, 695 inorderClose: []io.Closer{greader, resp.Body}, 696 } 697 default: 698 reader = resp.Body 699 } 700 resp.Body = reader 701 } 702 703 return diff, resp, err 704} 705 706// rawQuery makes a GET request to the specified endpoint but returns just the 707// response body. 708func (c *Client) rawQuery(endpoint string, q *QueryOptions) (io.ReadCloser, error) { 709 r, err := c.newRequest("GET", endpoint) 710 if err != nil { 711 return nil, err 712 } 713 r.setQueryOptions(q) 714 _, resp, err := requireOK(c.doRequest(r)) 715 if err != nil { 716 return nil, err 717 } 718 719 return resp.Body, nil 720} 721 722// websocket makes a websocket request to the specific endpoint 723func (c *Client) websocket(endpoint string, q *QueryOptions) (*websocket.Conn, *http.Response, error) { 724 725 transport, ok := c.httpClient.Transport.(*http.Transport) 726 if !ok { 727 return nil, nil, fmt.Errorf("unsupported transport") 728 } 729 dialer := websocket.Dialer{ 730 ReadBufferSize: 4096, 731 WriteBufferSize: 4096, 732 HandshakeTimeout: c.httpClient.Timeout, 733 734 // values to inherit from http client configuration 735 NetDial: transport.Dial, 736 NetDialContext: transport.DialContext, 737 Proxy: transport.Proxy, 738 TLSClientConfig: transport.TLSClientConfig, 739 } 740 741 // build request object for header and parameters 742 r, err := c.newRequest("GET", endpoint) 743 if err != nil { 744 return nil, nil, err 745 } 746 r.setQueryOptions(q) 747 748 rhttp, err := r.toHTTP() 749 if err != nil { 750 return nil, nil, err 751 } 752 753 // convert scheme 754 wsScheme := "" 755 switch rhttp.URL.Scheme { 756 case "http": 757 wsScheme = "ws" 758 case "https": 759 wsScheme = "wss" 760 default: 761 return nil, nil, fmt.Errorf("unsupported scheme: %v", rhttp.URL.Scheme) 762 } 763 rhttp.URL.Scheme = wsScheme 764 765 conn, resp, err := dialer.Dial(rhttp.URL.String(), rhttp.Header) 766 767 // check resp status code, as it's more informative than handshake error we get from ws library 768 if resp != nil && resp.StatusCode != 101 { 769 var buf bytes.Buffer 770 771 if resp.Header.Get("Content-Encoding") == "gzip" { 772 greader, err := gzip.NewReader(resp.Body) 773 if err != nil { 774 return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode) 775 } 776 io.Copy(&buf, greader) 777 } else { 778 io.Copy(&buf, resp.Body) 779 } 780 resp.Body.Close() 781 782 return nil, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) 783 } 784 785 return conn, resp, err 786} 787 788// query is used to do a GET request against an endpoint 789// and deserialize the response into an interface using 790// standard Nomad conventions. 791func (c *Client) query(endpoint string, out interface{}, q *QueryOptions) (*QueryMeta, error) { 792 r, err := c.newRequest("GET", endpoint) 793 if err != nil { 794 return nil, err 795 } 796 r.setQueryOptions(q) 797 rtt, resp, err := requireOK(c.doRequest(r)) 798 if err != nil { 799 return nil, err 800 } 801 defer resp.Body.Close() 802 803 qm := &QueryMeta{} 804 parseQueryMeta(resp, qm) 805 qm.RequestTime = rtt 806 807 if err := decodeBody(resp, out); err != nil { 808 return nil, err 809 } 810 return qm, nil 811} 812 813// putQuery is used to do a PUT request when doing a read against an endpoint 814// and deserialize the response into an interface using standard Nomad 815// conventions. 816func (c *Client) putQuery(endpoint string, in, out interface{}, q *QueryOptions) (*QueryMeta, error) { 817 r, err := c.newRequest("PUT", endpoint) 818 if err != nil { 819 return nil, err 820 } 821 r.setQueryOptions(q) 822 r.obj = in 823 rtt, resp, err := requireOK(c.doRequest(r)) 824 if err != nil { 825 return nil, err 826 } 827 defer resp.Body.Close() 828 829 qm := &QueryMeta{} 830 parseQueryMeta(resp, qm) 831 qm.RequestTime = rtt 832 833 if err := decodeBody(resp, out); err != nil { 834 return nil, err 835 } 836 return qm, nil 837} 838 839// write is used to do a PUT request against an endpoint 840// and serialize/deserialized using the standard Nomad conventions. 841func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { 842 r, err := c.newRequest("PUT", endpoint) 843 if err != nil { 844 return nil, err 845 } 846 r.setWriteOptions(q) 847 r.obj = in 848 rtt, resp, err := requireOK(c.doRequest(r)) 849 if err != nil { 850 return nil, err 851 } 852 defer resp.Body.Close() 853 854 wm := &WriteMeta{RequestTime: rtt} 855 parseWriteMeta(resp, wm) 856 857 if out != nil { 858 if err := decodeBody(resp, &out); err != nil { 859 return nil, err 860 } 861 } 862 return wm, nil 863} 864 865// delete is used to do a DELETE request against an endpoint 866// and serialize/deserialized using the standard Nomad conventions. 867func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) { 868 r, err := c.newRequest("DELETE", endpoint) 869 if err != nil { 870 return nil, err 871 } 872 r.setWriteOptions(q) 873 rtt, resp, err := requireOK(c.doRequest(r)) 874 if err != nil { 875 return nil, err 876 } 877 defer resp.Body.Close() 878 879 wm := &WriteMeta{RequestTime: rtt} 880 parseWriteMeta(resp, wm) 881 882 if out != nil { 883 if err := decodeBody(resp, &out); err != nil { 884 return nil, err 885 } 886 } 887 return wm, nil 888} 889 890// parseQueryMeta is used to help parse query meta-data 891func parseQueryMeta(resp *http.Response, q *QueryMeta) error { 892 header := resp.Header 893 894 // Parse the X-Nomad-Index 895 index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64) 896 if err != nil { 897 return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err) 898 } 899 q.LastIndex = index 900 901 // Parse the X-Nomad-LastContact 902 last, err := strconv.ParseUint(header.Get("X-Nomad-LastContact"), 10, 64) 903 if err != nil { 904 return fmt.Errorf("Failed to parse X-Nomad-LastContact: %v", err) 905 } 906 q.LastContact = time.Duration(last) * time.Millisecond 907 908 // Parse the X-Nomad-KnownLeader 909 switch header.Get("X-Nomad-KnownLeader") { 910 case "true": 911 q.KnownLeader = true 912 default: 913 q.KnownLeader = false 914 } 915 return nil 916} 917 918// parseWriteMeta is used to help parse write meta-data 919func parseWriteMeta(resp *http.Response, q *WriteMeta) error { 920 header := resp.Header 921 922 // Parse the X-Nomad-Index 923 index, err := strconv.ParseUint(header.Get("X-Nomad-Index"), 10, 64) 924 if err != nil { 925 return fmt.Errorf("Failed to parse X-Nomad-Index: %v", err) 926 } 927 q.LastIndex = index 928 return nil 929} 930 931// decodeBody is used to JSON decode a body 932func decodeBody(resp *http.Response, out interface{}) error { 933 dec := json.NewDecoder(resp.Body) 934 return dec.Decode(out) 935} 936 937// encodeBody is used to encode a request body 938func encodeBody(obj interface{}) (io.Reader, error) { 939 buf := bytes.NewBuffer(nil) 940 enc := json.NewEncoder(buf) 941 if err := enc.Encode(obj); err != nil { 942 return nil, err 943 } 944 return buf, nil 945} 946 947// requireOK is used to wrap doRequest and check for a 200 948func requireOK(d time.Duration, resp *http.Response, e error) (time.Duration, *http.Response, error) { 949 if e != nil { 950 if resp != nil { 951 resp.Body.Close() 952 } 953 return d, nil, e 954 } 955 if resp.StatusCode != 200 { 956 var buf bytes.Buffer 957 io.Copy(&buf, resp.Body) 958 resp.Body.Close() 959 return d, nil, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) 960 } 961 return d, resp, nil 962} 963