package api

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// ServiceKind is the kind of service being registered.
type ServiceKind string

const (
	// ServiceKindTypical is a typical, classic Consul service. This is
	// represented by the absence of a value. This was chosen for ease of
	// backwards compatibility: existing services in the catalog would
	// default to the typical service.
	ServiceKindTypical ServiceKind = ""

	// ServiceKindConnectProxy is a proxy for the Connect feature. This
	// service proxies another service within Consul and speaks the connect
	// protocol.
	ServiceKindConnectProxy ServiceKind = "connect-proxy"

	// ServiceKindMeshGateway is a Mesh Gateway for the Connect feature. This
	// service will proxy connections based off the SNI header set by other
	// connect proxies.
	ServiceKindMeshGateway ServiceKind = "mesh-gateway"
)

// UpstreamDestType is the type of upstream discovery mechanism.
type UpstreamDestType string

const (
	// UpstreamDestTypeService discovers instances via healthy service lookup.
	UpstreamDestTypeService UpstreamDestType = "service"

	// UpstreamDestTypePreparedQuery discovers instances via prepared query
	// execution.
	UpstreamDestTypePreparedQuery UpstreamDestType = "prepared_query"
)

// AgentCheck represents a check known to the agent.
type AgentCheck struct {
	Node        string
	CheckID     string
	Name        string
	Status      string
	Notes       string
	Output      string
	ServiceID   string
	ServiceName string
	Type        string
	Definition  HealthCheckDefinition
	// Namespace is left out of the JSON encoding when it is empty.
	Namespace string `json:",omitempty"`
}

// AgentWeights represent optional weights for a service.
type AgentWeights struct {
	Passing int
	Warning int
}

// AgentService represents a service known to the agent.
type AgentService struct {
	Kind              ServiceKind `json:",omitempty"`
	ID                string
	Service           string
	Tags              []string
	Meta              map[string]string
	Port              int
	Address           string
	TaggedAddresses   map[string]ServiceAddress `json:",omitempty"`
	Weights           AgentWeights
	EnableTagOverride bool
	// The fields below carry a bexpr:"-" tag.
	// NOTE(review): presumably this hides them from filter expressions —
	// confirm against the bexpr library.
	CreateIndex uint64                          `json:",omitempty" bexpr:"-"`
	ModifyIndex uint64                          `json:",omitempty" bexpr:"-"`
	ContentHash string                          `json:",omitempty" bexpr:"-"`
	Proxy       *AgentServiceConnectProxyConfig `json:",omitempty"`
	Connect     *AgentServiceConnect            `json:",omitempty"`
	// NOTE: If we ever set the ContentHash outside of singular service lookup then we may need
	// to include the Namespace in the hash. When we do, then we are in for lots of fun with tests.
	// For now though, ignoring it works well enough.
	Namespace string `json:",omitempty" bexpr:"-" hash:"ignore"`
}

// AgentServiceChecksInfo returns information about a Service and its checks.
type AgentServiceChecksInfo struct {
	AggregatedStatus string
	Service          *AgentService
	Checks           HealthChecks
}

// AgentServiceConnect represents the Connect configuration of a service.
type AgentServiceConnect struct {
	Native         bool                      `json:",omitempty"`
	SidecarService *AgentServiceRegistration `json:",omitempty" bexpr:"-"`
}

// AgentServiceConnectProxyConfig is the proxy configuration in a connect-proxy
// ServiceDefinition or response.
type AgentServiceConnectProxyConfig struct {
	DestinationServiceName string                 `json:",omitempty"`
	DestinationServiceID   string                 `json:",omitempty"`
	LocalServiceAddress    string                 `json:",omitempty"`
	LocalServicePort       int                    `json:",omitempty"`
	Config                 map[string]interface{} `json:",omitempty" bexpr:"-"`
	Upstreams              []Upstream             `json:",omitempty"`
	MeshGateway            MeshGatewayConfig      `json:",omitempty"`
	Expose                 ExposeConfig           `json:",omitempty"`
}

// AgentMember represents a cluster member known to the agent.
type AgentMember struct {
	Name        string
	Addr        string
	Port        uint16
	Tags        map[string]string
	Status      int
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}

// AllSegments is used to select for all segments in MembersOpts.
const AllSegments = "_all"

