1package api
2
3import (
4	"encoding/json"
5	"fmt"
6	"net/url"
7)
8
// Agent encapsulates an API client which talks to Nomad's
// agent endpoints for a specific node.
//
// An Agent lazily caches a few static facts about the agent (node name,
// datacenter, region). They are filled in by Self on a successful query,
// which lets NodeName, Datacenter and Region skip the round trip on later
// calls.
//
// NOTE(review): the cache fields are plain struct fields with no mutex;
// concurrent use of a single Agent presumably needs external
// synchronization — confirm.
type Agent struct {
	client *Client

	// Cache static agent info. An empty string means "not yet populated";
	// populateCache only fills fields that are still empty.
	nodeName   string
	datacenter string
	region     string
}
19
// KeyringResponse is a unified key response and can be used for install,
// remove, use, as well as listing key queries.
type KeyringResponse struct {
	// Messages holds messages reported by the keyring operation.
	// NOTE(review): presumably keyed by node name — confirm server-side.
	Messages map[string]string
	// Keys maps each key to a count; presumably the number of nodes
	// that hold that key — confirm server-side.
	Keys     map[string]int
	// NumNodes is presumably the number of nodes that took part in the
	// operation — confirm server-side.
	NumNodes int
}
27
// KeyringRequest is the request object for serf keyring operations. It is
// sent as the request body by InstallKey, UseKey and RemoveKey.
type KeyringRequest struct {
	// Key is the key the keyring operation acts on.
	Key string
}
32
33// Agent returns a new agent which can be used to query
34// the agent-specific endpoints.
35func (c *Client) Agent() *Agent {
36	return &Agent{client: c}
37}
38
39// Self is used to query the /v1/agent/self endpoint and
40// returns information specific to the running agent.
41func (a *Agent) Self() (*AgentSelf, error) {
42	var out *AgentSelf
43
44	// Query the self endpoint on the agent
45	_, err := a.client.query("/v1/agent/self", &out, nil)
46	if err != nil {
47		return nil, fmt.Errorf("failed querying self endpoint: %s", err)
48	}
49
50	// Populate the cache for faster queries
51	a.populateCache(out)
52
53	return out, nil
54}
55
56// populateCache is used to insert various pieces of static
57// data into the agent handle. This is used during subsequent
58// lookups for the same data later on to save the round trip.
59func (a *Agent) populateCache(self *AgentSelf) {
60	if a.nodeName == "" {
61		a.nodeName = self.Member.Name
62	}
63	if a.datacenter == "" {
64		if val, ok := self.Config["Datacenter"]; ok {
65			a.datacenter, _ = val.(string)
66		}
67	}
68	if a.region == "" {
69		if val, ok := self.Config["Region"]; ok {
70			a.region, _ = val.(string)
71		}
72	}
73}
74
75// NodeName is used to query the Nomad agent for its node name.
76func (a *Agent) NodeName() (string, error) {
77	// Return from cache if we have it
78	if a.nodeName != "" {
79		return a.nodeName, nil
80	}
81
82	// Query the node name
83	_, err := a.Self()
84	return a.nodeName, err
85}
86
87// Datacenter is used to return the name of the datacenter which
88// the agent is a member of.
89func (a *Agent) Datacenter() (string, error) {
90	// Return from cache if we have it
91	if a.datacenter != "" {
92		return a.datacenter, nil
93	}
94
95	// Query the agent for the DC
96	_, err := a.Self()
97	return a.datacenter, err
98}
99
100// Region is used to look up the region the agent is in.
101func (a *Agent) Region() (string, error) {
102	// Return from cache if we have it
103	if a.region != "" {
104		return a.region, nil
105	}
106
107	// Query the agent for the region
108	_, err := a.Self()
109	return a.region, err
110}
111
112// Join is used to instruct a server node to join another server
113// via the gossip protocol. Multiple addresses may be specified.
114// We attempt to join all of the hosts in the list. Returns the
115// number of nodes successfully joined and any error. If one or
116// more nodes have a successful result, no error is returned.
117func (a *Agent) Join(addrs ...string) (int, error) {
118	// Accumulate the addresses
119	v := url.Values{}
120	for _, addr := range addrs {
121		v.Add("address", addr)
122	}
123
124	// Send the join request
125	var resp joinResponse
126	_, err := a.client.write("/v1/agent/join?"+v.Encode(), nil, &resp, nil)
127	if err != nil {
128		return 0, fmt.Errorf("failed joining: %s", err)
129	}
130	if resp.Error != "" {
131		return 0, fmt.Errorf("failed joining: %s", resp.Error)
132	}
133	return resp.NumJoined, nil
134}
135
136// Members is used to query all of the known server members
137func (a *Agent) Members() (*ServerMembers, error) {
138	var resp *ServerMembers
139
140	// Query the known members
141	_, err := a.client.query("/v1/agent/members", &resp, nil)
142	if err != nil {
143		return nil, err
144	}
145	return resp, nil
146}
147
148// ForceLeave is used to eject an existing node from the cluster.
149func (a *Agent) ForceLeave(node string) error {
150	_, err := a.client.write("/v1/agent/force-leave?node="+node, nil, nil, nil)
151	return err
152}
153
154// Servers is used to query the list of servers on a client node.
155func (a *Agent) Servers() ([]string, error) {
156	var resp []string
157	_, err := a.client.query("/v1/agent/servers", &resp, nil)
158	if err != nil {
159		return nil, err
160	}
161	return resp, nil
162}
163
164// SetServers is used to update the list of servers on a client node.
165func (a *Agent) SetServers(addrs []string) error {
166	// Accumulate the addresses
167	v := url.Values{}
168	for _, addr := range addrs {
169		v.Add("address", addr)
170	}
171
172	_, err := a.client.write("/v1/agent/servers?"+v.Encode(), nil, nil, nil)
173	return err
174}
175
176// ListKeys returns the list of installed keys
177func (a *Agent) ListKeys() (*KeyringResponse, error) {
178	var resp KeyringResponse
179	_, err := a.client.query("/v1/agent/keyring/list", &resp, nil)
180	if err != nil {
181		return nil, err
182	}
183	return &resp, nil
184}
185
186// InstallKey installs a key in the keyrings of all the serf members
187func (a *Agent) InstallKey(key string) (*KeyringResponse, error) {
188	args := KeyringRequest{
189		Key: key,
190	}
191	var resp KeyringResponse
192	_, err := a.client.write("/v1/agent/keyring/install", &args, &resp, nil)
193	return &resp, err
194}
195
196// UseKey uses a key from the keyring of serf members
197func (a *Agent) UseKey(key string) (*KeyringResponse, error) {
198	args := KeyringRequest{
199		Key: key,
200	}
201	var resp KeyringResponse
202	_, err := a.client.write("/v1/agent/keyring/use", &args, &resp, nil)
203	return &resp, err
204}
205
206// RemoveKey removes a particular key from keyrings of serf members
207func (a *Agent) RemoveKey(key string) (*KeyringResponse, error) {
208	args := KeyringRequest{
209		Key: key,
210	}
211	var resp KeyringResponse
212	_, err := a.client.write("/v1/agent/keyring/remove", &args, &resp, nil)
213	return &resp, err
214}
215
216// Health queries the agent's health
217func (a *Agent) Health() (*AgentHealthResponse, error) {
218	req, err := a.client.newRequest("GET", "/v1/agent/health")
219	if err != nil {
220		return nil, err
221	}
222
223	var health AgentHealthResponse
224	_, resp, err := a.client.doRequest(req)
225	if err != nil {
226		return nil, err
227	}
228	defer resp.Body.Close()
229
230	// Always try to decode the response as JSON
231	err = json.NewDecoder(resp.Body).Decode(&health)
232	if err == nil {
233		return &health, nil
234	}
235
236	// Return custom error when response is not expected JSON format
237	return nil, fmt.Errorf("unable to unmarshal response with status %d: %v", resp.StatusCode, err)
238}
239
240// Monitor returns a channel which will receive streaming logs from the agent
241// Providing a non-nil stopCh can be used to close the connection and stop log streaming
242func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
243	errCh := make(chan error, 1)
244	r, err := a.client.newRequest("GET", "/v1/agent/monitor")
245	if err != nil {
246		errCh <- err
247		return nil, errCh
248	}
249
250	r.setQueryOptions(q)
251	_, resp, err := requireOK(a.client.doRequest(r))
252	if err != nil {
253		errCh <- err
254		return nil, errCh
255	}
256
257	frames := make(chan *StreamFrame, 10)
258	go func() {
259		defer resp.Body.Close()
260
261		dec := json.NewDecoder(resp.Body)
262
263		for {
264			select {
265			case <-stopCh:
266				close(frames)
267				return
268			default:
269			}
270
271			// Decode the next frame
272			var frame StreamFrame
273			if err := dec.Decode(&frame); err != nil {
274				close(frames)
275				errCh <- err
276				return
277			}
278
279			// Discard heartbeat frame
280			if frame.IsHeartbeat() {
281				continue
282			}
283
284			frames <- &frame
285		}
286	}()
287
288	return frames, errCh
289}
290
// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
	// NumJoined is the join count reported by the agent; Join returns it
	// when Error is empty.
	NumJoined int    `json:"num_joined"`
	// Error is an error message from the agent; Join treats any non-empty
	// value as a failure.
	Error     string `json:"error"`
}
297
// ServerMembers is the decoded response of the /v1/agent/members endpoint,
// as returned by Agent.Members.
type ServerMembers struct {
	// ServerName, ServerRegion and ServerDC presumably identify the server
	// that answered the query — NOTE(review): confirm against the handler.
	ServerName   string
	ServerRegion string
	ServerDC     string
	// Members lists the cluster members reported by the agent.
	Members      []*AgentMember
}
304
// AgentSelf is the decoded response of the /v1/agent/self endpoint, as
// returned by Agent.Self.
type AgentSelf struct {
	// Config is the agent configuration as a loosely typed map. Its
	// "Datacenter" and "Region" entries are read as strings by the Agent's
	// info cache.
	Config map[string]interface{}       `json:"config"`
	// Member is the agent's own membership record; its Name is cached as the
	// agent's node name.
	Member AgentMember                  `json:"member"`
	// Stats holds string statistics grouped by section.
	// NOTE(review): section names are defined server-side — confirm.
	Stats  map[string]map[string]string `json:"stats"`
}
310
// AgentMember represents a cluster member known to the agent.
//
// Field order matters: reordering would break any unkeyed composite
// literals and change JSON encoding order.
type AgentMember struct {
	Name        string
	Addr        string
	Port        uint16
	// Tags are free-form member tags; AgentMembersNameSort orders members
	// by the "region" and "dc" entries.
	Tags        map[string]string
	Status      string
	// Protocol*/Delegate* are min/max/current version numbers advertised by
	// the member. NOTE(review): exact semantics come from the gossip layer —
	// confirm.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}
325
326// AgentMembersNameSort implements sort.Interface for []*AgentMembersNameSort
327// based on the Name, DC and Region
328type AgentMembersNameSort []*AgentMember
329
330func (a AgentMembersNameSort) Len() int      { return len(a) }
331func (a AgentMembersNameSort) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
332func (a AgentMembersNameSort) Less(i, j int) bool {
333	if a[i].Tags["region"] != a[j].Tags["region"] {
334		return a[i].Tags["region"] < a[j].Tags["region"]
335	}
336
337	if a[i].Tags["dc"] != a[j].Tags["dc"] {
338		return a[i].Tags["dc"] < a[j].Tags["dc"]
339	}
340
341	return a[i].Name < a[j].Name
342
343}
344
// AgentHealthResponse is the response from the Health endpoint describing an
// agent's health.
type AgentHealthResponse struct {
	// Client is the health of the agent's client side. It is nil when the
	// response has no "client" entry (presumably when the agent does not run
	// a client — confirm).
	Client *AgentHealth `json:"client,omitempty"`
	// Server is the health of the agent's server side. It is nil when the
	// response has no "server" entry (presumably when the agent does not run
	// a server — confirm).
	Server *AgentHealth `json:"server,omitempty"`
}
351
// AgentHealth describes the Client or Server's health in a Health request.
type AgentHealth struct {
	// Ok is false if the agent is unhealthy
	Ok bool `json:"ok"`

	// Message describes why the agent is unhealthy.
	// NOTE(review): it may also carry a message when Ok is true — confirm.
	Message string `json:"message"`
}
360