1package api 2 3import ( 4 "encoding/json" 5 "fmt" 6 "net/url" 7) 8 9// Agent encapsulates an API client which talks to Nomad's 10// agent endpoints for a specific node. 11type Agent struct { 12 client *Client 13 14 // Cache static agent info 15 nodeName string 16 datacenter string 17 region string 18} 19 20// KeyringResponse is a unified key response and can be used for install, 21// remove, use, as well as listing key queries. 22type KeyringResponse struct { 23 Messages map[string]string 24 Keys map[string]int 25 NumNodes int 26} 27 28// KeyringRequest is request objects for serf key operations. 29type KeyringRequest struct { 30 Key string 31} 32 33// Agent returns a new agent which can be used to query 34// the agent-specific endpoints. 35func (c *Client) Agent() *Agent { 36 return &Agent{client: c} 37} 38 39// Self is used to query the /v1/agent/self endpoint and 40// returns information specific to the running agent. 41func (a *Agent) Self() (*AgentSelf, error) { 42 var out *AgentSelf 43 44 // Query the self endpoint on the agent 45 _, err := a.client.query("/v1/agent/self", &out, nil) 46 if err != nil { 47 return nil, fmt.Errorf("failed querying self endpoint: %s", err) 48 } 49 50 // Populate the cache for faster queries 51 a.populateCache(out) 52 53 return out, nil 54} 55 56// populateCache is used to insert various pieces of static 57// data into the agent handle. This is used during subsequent 58// lookups for the same data later on to save the round trip. 59func (a *Agent) populateCache(self *AgentSelf) { 60 if a.nodeName == "" { 61 a.nodeName = self.Member.Name 62 } 63 if a.datacenter == "" { 64 if val, ok := self.Config["Datacenter"]; ok { 65 a.datacenter, _ = val.(string) 66 } 67 } 68 if a.region == "" { 69 if val, ok := self.Config["Region"]; ok { 70 a.region, _ = val.(string) 71 } 72 } 73} 74 75// NodeName is used to query the Nomad agent for its node name. 
76func (a *Agent) NodeName() (string, error) { 77 // Return from cache if we have it 78 if a.nodeName != "" { 79 return a.nodeName, nil 80 } 81 82 // Query the node name 83 _, err := a.Self() 84 return a.nodeName, err 85} 86 87// Datacenter is used to return the name of the datacenter which 88// the agent is a member of. 89func (a *Agent) Datacenter() (string, error) { 90 // Return from cache if we have it 91 if a.datacenter != "" { 92 return a.datacenter, nil 93 } 94 95 // Query the agent for the DC 96 _, err := a.Self() 97 return a.datacenter, err 98} 99 100// Region is used to look up the region the agent is in. 101func (a *Agent) Region() (string, error) { 102 // Return from cache if we have it 103 if a.region != "" { 104 return a.region, nil 105 } 106 107 // Query the agent for the region 108 _, err := a.Self() 109 return a.region, err 110} 111 112// Join is used to instruct a server node to join another server 113// via the gossip protocol. Multiple addresses may be specified. 114// We attempt to join all of the hosts in the list. Returns the 115// number of nodes successfully joined and any error. If one or 116// more nodes have a successful result, no error is returned. 
117func (a *Agent) Join(addrs ...string) (int, error) { 118 // Accumulate the addresses 119 v := url.Values{} 120 for _, addr := range addrs { 121 v.Add("address", addr) 122 } 123 124 // Send the join request 125 var resp joinResponse 126 _, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil) 127 if err != nil { 128 return 0, fmt.Errorf("failed joining: %s", err) 129 } 130 if resp.Error != "" { 131 return 0, fmt.Errorf("failed joining: %s", resp.Error) 132 } 133 return resp.NumJoined, nil 134} 135 136// Members is used to query all of the known server members 137func (a *Agent) Members() (*ServerMembers, error) { 138 var resp *ServerMembers 139 140 // Query the known members 141 _, err := a.client.query("/v1/agent/members", &resp, nil) 142 if err != nil { 143 return nil, err 144 } 145 return resp, nil 146} 147 148// ForceLeave is used to eject an existing node from the cluster. 149func (a *Agent) ForceLeave(node string) error { 150 _, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil) 151 return err 152} 153 154// Servers is used to query the list of servers on a client node. 155func (a *Agent) Servers() ([]string, error) { 156 var resp []string 157 _, err := a.client.query("/v1/agent/servers", &resp, nil) 158 if err != nil { 159 return nil, err 160 } 161 return resp, nil 162} 163 164// SetServers is used to update the list of servers on a client node. 
165func (a *Agent) SetServers(addrs []string) error { 166 // Accumulate the addresses 167 v := url.Values{} 168 for _, addr := range addrs { 169 v.Add("address", addr) 170 } 171 172 _, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil) 173 return err 174} 175 176// ListKeys returns the list of installed keys 177func (a *Agent) ListKeys() (*KeyringResponse, error) { 178 var resp KeyringResponse 179 _, err := a.client.query("/v1/agent/keyring/list", &resp, nil) 180 if err != nil { 181 return nil, err 182 } 183 return &resp, nil 184} 185 186// InstallKey installs a key in the keyrings of all the serf members 187func (a *Agent) InstallKey(key string) (*KeyringResponse, error) { 188 args := KeyringRequest{ 189 Key: key, 190 } 191 var resp KeyringResponse 192 _, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil) 193 return &resp, err 194} 195 196// UseKey uses a key from the keyring of serf members 197func (a *Agent) UseKey(key string) (*KeyringResponse, error) { 198 args := KeyringRequest{ 199 Key: key, 200 } 201 var resp KeyringResponse 202 _, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil) 203 return &resp, err 204} 205 206// RemoveKey removes a particular key from keyrings of serf members 207func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) { 208 args := KeyringRequest{ 209 Key: key, 210 } 211 var resp KeyringResponse 212 _, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil) 213 return &resp, err 214} 215 216// Health queries the agent's health 217func (a *Agent) Health() (*AgentHealthResponse, error) { 218 req, err := a.client.newRequest("GET", "/v1/agent/health") 219 if err != nil { 220 return nil, err 221 } 222 223 var health AgentHealthResponse 224 _, resp, err := a.client.doRequest(req) 225 if err != nil { 226 return nil, err 227 } 228 defer resp.Body.Close() 229 230 // Always try to decode the response as JSON 231 err = json.NewDecoder(resp.Body).Decode(&health) 232 if err 
== nil { 233 return &health, nil 234 } 235 236 // Return custom error when response is not expected JSON format 237 return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err) 238} 239 240// Monitor returns a channel which will receive streaming logs from the agent 241// Providing a non-nil stopCh can be used to close the connection and stop log streaming 242func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) { 243 errCh := make(chan error, 1) 244 r, err := a.client.newRequest("GET", "/v1/agent/monitor") 245 if err != nil { 246 errCh <- err 247 return nil, errCh 248 } 249 250 r.setQueryOptions(q) 251 _, resp, err := requireOK(a.client.doRequest(r)) 252 if err != nil { 253 errCh <- err 254 return nil, errCh 255 } 256 257 frames := make(chan *StreamFrame, 10) 258 go func() { 259 defer resp.Body.Close() 260 261 dec := json.NewDecoder(resp.Body) 262 263 for { 264 select { 265 case <-stopCh: 266 close(frames) 267 return 268 default: 269 } 270 271 // Decode the next frame 272 var frame StreamFrame 273 if err := dec.Decode(&frame); err != nil { 274 close(frames) 275 errCh <- err 276 return 277 } 278 279 // Discard heartbeat frame 280 if frame.IsHeartbeat() { 281 continue 282 } 283 284 frames <- &frame 285 } 286 }() 287 288 return frames, errCh 289} 290 291// joinResponse is used to decode the response we get while 292// sending a member join request. 
type joinResponse struct {
	NumJoined int    `json:"num_joined"`
	Error     string `json:"error"`
}

// ServerMembers is the payload decoded from the /v1/agent/members endpoint.
type ServerMembers struct {
	ServerName   string
	ServerRegion string
	ServerDC     string
	Members      []*AgentMember
}

// AgentSelf is the payload decoded from the /v1/agent/self endpoint.
type AgentSelf struct {
	Config map[string]interface{}       `json:"config"`
	Member AgentMember                  `json:"member"`
	Stats  map[string]map[string]string `json:"stats"`
}

// AgentMember represents a cluster member known to the agent
type AgentMember struct {
	Name        string
	Addr        string
	Port        uint16
	Tags        map[string]string
	Status      string
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}

// AgentMembersNameSort implements sort.Interface for []*AgentMember,
// ordering by the "region" tag, then the "dc" tag, then Name.
type AgentMembersNameSort []*AgentMember

// Len reports the number of members in the collection.
func (a AgentMembersNameSort) Len() int {
	return len(a)
}

// Swap exchanges the members at positions i and j.
func (a AgentMembersNameSort) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the member at i sorts before the member at j.
// A member without a given tag compares as the empty string for that tag.
func (a AgentMembersNameSort) Less(i, j int) bool {
	lhs, rhs := a[i], a[j]

	// Region is the most significant key
	if lr, rr := lhs.Tags["region"], rhs.Tags["region"]; lr != rr {
		return lr < rr
	}

	// Same region: fall back to the datacenter
	if ld, rd := lhs.Tags["dc"], rhs.Tags["dc"]; ld != rd {
		return ld < rd
	}

	// Same region and datacenter: order by name
	return lhs.Name < rhs.Name
}

// AgentHealthResponse is the response from the Health endpoint describing an
// agent's health.
type AgentHealthResponse struct {
	Client *AgentHealth `json:"client,omitempty"`
	Server *AgentHealth `json:"server,omitempty"`
}

// AgentHealth describes the Client or Server's health in a Health request.
type AgentHealth struct {
	// Ok is false if the agent is unhealthy
	Ok bool `json:"ok"`

	// Message describes why the agent is unhealthy
	Message string `json:"message"`
}