// MembersOpts is used for querying member information.
type MembersOpts struct {
	// WAN is whether to show members from the WAN.
	WAN bool

	// Segment is the LAN segment to show members for. Setting this to the
	// AllSegments value above will show members in all segments.
	Segment string
}

// AgentServiceRegistration is used to register a new service.
type AgentServiceRegistration struct {
	Kind              ServiceKind               `json:",omitempty"`
	ID                string                    `json:",omitempty"`
	Name              string                    `json:",omitempty"`
	Tags              []string                  `json:",omitempty"`
	Port              int                       `json:",omitempty"`
	Address           string                    `json:",omitempty"`
	TaggedAddresses   map[string]ServiceAddress `json:",omitempty"`
	EnableTagOverride bool                      `json:",omitempty"`
	Meta              map[string]string         `json:",omitempty"`
	Weights           *AgentWeights             `json:",omitempty"`
	// Check and Checks have no omitempty tag, so they are always present in
	// the encoded JSON (as null when unset).
	Check     *AgentServiceCheck
	Checks    AgentServiceChecks
	Proxy     *AgentServiceConnectProxyConfig `json:",omitempty"`
	Connect   *AgentServiceConnect            `json:",omitempty"`
	Namespace string                          `json:",omitempty" bexpr:"-" hash:"ignore"`
}

// AgentCheckRegistration is used to register a new check. It embeds
// AgentServiceCheck, so that type's fields are promoted onto the registration.
type AgentCheckRegistration struct {
	ID        string `json:",omitempty"`
	Name      string `json:",omitempty"`
	Notes     string `json:",omitempty"`
	ServiceID string `json:",omitempty"`
	AgentServiceCheck
	Namespace string `json:",omitempty"`
}

// AgentServiceCheck is used to define a node or service level check.
type AgentServiceCheck struct {
	CheckID string `json:",omitempty"`
	Name    string `json:",omitempty"`
	// Args is encoded under the JSON key "ScriptArgs".
	Args              []string            `json:"ScriptArgs,omitempty"`
	DockerContainerID string              `json:",omitempty"`
	Shell             string              `json:",omitempty"` // Only supported for Docker.
	Interval          string              `json:",omitempty"`
	Timeout           string              `json:",omitempty"`
	TTL               string              `json:",omitempty"`
	HTTP              string              `json:",omitempty"`
	Header            map[string][]string `json:",omitempty"`
	Method            string              `json:",omitempty"`
	TCP               string              `json:",omitempty"`
	Status            string              `json:",omitempty"`
	Notes             string              `json:",omitempty"`
	TLSSkipVerify     bool                `json:",omitempty"`
	GRPC              string              `json:",omitempty"`
	GRPCUseTLS        bool                `json:",omitempty"`
	AliasNode         string              `json:",omitempty"`
	AliasService      string              `json:",omitempty"`

	// In Consul 0.7 and later, checks that are associated with a service
	// may also contain this optional DeregisterCriticalServiceAfter field,
	// which is a timeout in the same Go time format as Interval and TTL. If
	// a check is in the critical state for more than this configured value,
	// then its associated service (and all of its associated checks) will
	// automatically be deregistered.
	DeregisterCriticalServiceAfter string `json:",omitempty"`
}

// AgentServiceChecks is a list of check definitions, as used by the Checks
// field of AgentServiceRegistration.
type AgentServiceChecks []*AgentServiceCheck

// AgentToken is used when updating ACL tokens for an agent.
type AgentToken struct {
	Token string
}

// MetricsInfo is used to store different types of metric values from the agent.
type MetricsInfo struct {
	Timestamp string
	Gauges    []GaugeValue
	Points    []PointValue
	Counters  []SampledValue
	Samples   []SampledValue
}

// GaugeValue stores one value that is updated as time goes on, such as
// the amount of memory allocated.
type GaugeValue struct {
	Name   string
	Value  float32
	Labels map[string]string
}

// PointValue holds a series of points for a metric.
type PointValue struct {
	Name   string
	Points []float32
}

