1package api 2 3import ( 4 "encoding/json" 5 "fmt" 6 "io/ioutil" 7 "net/url" 8 "strconv" 9) 10 11// Agent encapsulates an API client which talks to Nomad's 12// agent endpoints for a specific node. 13type Agent struct { 14 client *Client 15 16 // Cache static agent info 17 nodeName string 18 datacenter string 19 region string 20} 21 22// KeyringResponse is a unified key response and can be used for install, 23// remove, use, as well as listing key queries. 24type KeyringResponse struct { 25 Messages map[string]string 26 Keys map[string]int 27 NumNodes int 28} 29 30// KeyringRequest is request objects for serf key operations. 31type KeyringRequest struct { 32 Key string 33} 34 35// Agent returns a new agent which can be used to query 36// the agent-specific endpoints. 37func (c *Client) Agent() *Agent { 38 return &Agent{client: c} 39} 40 41// Self is used to query the /v1/agent/self endpoint and 42// returns information specific to the running agent. 43func (a *Agent) Self() (*AgentSelf, error) { 44 var out *AgentSelf 45 46 // Query the self endpoint on the agent 47 _, err := a.client.query("/v1/agent/self", &out, nil) 48 if err != nil { 49 return nil, fmt.Errorf("failed querying self endpoint: %s", err) 50 } 51 52 // Populate the cache for faster queries 53 a.populateCache(out) 54 55 return out, nil 56} 57 58// populateCache is used to insert various pieces of static 59// data into the agent handle. This is used during subsequent 60// lookups for the same data later on to save the round trip. 61func (a *Agent) populateCache(self *AgentSelf) { 62 if a.nodeName == "" { 63 a.nodeName = self.Member.Name 64 } 65 if a.datacenter == "" { 66 if val, ok := self.Config["Datacenter"]; ok { 67 a.datacenter, _ = val.(string) 68 } 69 } 70 if a.region == "" { 71 if val, ok := self.Config["Region"]; ok { 72 a.region, _ = val.(string) 73 } 74 } 75} 76 77// NodeName is used to query the Nomad agent for its node name. 78func (a *Agent) NodeName() (string, error) { 79 // Return from cache if we have it 80 if a.nodeName != "" { 81 return a.nodeName, nil 82 } 83 84 // Query the node name 85 _, err := a.Self() 86 return a.nodeName, err 87} 88 89// Datacenter is used to return the name of the datacenter which 90// the agent is a member of. 91func (a *Agent) Datacenter() (string, error) { 92 // Return from cache if we have it 93 if a.datacenter != "" { 94 return a.datacenter, nil 95 } 96 97 // Query the agent for the DC 98 _, err := a.Self() 99 return a.datacenter, err 100} 101 102// Region is used to look up the region the agent is in. 103func (a *Agent) Region() (string, error) { 104 // Return from cache if we have it 105 if a.region != "" { 106 return a.region, nil 107 } 108 109 // Query the agent for the region 110 _, err := a.Self() 111 return a.region, err 112} 113 114// Join is used to instruct a server node to join another server 115// via the gossip protocol. Multiple addresses may be specified. 116// We attempt to join all of the hosts in the list. Returns the 117// number of nodes successfully joined and any error. If one or 118// more nodes have a successful result, no error is returned. 119func (a *Agent) Join(addrs ...string) (int, error) { 120 // Accumulate the addresses 121 v := url.Values{} 122 for _, addr := range addrs { 123 v.Add("address", addr) 124 } 125 126 // Send the join request 127 var resp joinResponse 128 _, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil) 129 if err != nil { 130 return 0, fmt.Errorf("failed joining: %s", err) 131 } 132 if resp.Error != "" { 133 return 0, fmt.Errorf("failed joining: %s", resp.Error) 134 } 135 return resp.NumJoined, nil 136} 137 138// Members is used to query all of the known server members 139func (a *Agent) Members() (*ServerMembers, error) { 140 var resp *ServerMembers 141 142 // Query the known members 143 _, err := a.client.query("/v1/agent/members", &resp, nil) 144 if err != nil { 145 return nil, err 146 } 147 return resp, nil 148} 149 150// ForceLeave is used to eject an existing node from the cluster. 151func (a *Agent) ForceLeave(node string) error { 152 _, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil) 153 return err 154} 155 156// Servers is used to query the list of servers on a client node. 157func (a *Agent) Servers() ([]string, error) { 158 var resp []string 159 _, err := a.client.query("/v1/agent/servers", &resp, nil) 160 if err != nil { 161 return nil, err 162 } 163 return resp, nil 164} 165 166// SetServers is used to update the list of servers on a client node. 167func (a *Agent) SetServers(addrs []string) error { 168 // Accumulate the addresses 169 v := url.Values{} 170 for _, addr := range addrs { 171 v.Add("address", addr) 172 } 173 174 _, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil) 175 return err 176} 177 178// ListKeys returns the list of installed keys 179func (a *Agent) ListKeys() (*KeyringResponse, error) { 180 var resp KeyringResponse 181 _, err := a.client.query("/v1/agent/keyring/list", &resp, nil) 182 if err != nil { 183 return nil, err 184 } 185 return &resp, nil 186} 187 188// InstallKey installs a key in the keyrings of all the serf members 189func (a *Agent) InstallKey(key string) (*KeyringResponse, error) { 190 args := KeyringRequest{ 191 Key: key, 192 } 193 var resp KeyringResponse 194 _, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil) 195 return &resp, err 196} 197 198// UseKey uses a key from the keyring of serf members 199func (a *Agent) UseKey(key string) (*KeyringResponse, error) { 200 args := KeyringRequest{ 201 Key: key, 202 } 203 var resp KeyringResponse 204 _, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil) 205 return &resp, err 206} 207 208// RemoveKey removes a particular key from keyrings of serf members 209func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) { 210 args := KeyringRequest{ 211 Key: key, 212 } 213 var resp KeyringResponse 214 _, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil) 215 return &resp, err 216} 217 218// Health queries the agent's health 219func (a *Agent) Health() (*AgentHealthResponse, error) { 220 req, err := a.client.newRequest("GET", "/v1/agent/health") 221 if err != nil { 222 return nil, err 223 } 224 225 var health AgentHealthResponse 226 _, resp, err := a.client.doRequest(req) 227 if err != nil { 228 return nil, err 229 } 230 defer resp.Body.Close() 231 232 // Always try to decode the response as JSON 233 err = json.NewDecoder(resp.Body).Decode(&health) 234 if err == nil { 235 return &health, nil 236 } 237 238 // Return custom error when response is not expected JSON format 239 return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err) 240} 241 242// Host returns debugging context about the agent's host operating system 243func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataResponse, error) { 244 if q == nil { 245 q = &QueryOptions{} 246 } 247 if q.Params == nil { 248 q.Params = make(map[string]string) 249 } 250 251 if serverID != "" { 252 q.Params["server_id"] = serverID 253 } 254 255 if nodeID != "" { 256 q.Params["node_id"] = nodeID 257 } 258 259 var resp HostDataResponse 260 _, err := a.client.query("/v1/agent/host", &resp, q) 261 if err != nil { 262 return nil, err 263 } 264 265 return &resp, nil 266} 267 268// Monitor returns a channel which will receive streaming logs from the agent 269// Providing a non-nil stopCh can be used to close the connection and stop log streaming 270func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 271 errCh := make(chan error, 1) 272 r, err := a.client.newRequest("GET", "/v1/agent/monitor") 273 if err != nil { 274 errCh <- err 275 return nil, errCh 276 } 277 278 r.setQueryOptions(q) 279 _, resp, err := requireOK(a.client.doRequest(r)) 280 if err != nil { 281 errCh <- err 282 return nil, errCh 283 } 284 285 frames := make(chan *StreamFrame, 10) 286 go func() { 287 defer resp.Body.Close() 288 289 dec := json.NewDecoder(resp.Body) 290 291 for { 292 select { 293 case <-stopCh: 294 close(frames) 295 return 296 default: 297 } 298 299 // Decode the next frame 300 var frame StreamFrame 301 if err := dec.Decode(&frame); err != nil { 302 close(frames) 303 errCh <- err 304 return 305 } 306 307 // Discard heartbeat frame 308 if frame.IsHeartbeat() { 309 continue 310 } 311 312 frames <- &frame 313 } 314 }() 315 316 return frames, errCh 317} 318 319// PprofOptions contain a set of parameters for profiling a node or server. 320type PprofOptions struct { 321 // ServerID is the server ID, name, or special value "leader" to 322 // specify the server that a given profile should be run on. 323 ServerID string 324 325 // NodeID is the node ID that a given profile should be run on. 326 NodeID string 327 328 // Seconds specifies the amount of time a profile should be run for. 329 // Seconds only applies for certain runtime profiles like CPU and Trace. 330 Seconds int 331 332 // GC determines if a runtime.GC() should be called before a heap 333 // profile. 334 GC int 335 336 // Debug specifies if the output of a lookup profile should be returned 337 // in human readable format instead of binary. 338 Debug int 339} 340 341// CPUProfile returns a runtime/pprof cpu profile for a given server or node. 342// The profile will run for the amount of seconds passed in or default to 1. 343// If no serverID or nodeID are provided the current Agents server will be 344// used. 345// 346// The call blocks until the profile finishes, and returns the raw bytes of the 347// profile. 348func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) { 349 return a.pprofRequest("profile", opts, q) 350} 351 352// Trace returns a runtime/pprof trace for a given server or node. 353// The trace will run for the amount of seconds passed in or default to 1. 354// If no serverID or nodeID are provided the current Agents server will be 355// used. 356// 357// The call blocks until the profile finishes, and returns the raw bytes of the 358// profile. 359func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) { 360 return a.pprofRequest("trace", opts, q) 361} 362 363// Lookup returns a runtime/pprof profile using pprof.Lookup to determine 364// which profile to run. Accepts a client or server ID but not both simultaneously. 365// 366// The call blocks until the profile finishes, and returns the raw bytes of the 367// profile unless debug is set. 368func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) { 369 return a.pprofRequest(profile, opts, q) 370} 371 372func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) { 373 if q == nil { 374 q = &QueryOptions{} 375 } 376 if q.Params == nil { 377 q.Params = make(map[string]string) 378 } 379 380 q.Params["seconds"] = strconv.Itoa(opts.Seconds) 381 q.Params["debug"] = strconv.Itoa(opts.Debug) 382 q.Params["gc"] = strconv.Itoa(opts.GC) 383 q.Params["node_id"] = opts.NodeID 384 q.Params["server_id"] = opts.ServerID 385 386 body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q) 387 if err != nil { 388 return nil, err 389 } 390 391 resp, err := ioutil.ReadAll(body) 392 if err != nil { 393 return nil, err 394 } 395 return resp, nil 396} 397 398// joinResponse is used to decode the response we get while 399// sending a member join request. 400type joinResponse struct { 401 NumJoined int `json:"num_joined"` 402 Error string `json:"error"` 403} 404 405type ServerMembers struct { 406 ServerName string 407 ServerRegion string 408 ServerDC string 409 Members []*AgentMember 410} 411 412type AgentSelf struct { 413 Config map[string]interface{} `json:"config"` 414 Member AgentMember `json:"member"` 415 Stats map[string]map[string]string `json:"stats"` 416} 417 418// AgentMember represents a cluster member known to the agent 419type AgentMember struct { 420 Name string 421 Addr string 422 Port uint16 423 Tags map[string]string 424 Status string 425 ProtocolMin uint8 426 ProtocolMax uint8 427 ProtocolCur uint8 428 DelegateMin uint8 429 DelegateMax uint8 430 DelegateCur uint8 431} 432 433// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort 434// based on the Name, DC and Region 435type AgentMembersNameSort []*AgentMember 436 437func (a AgentMembersNameSort) Len() int { return len(a) } 438func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 439func (a AgentMembersNameSort) Less(i, j int) bool { 440 if a[i].Tags["region"] != a[j].Tags["region"] { 441 return a[i].Tags["region"] < a[j].Tags["region"] 442 } 443 444 if a[i].Tags["dc"] != a[j].Tags["dc"] { 445 return a[i].Tags["dc"] < a[j].Tags["dc"] 446 } 447 448 return a[i].Name < a[j].Name 449 450} 451 452// AgentHealthResponse is the response from the Health endpoint describing an 453// agent's health. 454type AgentHealthResponse struct { 455 Client *AgentHealth `json:"client,omitempty"` 456 Server *AgentHealth `json:"server,omitempty"` 457} 458 459// AgentHealth describes the Client or Server's health in a Health request. 460type AgentHealth struct { 461 // Ok is false if the agent is unhealthy 462 Ok bool `json:"ok"` 463 464 // Message describes why the agent is unhealthy 465 Message string `json:"message"` 466} 467 468type HostData struct { 469 OS string 470 Network []map[string]string 471 ResolvConf string 472 Hosts string 473 Environment map[string]string 474 Disk map[string]DiskUsage 475} 476 477type DiskUsage struct { 478 DiskMB int64 479 UsedMB int64 480} 481 482type HostDataResponse struct { 483 AgentID string 484 HostData *HostData `json:",omitempty"` 485} 486