1package api
2
3import (
4	"encoding/json"
5	"fmt"
6	"io/ioutil"
7	"net/url"
8	"strconv"
9)
10
// Agent encapsulates an API client which talks to Nomad's
// agent endpoints for a specific node.
//
// Create one with Client.Agent. The node name, datacenter and region are
// cached after the first successful Self call, so later lookups through
// NodeName, Datacenter and Region skip the round trip.
// NOTE(review): the cache fields are read and written without any locking,
// so sharing one Agent across goroutines looks unsafe — confirm intended use.
type Agent struct {
	client *Client

	// Cache static agent info. An empty string means "not yet known";
	// populateCache only fills fields that are still empty.
	nodeName   string
	datacenter string
	region     string
}
21
// KeyringResponse is a unified key response and can be used for install,
// remove, use, as well as listing key queries.
//
// ListKeys, InstallKey, UseKey and RemoveKey all decode into this type.
// NOTE(review): the field meanings below are inferred from names — confirm
// them against the agent's keyring handler:
//   - Messages: presumably per-node messages (e.g. failures) from the operation.
//   - Keys: presumably each installed key mapped to how many nodes hold it.
//   - NumNodes: presumably the number of nodes that took part.
type KeyringResponse struct {
	Messages map[string]string
	Keys     map[string]int
	NumNodes int
}
29
// KeyringRequest is the request body for serf keyring operations.
// InstallKey, UseKey and RemoveKey send it with Key set to their argument.
type KeyringRequest struct {
	// Key is the gossip encryption key the operation applies to.
	// NOTE(review): presumably in the encoded form the agent's keyring
	// accepts (e.g. base64) — confirm.
	Key string
}
34
35// Agent returns a new agent which can be used to query
36// the agent-specific endpoints.
37func (c *Client) Agent() *Agent {
38	return &Agent{client: c}
39}
40
41// Self is used to query the /v1/agent/self endpoint and
42// returns information specific to the running agent.
43func (a *Agent) Self() (*AgentSelf, error) {
44	var out *AgentSelf
45
46	// Query the self endpoint on the agent
47	_, err := a.client.query("/v1/agent/self", &out, nil)
48	if err != nil {
49		return nil, fmt.Errorf("failed querying self endpoint: %s", err)
50	}
51
52	// Populate the cache for faster queries
53	a.populateCache(out)
54
55	return out, nil
56}
57
58// populateCache is used to insert various pieces of static
59// data into the agent handle. This is used during subsequent
60// lookups for the same data later on to save the round trip.
61func (a *Agent) populateCache(self *AgentSelf) {
62	if a.nodeName == "" {
63		a.nodeName = self.Member.Name
64	}
65	if a.datacenter == "" {
66		if val, ok := self.Config["Datacenter"]; ok {
67			a.datacenter, _ = val.(string)
68		}
69	}
70	if a.region == "" {
71		if val, ok := self.Config["Region"]; ok {
72			a.region, _ = val.(string)
73		}
74	}
75}
76
77// NodeName is used to query the Nomad agent for its node name.
78func (a *Agent) NodeName() (string, error) {
79	// Return from cache if we have it
80	if a.nodeName != "" {
81		return a.nodeName, nil
82	}
83
84	// Query the node name
85	_, err := a.Self()
86	return a.nodeName, err
87}
88
89// Datacenter is used to return the name of the datacenter which
90// the agent is a member of.
91func (a *Agent) Datacenter() (string, error) {
92	// Return from cache if we have it
93	if a.datacenter != "" {
94		return a.datacenter, nil
95	}
96
97	// Query the agent for the DC
98	_, err := a.Self()
99	return a.datacenter, err
100}
101
102// Region is used to look up the region the agent is in.
103func (a *Agent) Region() (string, error) {
104	// Return from cache if we have it
105	if a.region != "" {
106		return a.region, nil
107	}
108
109	// Query the agent for the region
110	_, err := a.Self()
111	return a.region, err
112}
113
114// Join is used to instruct a server node to join another server
115// via the gossip protocol. Multiple addresses may be specified.
116// We attempt to join all of the hosts in the list. Returns the
117// number of nodes successfully joined and any error. If one or
118// more nodes have a successful result, no error is returned.
119func (a *Agent) Join(addrs ...string) (int, error) {
120	// Accumulate the addresses
121	v := url.Values{}
122	for _, addr := range addrs {
123		v.Add("address", addr)
124	}
125
126	// Send the join request
127	var resp joinResponse
128	_, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
129	if err != nil {
130		return 0, fmt.Errorf("failed joining: %s", err)
131	}
132	if resp.Error != "" {
133		return 0, fmt.Errorf("failed joining: %s", resp.Error)
134	}
135	return resp.NumJoined, nil
136}
137
138// Members is used to query all of the known server members
139func (a *Agent) Members() (*ServerMembers, error) {
140	var resp *ServerMembers
141
142	// Query the known members
143	_, err := a.client.query("/v1/agent/members", &resp, nil)
144	if err != nil {
145		return nil, err
146	}
147	return resp, nil
148}
149
150// ForceLeave is used to eject an existing node from the cluster.
151func (a *Agent) ForceLeave(node string) error {
152	_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
153	return err
154}
155
156// Servers is used to query the list of servers on a client node.
157func (a *Agent) Servers() ([]string, error) {
158	var resp []string
159	_, err := a.client.query("/v1/agent/servers", &resp, nil)
160	if err != nil {
161		return nil, err
162	}
163	return resp, nil
164}
165
166// SetServers is used to update the list of servers on a client node.
167func (a *Agent) SetServers(addrs []string) error {
168	// Accumulate the addresses
169	v := url.Values{}
170	for _, addr := range addrs {
171		v.Add("address", addr)
172	}
173
174	_, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil)
175	return err
176}
177
178// ListKeys returns the list of installed keys
179func (a *Agent) ListKeys() (*KeyringResponse, error) {
180	var resp KeyringResponse
181	_, err := a.client.query("/v1/agent/keyring/list", &resp, nil)
182	if err != nil {
183		return nil, err
184	}
185	return &resp, nil
186}
187
188// InstallKey installs a key in the keyrings of all the serf members
189func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
190	args := KeyringRequest{
191		Key: key,
192	}
193	var resp KeyringResponse
194	_, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil)
195	return &resp, err
196}
197
198// UseKey uses a key from the keyring of serf members
199func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
200	args := KeyringRequest{
201		Key: key,
202	}
203	var resp KeyringResponse
204	_, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil)
205	return &resp, err
206}
207
208// RemoveKey removes a particular key from keyrings of serf members
209func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
210	args := KeyringRequest{
211		Key: key,
212	}
213	var resp KeyringResponse
214	_, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil)
215	return &resp, err
216}
217
218// Health queries the agent's health
219func (a *Agent) Health() (*AgentHealthResponse, error) {
220	req, err := a.client.newRequest("GET", "/v1/agent/health")
221	if err != nil {
222		return nil, err
223	}
224
225	var health AgentHealthResponse
226	_, resp, err := a.client.doRequest(req)
227	if err != nil {
228		return nil, err
229	}
230	defer resp.Body.Close()
231
232	// Always try to decode the response as JSON
233	err = json.NewDecoder(resp.Body).Decode(&health)
234	if err == nil {
235		return &health, nil
236	}
237
238	// Return custom error when response is not expected JSON format
239	return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
240}
241
242// Host returns debugging context about the agent's host operating system
243func (a *Agent) Host(serverID, nodeID string, q *QueryOptions) (*HostDataResponse, error) {
244	if q == nil {
245		q = &QueryOptions{}
246	}
247	if q.Params == nil {
248		q.Params = make(map[string]string)
249	}
250
251	if serverID != "" {
252		q.Params["server_id"] = serverID
253	}
254
255	if nodeID != "" {
256		q.Params["node_id"] = nodeID
257	}
258
259	var resp HostDataResponse
260	_, err := a.client.query("/v1/agent/host", &resp, q)
261	if err != nil {
262		return nil, err
263	}
264
265	return &resp, nil
266}
267
268// Monitor returns a channel which will receive streaming logs from the agent
269// Providing a non-nil stopCh can be used to close the connection and stop log streaming
270func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
271	errCh := make(chan error, 1)
272	r, err := a.client.newRequest("GET", "/v1/agent/monitor")
273	if err != nil {
274		errCh <- err
275		return nil, errCh
276	}
277
278	r.setQueryOptions(q)
279	_, resp, err := requireOK(a.client.doRequest(r))
280	if err != nil {
281		errCh <- err
282		return nil, errCh
283	}
284
285	frames := make(chan *StreamFrame, 10)
286	go func() {
287		defer resp.Body.Close()
288
289		dec := json.NewDecoder(resp.Body)
290
291		for {
292			select {
293			case <-stopCh:
294				close(frames)
295				return
296			default:
297			}
298
299			// Decode the next frame
300			var frame StreamFrame
301			if err := dec.Decode(&frame); err != nil {
302				close(frames)
303				errCh <- err
304				return
305			}
306
307			// Discard heartbeat frame
308			if frame.IsHeartbeat() {
309				continue
310			}
311
312			frames <- &frame
313		}
314	}()
315
316	return frames, errCh
317}
318
// PprofOptions contain a set of parameters for profiling a node or server.
//
// pprofRequest sends every field to the agent as a query parameter
// (server_id, node_id, seconds, gc, debug), including zero values.
type PprofOptions struct {
	// ServerID is the server ID, name, or special value "leader" to
	// specify the server that a given profile should be run on.
	ServerID string

	// NodeID is the node ID that a given profile should be run on.
	NodeID string

	// Seconds specifies the amount of time a profile should be run for.
	// Seconds only applies for certain runtime profiles like CPU and Trace.
	Seconds int

	// GC determines if a runtime.GC() should be called before a heap
	// profile. NOTE(review): sent as an integer; presumably non-zero means
	// "run GC first" — confirm against the agent's pprof handler.
	GC int

	// Debug specifies if the output of a lookup profile should be returned
	// in human readable format instead of binary. NOTE(review): presumably
	// mirrors the debug argument of runtime/pprof's Profile.WriteTo — confirm.
	Debug int
}
340
341// CPUProfile returns a runtime/pprof cpu profile for a given server or node.
342// The profile will run for the amount of seconds passed in or default to 1.
343// If no serverID or nodeID are provided the current Agents server will be
344// used.
345//
346// The call blocks until the profile finishes, and returns the raw bytes of the
347// profile.
348func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
349	return a.pprofRequest("profile", opts, q)
350}
351
352// Trace returns a runtime/pprof trace for a given server or node.
353// The trace will run for the amount of seconds passed in or default to 1.
354// If no serverID or nodeID are provided the current Agents server will be
355// used.
356//
357// The call blocks until the profile finishes, and returns the raw bytes of the
358// profile.
359func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
360	return a.pprofRequest("trace", opts, q)
361}
362
363// Lookup returns a runtime/pprof profile using pprof.Lookup to determine
364// which profile to run. Accepts a client or server ID but not both simultaneously.
365//
366// The call blocks until the profile finishes, and returns the raw bytes of the
367// profile unless debug is set.
368func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
369	return a.pprofRequest(profile, opts, q)
370}
371
372func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
373	if q == nil {
374		q = &QueryOptions{}
375	}
376	if q.Params == nil {
377		q.Params = make(map[string]string)
378	}
379
380	q.Params["seconds"] = strconv.Itoa(opts.Seconds)
381	q.Params["debug"] = strconv.Itoa(opts.Debug)
382	q.Params["gc"] = strconv.Itoa(opts.GC)
383	q.Params["node_id"] = opts.NodeID
384	q.Params["server_id"] = opts.ServerID
385
386	body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
387	if err != nil {
388		return nil, err
389	}
390
391	resp, err := ioutil.ReadAll(body)
392	if err != nil {
393		return nil, err
394	}
395	return resp, nil
396}
397
// joinResponse is used to decode the response we get while
// sending a member join request.
//
// Join returns NumJoined when Error is empty and treats a non-empty Error
// as a failed join. The JSON keys are "num_joined" and "error".
type joinResponse struct {
	NumJoined int    `json:"num_joined"`
	Error     string `json:"error"`
}
404
// ServerMembers is the decoded response of the /v1/agent/members endpoint
// (see Members): a list of cluster members plus server identification.
// NOTE(review): ServerName, ServerRegion and ServerDC presumably describe the
// server that answered the request — confirm against the agent handler.
type ServerMembers struct {
	ServerName   string
	ServerRegion string
	ServerDC     string
	Members      []*AgentMember
}
411
// AgentSelf is the decoded response of the /v1/agent/self endpoint (see Self).
//
// Config holds the agent configuration as generic JSON values; populateCache
// reads its "Datacenter" and "Region" entries as strings. Member describes
// the agent itself, and populateCache caches Member.Name as the node name.
// NOTE(review): Stats is presumably grouped by subsystem (outer key) and
// metric name (inner key) — confirm.
type AgentSelf struct {
	Config map[string]interface{}       `json:"config"`
	Member AgentMember                  `json:"member"`
	Stats  map[string]map[string]string `json:"stats"`
}
417
// AgentMember represents a cluster member known to the agent.
//
// Tags carries free-form metadata; AgentMembersNameSort orders members by
// the "region" and "dc" tags. NOTE(review): the Protocol* and Delegate*
// fields presumably report the min/max/current gossip (serf/memberlist)
// protocol versions of the member — confirm.
type AgentMember struct {
	Name        string
	Addr        string
	Port        uint16
	Tags        map[string]string
	Status      string
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}
432
433// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort
434// based on the Name, DC and Region
435type AgentMembersNameSort []*AgentMember
436
437func (a AgentMembersNameSort) Len() int      { return len(a) }
438func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
439func (a AgentMembersNameSort) Less(i, j int) bool {
440	if a[i].Tags["region"] != a[j].Tags["region"] {
441		return a[i].Tags["region"] < a[j].Tags["region"]
442	}
443
444	if a[i].Tags["dc"] != a[j].Tags["dc"] {
445		return a[i].Tags["dc"] < a[j].Tags["dc"]
446	}
447
448	return a[i].Name < a[j].Name
449
450}
451
// AgentHealthResponse is the response from the Health endpoint describing an
// agent's health.
//
// Client and Server are pointers tagged omitempty, so either may be nil.
// NOTE(review): a section is presumably absent when the agent is not running
// in that mode — confirm against the agent's health handler.
type AgentHealthResponse struct {
	Client *AgentHealth `json:"client,omitempty"`
	Server *AgentHealth `json:"server,omitempty"`
}
458
// AgentHealth describes the Client or Server's health in a Health request.
// It is decoded from the JSON keys "ok" and "message".
type AgentHealth struct {
	// Ok is false if the agent is unhealthy
	Ok bool `json:"ok"`

	// Message describes why the agent is unhealthy
	Message string `json:"message"`
}
467
// HostData is a snapshot of host operating system details used for
// debugging. Host returns it inside HostDataResponse.
//
// NOTE(review): the field semantics below are inferred from names and should
// be confirmed against the agent's host data collector:
//   - OS: presumably a description of the operating system.
//   - Network: presumably one map of attributes per network interface.
//   - ResolvConf, Hosts: presumably the contents of /etc/resolv.conf and
//     /etc/hosts.
//   - Environment: presumably the agent's environment variables.
//   - Disk: presumably usage keyed by mount path.
type HostData struct {
	OS          string
	Network     []map[string]string
	ResolvConf  string
	Hosts       string
	Environment map[string]string
	Disk        map[string]DiskUsage
}
476
// DiskUsage describes the total size and used space of one disk entry in
// HostData.Disk. NOTE(review): values are presumably megabytes, as the MB
// suffixes suggest — confirm.
type DiskUsage struct {
	DiskMB int64
	UsedMB int64
}
481
// HostDataResponse is the result returned by Host. HostData is omitted from
// the JSON encoding when nil (omitempty). NOTE(review): AgentID presumably
// identifies the agent that collected the data — confirm.
type HostDataResponse struct {
	AgentID  string
	HostData *HostData `json:",omitempty"`
}
486