// SampledValue stores info about a metric that is incremented over time,
// such as the number of requests to an HTTP endpoint.
234type SampledValue struct { 235 Name string 236 Count int 237 Sum float64 238 Min float64 239 Max float64 240 Mean float64 241 Stddev float64 242 Labels map[string]string 243} 244 245// AgentAuthorizeParams are the request parameters for authorizing a request. 246type AgentAuthorizeParams struct { 247 Target string 248 ClientCertURI string 249 ClientCertSerial string 250} 251 252// AgentAuthorize is the response structure for Connect authorization. 253type AgentAuthorize struct { 254 Authorized bool 255 Reason string 256} 257 258// ConnectProxyConfig is the response structure for agent-local proxy 259// configuration. 260type ConnectProxyConfig struct { 261 ProxyServiceID string 262 TargetServiceID string 263 TargetServiceName string 264 ContentHash string 265 Config map[string]interface{} `bexpr:"-"` 266 Upstreams []Upstream 267} 268 269// Upstream is the response structure for a proxy upstream configuration. 270type Upstream struct { 271 DestinationType UpstreamDestType `json:",omitempty"` 272 DestinationNamespace string `json:",omitempty"` 273 DestinationName string 274 Datacenter string `json:",omitempty"` 275 LocalBindAddress string `json:",omitempty"` 276 LocalBindPort int `json:",omitempty"` 277 Config map[string]interface{} `json:",omitempty" bexpr:"-"` 278 MeshGateway MeshGatewayConfig `json:",omitempty"` 279} 280 281// Agent can be used to query the Agent endpoints 282type Agent struct { 283 c *Client 284 285 // cache the node name 286 nodeName string 287} 288 289// Agent returns a handle to the agent endpoints 290func (c *Client) Agent() *Agent { 291 return &Agent{c: c} 292} 293 294// Self is used to query the agent we are speaking to for 295// information about itself 296func (a *Agent) Self() (map[string]map[string]interface{}, error) { 297 r := a.c.newRequest("GET", "/v1/agent/self") 298 _, resp, err := requireOK(a.c.doRequest(r)) 299 if err != nil { 300 return nil, err 301 } 302 defer resp.Body.Close() 303 304 var out 
map[string]map[string]interface{} 305 if err := decodeBody(resp, &out); err != nil { 306 return nil, err 307 } 308 return out, nil 309} 310 311// Host is used to retrieve information about the host the 312// agent is running on such as CPU, memory, and disk. Requires 313// a operator:read ACL token. 314func (a *Agent) Host() (map[string]interface{}, error) { 315 r := a.c.newRequest("GET", "/v1/agent/host") 316 _, resp, err := requireOK(a.c.doRequest(r)) 317 if err != nil { 318 return nil, err 319 } 320 defer resp.Body.Close() 321 322 var out map[string]interface{} 323 if err := decodeBody(resp, &out); err != nil { 324 return nil, err 325 } 326 return out, nil 327} 328 329// Metrics is used to query the agent we are speaking to for 330// its current internal metric data 331func (a *Agent) Metrics() (*MetricsInfo, error) { 332 r := a.c.newRequest("GET", "/v1/agent/metrics") 333 _, resp, err := requireOK(a.c.doRequest(r)) 334 if err != nil { 335 return nil, err 336 } 337 defer resp.Body.Close() 338 339 var out *MetricsInfo 340 if err := decodeBody(resp, &out); err != nil { 341 return nil, err 342 } 343 return out, nil 344} 345 346// Reload triggers a configuration reload for the agent we are connected to. 
347func (a *Agent) Reload() error { 348 r := a.c.newRequest("PUT", "/v1/agent/reload") 349 _, resp, err := requireOK(a.c.doRequest(r)) 350 if err != nil { 351 return err 352 } 353 resp.Body.Close() 354 return nil 355} 356 357// NodeName is used to get the node name of the agent 358func (a *Agent) NodeName() (string, error) { 359 if a.nodeName != "" { 360 return a.nodeName, nil 361 } 362 info, err := a.Self() 363 if err != nil { 364 return "", err 365 } 366 name := info["Config"]["NodeName"].(string) 367 a.nodeName = name 368 return name, nil 369} 370 371// Checks returns the locally registered checks 372func (a *Agent) Checks() (map[string]*AgentCheck, error) { 373 return a.ChecksWithFilter("") 374} 375 376// ChecksWithFilter returns a subset of the locally registered checks that match 377// the given filter expression 378func (a *Agent) ChecksWithFilter(filter string) (map[string]*AgentCheck, error) { 379 r := a.c.newRequest("GET", "/v1/agent/checks") 380 r.filterQuery(filter) 381 _, resp, err := requireOK(a.c.doRequest(r)) 382 if err != nil { 383 return nil, err 384 } 385 defer resp.Body.Close() 386 387 var out map[string]*AgentCheck 388 if err := decodeBody(resp, &out); err != nil { 389 return nil, err 390 } 391 return out, nil 392} 393 394// Services returns the locally registered services 395func (a *Agent) Services() (map[string]*AgentService, error) { 396 return a.ServicesWithFilter("") 397} 398 399// ServicesWithFilter returns a subset of the locally registered services that match 400// the given filter expression 401func (a *Agent) ServicesWithFilter(filter string) (map[string]*AgentService, error) { 402 r := a.c.newRequest("GET", "/v1/agent/services") 403 r.filterQuery(filter) 404 _, resp, err := requireOK(a.c.doRequest(r)) 405 if err != nil { 406 return nil, err 407 } 408 defer resp.Body.Close() 409 410 var out map[string]*AgentService 411 if err := decodeBody(resp, &out); err != nil { 412 return nil, err 413 } 414 415 return out, nil 416} 417 418// 
AgentHealthServiceByID returns for a given serviceID: the aggregated health status, the service definition or an error if any 419// - If the service is not found, will return status (critical, nil, nil) 420// - If the service is found, will return (critical|passing|warning), AgentServiceChecksInfo, nil) 421// - In all other cases, will return an error 422func (a *Agent) AgentHealthServiceByID(serviceID string) (string, *AgentServiceChecksInfo, error) { 423 path := fmt.Sprintf("/v1/agent/health/service/id/%v", url.PathEscape(serviceID)) 424 r := a.c.newRequest("GET", path) 425 r.params.Add("format", "json") 426 r.header.Set("Accept", "application/json") 427 _, resp, err := a.c.doRequest(r) 428 if err != nil { 429 return "", nil, err 430 } 431 defer resp.Body.Close() 432 // Service not Found 433 if resp.StatusCode == http.StatusNotFound { 434 return HealthCritical, nil, nil 435 } 436 var out *AgentServiceChecksInfo 437 if err := decodeBody(resp, &out); err != nil { 438 return HealthCritical, out, err 439 } 440 switch resp.StatusCode { 441 case http.StatusOK: 442 return HealthPassing, out, nil 443 case http.StatusTooManyRequests: 444 return HealthWarning, out, nil 445 case http.StatusServiceUnavailable: 446 return HealthCritical, out, nil 447 } 448 return HealthCritical, out, fmt.Errorf("Unexpected Error Code %v for %s", resp.StatusCode, path) 449} 450 451// AgentHealthServiceByName returns for a given service name: the aggregated health status for all services 452// having the specified name. 
453// - If no service is not found, will return status (critical, [], nil) 454// - If the service is found, will return (critical|passing|warning), []api.AgentServiceChecksInfo, nil) 455// - In all other cases, will return an error 456func (a *Agent) AgentHealthServiceByName(service string) (string, []AgentServiceChecksInfo, error) { 457 path := fmt.Sprintf("/v1/agent/health/service/name/%v", url.PathEscape(service)) 458 r := a.c.newRequest("GET", path) 459 r.params.Add("format", "json") 460 r.header.Set("Accept", "application/json") 461 _, resp, err := a.c.doRequest(r) 462 if err != nil { 463 return "", nil, err 464 } 465 defer resp.Body.Close() 466 // Service not Found 467 if resp.StatusCode == http.StatusNotFound { 468 return HealthCritical, nil, nil 469 } 470 var out []AgentServiceChecksInfo 471 if err := decodeBody(resp, &out); err != nil { 472 return HealthCritical, out, err 473 } 474 switch resp.StatusCode { 475 case http.StatusOK: 476 return HealthPassing, out, nil 477 case http.StatusTooManyRequests: 478 return HealthWarning, out, nil 479 case http.StatusServiceUnavailable: 480 return HealthCritical, out, nil 481 } 482 return HealthCritical, out, fmt.Errorf("Unexpected Error Code %v for %s", resp.StatusCode, path) 483} 484 485// Service returns a locally registered service instance and allows for 486// hash-based blocking. 487// 488// Note that this uses an unconventional blocking mechanism since it's 489// agent-local state. That means there is no persistent raft index so we block 490// based on object hash instead. 
491func (a *Agent) Service(serviceID string, q *QueryOptions) (*AgentService, *QueryMeta, error) { 492 r := a.c.newRequest("GET", "/v1/agent/service/"+serviceID) 493 r.setQueryOptions(q) 494 rtt, resp, err := requireOK(a.c.doRequest(r)) 495 if err != nil { 496 return nil, nil, err 497 } 498 defer resp.Body.Close() 499 500 qm := &QueryMeta{} 501 parseQueryMeta(resp, qm) 502 qm.RequestTime = rtt 503 504 var out *AgentService 505 if err := decodeBody(resp, &out); err != nil { 506 return nil, nil, err 507 } 508 509 return out, qm, nil 510} 511 512// Members returns the known gossip members. The WAN 513// flag can be used to query a server for WAN members. 514func (a *Agent) Members(wan bool) ([]*AgentMember, error) { 515 r := a.c.newRequest("GET", "/v1/agent/members") 516 if wan { 517 r.params.Set("wan", "1") 518 } 519 _, resp, err := requireOK(a.c.doRequest(r)) 520 if err != nil { 521 return nil, err 522 } 523 defer resp.Body.Close() 524 525 var out []*AgentMember 526 if err := decodeBody(resp, &out); err != nil { 527 return nil, err 528 } 529 return out, nil 530} 531 532// MembersOpts returns the known gossip members and can be passed 533// additional options for WAN/segment filtering. 
534func (a *Agent) MembersOpts(opts MembersOpts) ([]*AgentMember, error) { 535 r := a.c.newRequest("GET", "/v1/agent/members") 536 r.params.Set("segment", opts.Segment) 537 if opts.WAN { 538 r.params.Set("wan", "1") 539 } 540 541 _, resp, err := requireOK(a.c.doRequest(r)) 542 if err != nil { 543 return nil, err 544 } 545 defer resp.Body.Close() 546 547 var out []*AgentMember 548 if err := decodeBody(resp, &out); err != nil { 549 return nil, err 550 } 551 return out, nil 552} 553 554// ServiceRegister is used to register a new service with 555// the local agent 556func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { 557 r := a.c.newRequest("PUT", "/v1/agent/service/register") 558 r.obj = service 559 _, resp, err := requireOK(a.c.doRequest(r)) 560 if err != nil { 561 return err 562 } 563 resp.Body.Close() 564 return nil 565} 566 567// ServiceDeregister is used to deregister a service with 568// the local agent 569func (a *Agent) ServiceDeregister(serviceID string) error { 570 r := a.c.newRequest("PUT", "/v1/agent/service/deregister/"+serviceID) 571 _, resp, err := requireOK(a.c.doRequest(r)) 572 if err != nil { 573 return err 574 } 575 resp.Body.Close() 576 return nil 577} 578 579// PassTTL is used to set a TTL check to the passing state. 580// 581// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL(). 582// The client interface will be removed in 0.8 or changed to use 583// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9. 584func (a *Agent) PassTTL(checkID, note string) error { 585 return a.updateTTL(checkID, note, "pass") 586} 587 588// WarnTTL is used to set a TTL check to the warning state. 589// 590// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL(). 591// The client interface will be removed in 0.8 or changed to use 592// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9. 
593func (a *Agent) WarnTTL(checkID, note string) error { 594 return a.updateTTL(checkID, note, "warn") 595} 596 597// FailTTL is used to set a TTL check to the failing state. 598// 599// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL(). 600// The client interface will be removed in 0.8 or changed to use 601// UpdateTTL()'s endpoint and the server endpoints will be removed in 0.9. 602func (a *Agent) FailTTL(checkID, note string) error { 603 return a.updateTTL(checkID, note, "fail") 604} 605 606// updateTTL is used to update the TTL of a check. This is the internal 607// method that uses the old API that's present in Consul versions prior to 608// 0.6.4. Since Consul didn't have an analogous "update" API before it seemed 609// ok to break this (former) UpdateTTL in favor of the new UpdateTTL below, 610// but keep the old Pass/Warn/Fail methods using the old API under the hood. 611// 612// DEPRECATION NOTICE: This interface is deprecated in favor of UpdateTTL(). 613// The client interface will be removed in 0.8 and the server endpoints will 614// be removed in 0.9. 615func (a *Agent) updateTTL(checkID, note, status string) error { 616 switch status { 617 case "pass": 618 case "warn": 619 case "fail": 620 default: 621 return fmt.Errorf("Invalid status: %s", status) 622 } 623 endpoint := fmt.Sprintf("/v1/agent/check/%s/%s", status, checkID) 624 r := a.c.newRequest("PUT", endpoint) 625 r.params.Set("note", note) 626 _, resp, err := requireOK(a.c.doRequest(r)) 627 if err != nil { 628 return err 629 } 630 resp.Body.Close() 631 return nil 632} 633 634// checkUpdate is the payload for a PUT for a check update. 635type checkUpdate struct { 636 // Status is one of the api.Health* states: HealthPassing 637 // ("passing"), HealthWarning ("warning"), or HealthCritical 638 // ("critical"). 639 Status string 640 641 // Output is the information to post to the UI for operators as the 642 // output of the process that decided to hit the TTL check. 
This is 643 // different from the note field that's associated with the check 644 // itself. 645 Output string 646} 647 648// UpdateTTL is used to update the TTL of a check. This uses the newer API 649// that was introduced in Consul 0.6.4 and later. We translate the old status 650// strings for compatibility (though a newer version of Consul will still be 651// required to use this API). 652func (a *Agent) UpdateTTL(checkID, output, status string) error { 653 switch status { 654 case "pass", HealthPassing: 655 status = HealthPassing 656 case "warn", HealthWarning: 657 status = HealthWarning 658 case "fail", HealthCritical: 659 status = HealthCritical 660 default: 661 return fmt.Errorf("Invalid status: %s", status) 662 } 663 664 endpoint := fmt.Sprintf("/v1/agent/check/update/%s", checkID) 665 r := a.c.newRequest("PUT", endpoint) 666 r.obj = &checkUpdate{ 667 Status: status, 668 Output: output, 669 } 670 671 _, resp, err := requireOK(a.c.doRequest(r)) 672 if err != nil { 673 return err 674 } 675 resp.Body.Close() 676 return nil 677} 678 679// CheckRegister is used to register a new check with 680// the local agent 681func (a *Agent) CheckRegister(check *AgentCheckRegistration) error { 682 r := a.c.newRequest("PUT", "/v1/agent/check/register") 683 r.obj = check 684 _, resp, err := requireOK(a.c.doRequest(r)) 685 if err != nil { 686 return err 687 } 688 resp.Body.Close() 689 return nil 690} 691 692// CheckDeregister is used to deregister a check with 693// the local agent 694func (a *Agent) CheckDeregister(checkID string) error { 695 r := a.c.newRequest("PUT", "/v1/agent/check/deregister/"+checkID) 696 _, resp, err := requireOK(a.c.doRequest(r)) 697 if err != nil { 698 return err 699 } 700 resp.Body.Close() 701 return nil 702} 703 704// Join is used to instruct the agent to attempt a join to 705// another cluster member 706func (a *Agent) Join(addr string, wan bool) error { 707 r := a.c.newRequest("PUT", "/v1/agent/join/"+addr) 708 if wan { 709 r.params.Set("wan", 
"1") 710 } 711 _, resp, err := requireOK(a.c.doRequest(r)) 712 if err != nil { 713 return err 714 } 715 resp.Body.Close() 716 return nil 717} 718 719// Leave is used to have the agent gracefully leave the cluster and shutdown 720func (a *Agent) Leave() error { 721 r := a.c.newRequest("PUT", "/v1/agent/leave") 722 _, resp, err := requireOK(a.c.doRequest(r)) 723 if err != nil { 724 return err 725 } 726 resp.Body.Close() 727 return nil 728} 729 730// ForceLeave is used to have the agent eject a failed node 731func (a *Agent) ForceLeave(node string) error { 732 r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node) 733 _, resp, err := requireOK(a.c.doRequest(r)) 734 if err != nil { 735 return err 736 } 737 resp.Body.Close() 738 return nil 739} 740 741//ForceLeavePrune is used to have an a failed agent removed 742//from the list of members 743func (a *Agent) ForceLeavePrune(node string) error { 744 r := a.c.newRequest("PUT", "/v1/agent/force-leave/"+node) 745 r.params.Set("prune", "1") 746 _, resp, err := requireOK(a.c.doRequest(r)) 747 if err != nil { 748 return err 749 } 750 resp.Body.Close() 751 return nil 752} 753 754// ConnectAuthorize is used to authorize an incoming connection 755// to a natively integrated Connect service. 756func (a *Agent) ConnectAuthorize(auth *AgentAuthorizeParams) (*AgentAuthorize, error) { 757 r := a.c.newRequest("POST", "/v1/agent/connect/authorize") 758 r.obj = auth 759 _, resp, err := requireOK(a.c.doRequest(r)) 760 if err != nil { 761 return nil, err 762 } 763 defer resp.Body.Close() 764 765 var out AgentAuthorize 766 if err := decodeBody(resp, &out); err != nil { 767 return nil, err 768 } 769 return &out, nil 770} 771 772// ConnectCARoots returns the list of roots. 
773func (a *Agent) ConnectCARoots(q *QueryOptions) (*CARootList, *QueryMeta, error) { 774 r := a.c.newRequest("GET", "/v1/agent/connect/ca/roots") 775 r.setQueryOptions(q) 776 rtt, resp, err := requireOK(a.c.doRequest(r)) 777 if err != nil { 778 return nil, nil, err 779 } 780 defer resp.Body.Close() 781 782 qm := &QueryMeta{} 783 parseQueryMeta(resp, qm) 784 qm.RequestTime = rtt 785 786 var out CARootList 787 if err := decodeBody(resp, &out); err != nil { 788 return nil, nil, err 789 } 790 return &out, qm, nil 791} 792 793// ConnectCALeaf gets the leaf certificate for the given service ID. 794func (a *Agent) ConnectCALeaf(serviceID string, q *QueryOptions) (*LeafCert, *QueryMeta, error) { 795 r := a.c.newRequest("GET", "/v1/agent/connect/ca/leaf/"+serviceID) 796 r.setQueryOptions(q) 797 rtt, resp, err := requireOK(a.c.doRequest(r)) 798 if err != nil { 799 return nil, nil, err 800 } 801 defer resp.Body.Close() 802 803 qm := &QueryMeta{} 804 parseQueryMeta(resp, qm) 805 qm.RequestTime = rtt 806 807 var out LeafCert 808 if err := decodeBody(resp, &out); err != nil { 809 return nil, nil, err 810 } 811 return &out, qm, nil 812} 813 814// EnableServiceMaintenance toggles service maintenance mode on 815// for the given service ID. 816func (a *Agent) EnableServiceMaintenance(serviceID, reason string) error { 817 r := a.c.newRequest("PUT", "/v1/agent/service/maintenance/"+serviceID) 818 r.params.Set("enable", "true") 819 r.params.Set("reason", reason) 820 _, resp, err := requireOK(a.c.doRequest(r)) 821 if err != nil { 822 return err 823 } 824 resp.Body.Close() 825 return nil 826} 827 828// DisableServiceMaintenance toggles service maintenance mode off 829// for the given service ID. 
830func (a *Agent) DisableServiceMaintenance(serviceID string) error { 831 r := a.c.newRequest("PUT", "/v1/agent/service/maintenance/"+serviceID) 832 r.params.Set("enable", "false") 833 _, resp, err := requireOK(a.c.doRequest(r)) 834 if err != nil { 835 return err 836 } 837 resp.Body.Close() 838 return nil 839} 840 841// EnableNodeMaintenance toggles node maintenance mode on for the 842// agent we are connected to. 843func (a *Agent) EnableNodeMaintenance(reason string) error { 844 r := a.c.newRequest("PUT", "/v1/agent/maintenance") 845 r.params.Set("enable", "true") 846 r.params.Set("reason", reason) 847 _, resp, err := requireOK(a.c.doRequest(r)) 848 if err != nil { 849 return err 850 } 851 resp.Body.Close() 852 return nil 853} 854 855// DisableNodeMaintenance toggles node maintenance mode off for the 856// agent we are connected to. 857func (a *Agent) DisableNodeMaintenance() error { 858 r := a.c.newRequest("PUT", "/v1/agent/maintenance") 859 r.params.Set("enable", "false") 860 _, resp, err := requireOK(a.c.doRequest(r)) 861 if err != nil { 862 return err 863 } 864 resp.Body.Close() 865 return nil 866} 867 868// Monitor returns a channel which will receive streaming logs from the agent 869// Providing a non-nil stopCh can be used to close the connection and stop the 870// log stream. An empty string will be sent down the given channel when there's 871// nothing left to stream, after which the caller should close the stopCh. 
872func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions) (chan string, error) { 873 r := a.c.newRequest("GET", "/v1/agent/monitor") 874 r.setQueryOptions(q) 875 if loglevel != "" { 876 r.params.Add("loglevel", loglevel) 877 } 878 _, resp, err := requireOK(a.c.doRequest(r)) 879 if err != nil { 880 return nil, err 881 } 882 883 logCh := make(chan string, 64) 884 go func() { 885 defer resp.Body.Close() 886 887 scanner := bufio.NewScanner(resp.Body) 888 for { 889 select { 890 case <-stopCh: 891 close(logCh) 892 return 893 default: 894 } 895 if scanner.Scan() { 896 // An empty string signals to the caller that 897 // the scan is done, so make sure we only emit 898 // that when the scanner says it's done, not if 899 // we happen to ingest an empty line. 900 if text := scanner.Text(); text != "" { 901 logCh <- text 902 } else { 903 logCh <- " " 904 } 905 } else { 906 logCh <- "" 907 } 908 } 909 }() 910 911 return logCh, nil 912} 913 914// UpdateACLToken updates the agent's "acl_token". See updateToken for more 915// details. 916// 917// DEPRECATED (ACL-Legacy-Compat) - Prefer UpdateDefaultACLToken for v1.4.3 and above 918func (a *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) { 919 return a.updateToken("acl_token", token, q) 920} 921 922// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken 923// for more details. 924// 925// DEPRECATED (ACL-Legacy-Compat) - Prefer UpdateAgentACLToken for v1.4.3 and above 926func (a *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) { 927 return a.updateToken("acl_agent_token", token, q) 928} 929 930// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See 931// updateToken for more details. 
//
// Deprecated: (ACL-Legacy-Compat) prefer UpdateAgentMasterACLToken for v1.4.3 and above.
func (a *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateToken("acl_agent_master_token", token, q)
}

// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
// updateToken for more details.
//
// Deprecated: (ACL-Legacy-Compat) prefer UpdateReplicationACLToken for v1.4.3 and above.
func (a *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateToken("acl_replication_token", token, q)
}

// UpdateDefaultACLToken updates the agent's "default" token, falling back to
// the legacy "acl_token" target when the first attempt fails with a 404. See
// updateToken for more details.
func (a *Agent) UpdateDefaultACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateTokenFallback("default", "acl_token", token, q)
}

// UpdateAgentACLToken updates the agent's "agent" token, falling back to the
// legacy "acl_agent_token" target when the first attempt fails with a 404.
// See updateToken for more details.
func (a *Agent) UpdateAgentACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateTokenFallback("agent", "acl_agent_token", token, q)
}

// UpdateAgentMasterACLToken updates the agent's "agent_master" token, falling
// back to the legacy "acl_agent_master_token" target when the first attempt
// fails with a 404. See updateToken for more details.
func (a *Agent) UpdateAgentMasterACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateTokenFallback("agent_master", "acl_agent_master_token", token, q)
}

// UpdateReplicationACLToken updates the agent's "replication" token, falling
// back to the legacy "acl_replication_token" target when the first attempt
// fails with a 404. See updateToken for more details.
func (a *Agent) UpdateReplicationACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
	return a.updateTokenFallback("replication", "acl_replication_token", token, q)
}

// updateToken can be used to update one of an agent's ACL tokens after the agent has
// started.
The tokens are may not be persisted, so will need to be updated again if 972// the agent is restarted unless the agent is configured to persist them. 973func (a *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) { 974 meta, _, err := a.updateTokenOnce(target, token, q) 975 return meta, err 976} 977 978func (a *Agent) updateTokenFallback(target, fallback, token string, q *WriteOptions) (*WriteMeta, error) { 979 meta, status, err := a.updateTokenOnce(target, token, q) 980 if err != nil && status == 404 { 981 meta, _, err = a.updateTokenOnce(fallback, token, q) 982 } 983 return meta, err 984} 985 986func (a *Agent) updateTokenOnce(target, token string, q *WriteOptions) (*WriteMeta, int, error) { 987 r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target)) 988 r.setWriteOptions(q) 989 r.obj = &AgentToken{Token: token} 990 991 rtt, resp, err := a.c.doRequest(r) 992 if err != nil { 993 return nil, 0, err 994 } 995 defer resp.Body.Close() 996 997 wm := &WriteMeta{RequestTime: rtt} 998 999 if resp.StatusCode != 200 { 1000 var buf bytes.Buffer 1001 io.Copy(&buf, resp.Body) 1002 return wm, resp.StatusCode, fmt.Errorf("Unexpected response code: %d (%s)", resp.StatusCode, buf.Bytes()) 1003 } 1004 1005 return wm, resp.StatusCode, nil 1006} 1007