1package agent
2
3import (
4	"encoding/base64"
5	"encoding/json"
6	"fmt"
7	"io"
8	"io/ioutil"
9	"log"
10	"os"
11	"strings"
12	"sync"
13
14	"github.com/hashicorp/memberlist"
15	"github.com/hashicorp/serf/serf"
16)
17
// Agent starts and manages a Serf instance, adding some niceties
// on top of Serf such as restoring tags and keyrings from files,
// persisting tag changes, and invoking EventHandlers when events occur.
// NOTE(review): an earlier comment also mentioned "storing logs that you can
// later retrieve"; that is not visible in this file — confirm elsewhere.
type Agent struct {
	// conf is the Serf configuration. Create fills in its log outputs,
	// event channel and (optionally) tags/keyring before Start uses it.
	conf *serf.Config

	// agentConf is the agent-level configuration (tags file, keyring file,
	// event scripts, ...).
	agentConf *Config

	// eventCh is the channel Serf delivers events on (installed as
	// conf.EventCh by Create); eventLoop consumes it.
	eventCh chan serf.Event

	// eventHandlers is the set of registered handlers and eventHandlerList is
	// a slice view of that set which eventLoop iterates. The list is replaced
	// by a newly built slice whenever the set changes. Both are guarded by
	// eventHandlersLock.
	eventHandlers     map[EventHandler]struct{}
	eventHandlerList  []EventHandler
	eventHandlersLock sync.Mutex

	// logger writes to the log output given to Create.
	logger *log.Logger

	// serf is the underlying Serf instance being wrapped; it is nil until
	// Start succeeds.
	serf *serf.Serf

	// shutdown records whether Shutdown has completed, and shutdownCh is
	// closed at that moment so callers can wait on it. shutdownLock guards
	// shutdown and serializes Shutdown calls.
	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}
47
48// Create creates a new agent, potentially returning an error
49func Create(agentConf *Config, conf *serf.Config, logOutput io.Writer) (*Agent, error) {
50	// Ensure we have a log sink
51	if logOutput == nil {
52		logOutput = os.Stderr
53	}
54
55	// Setup the underlying loggers
56	conf.MemberlistConfig.LogOutput = logOutput
57	conf.MemberlistConfig.EnableCompression = agentConf.EnableCompression
58	conf.LogOutput = logOutput
59
60	// Create a channel to listen for events from Serf
61	eventCh := make(chan serf.Event, 64)
62	conf.EventCh = eventCh
63
64	// Setup the agent
65	agent := &Agent{
66		conf:          conf,
67		agentConf:     agentConf,
68		eventCh:       eventCh,
69		eventHandlers: make(map[EventHandler]struct{}),
70		logger:        log.New(logOutput, "", log.LstdFlags),
71		shutdownCh:    make(chan struct{}),
72	}
73
74	// Restore agent tags from a tags file
75	if agentConf.TagsFile != "" {
76		if err := agent.loadTagsFile(agentConf.TagsFile); err != nil {
77			return nil, err
78		}
79	}
80
81	// Load in a keyring file if provided
82	if agentConf.KeyringFile != "" {
83		if err := agent.loadKeyringFile(agentConf.KeyringFile); err != nil {
84			return nil, err
85		}
86	}
87
88	return agent, nil
89}
90
91// Start is used to initiate the event listeners. It is separate from
92// create so that there isn't a race condition between creating the
93// agent and registering handlers
94func (a *Agent) Start() error {
95	a.logger.Printf("[INFO] agent: Serf agent starting")
96
97	// Create serf first
98	serf, err := serf.Create(a.conf)
99	if err != nil {
100		return fmt.Errorf("Error creating Serf: %s", err)
101	}
102	a.serf = serf
103
104	// Start event loop
105	go a.eventLoop()
106	return nil
107}
108
109// Leave prepares for a graceful shutdown of the agent and its processes
110func (a *Agent) Leave() error {
111	if a.serf == nil {
112		return nil
113	}
114
115	a.logger.Println("[INFO] agent: requesting graceful leave from Serf")
116	return a.serf.Leave()
117}
118
119// Shutdown closes this agent and all of its processes. Should be preceded
120// by a Leave for a graceful shutdown.
121func (a *Agent) Shutdown() error {
122	a.shutdownLock.Lock()
123	defer a.shutdownLock.Unlock()
124
125	if a.shutdown {
126		return nil
127	}
128
129	if a.serf == nil {
130		goto EXIT
131	}
132
133	a.logger.Println("[INFO] agent: requesting serf shutdown")
134	if err := a.serf.Shutdown(); err != nil {
135		return err
136	}
137
138EXIT:
139	a.logger.Println("[INFO] agent: shutdown complete")
140	a.shutdown = true
141	close(a.shutdownCh)
142	return nil
143}
144
// ShutdownCh returns a channel that is closed once Shutdown has completed,
// so callers can select on it to wait for the agent to shut down.
func (a *Agent) ShutdownCh() <-chan struct{} {
	return a.shutdownCh
}
150
// Serf returns the underlying Serf instance of the running Agent.
// It is nil until Start has succeeded.
func (a *Agent) Serf() *serf.Serf {
	return a.serf
}
155
// SerfConfig returns the Serf configuration of the Agent, including the
// changes Create applied to it (log outputs, event channel, and any tags or
// keyring restored from files).
func (a *Agent) SerfConfig() *serf.Config {
	return a.conf
}
160
161// Join asks the Serf instance to join. See the Serf.Join function.
162func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
163	a.logger.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay)
164	ignoreOld := !replay
165	n, err = a.serf.Join(addrs, ignoreOld)
166	if n > 0 {
167		a.logger.Printf("[INFO] agent: joined: %d nodes", n)
168	}
169	if err != nil {
170		a.logger.Printf("[WARN] agent: error joining: %v", err)
171	}
172	return
173}
174
175// ForceLeave is used to eject a failed node from the cluster
176func (a *Agent) ForceLeave(node string) error {
177	a.logger.Printf("[INFO] agent: Force leaving node: %s", node)
178	err := a.serf.RemoveFailedNode(node)
179	if err != nil {
180		a.logger.Printf("[WARN] agent: failed to remove node: %v", err)
181	}
182	return err
183}
184
185// ForceLeavePrune completely removes a failed node from the
186// member list entirely
187func (a *Agent) ForceLeavePrune(node string) error {
188	a.logger.Printf("[INFO] agent: Force leaving node (prune): %s", node)
189	err := a.serf.RemoveFailedNodePrune(node)
190	if err != nil {
191		a.logger.Printf("[WARN] agent: failed to remove node (prune): %v", err)
192	}
193	return err
194}
195
196// UserEvent sends a UserEvent on Serf, see Serf.UserEvent.
197func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
198	a.logger.Printf("[DEBUG] agent: Requesting user event send: %s. Coalesced: %#v. Payload: %#v",
199		name, coalesce, string(payload))
200	err := a.serf.UserEvent(name, payload, coalesce)
201	if err != nil {
202		a.logger.Printf("[WARN] agent: failed to send user event: %v", err)
203	}
204	return err
205}
206
207// Query sends a Query on Serf, see Serf.Query.
208func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
209	// Prevent the use of the internal prefix
210	if strings.HasPrefix(name, serf.InternalQueryPrefix) {
211		// Allow the special "ping" query
212		if name != serf.InternalQueryPrefix+"ping" || payload != nil {
213			return nil, fmt.Errorf("Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix)
214		}
215	}
216	a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v",
217		name, string(payload))
218	resp, err := a.serf.Query(name, payload, params)
219	if err != nil {
220		a.logger.Printf("[WARN] agent: failed to start user query: %v", err)
221	}
222	return resp, err
223}
224
225// RegisterEventHandler adds an event handler to receive event notifications
226func (a *Agent) RegisterEventHandler(eh EventHandler) {
227	a.eventHandlersLock.Lock()
228	defer a.eventHandlersLock.Unlock()
229
230	a.eventHandlers[eh] = struct{}{}
231	a.eventHandlerList = nil
232	for eh := range a.eventHandlers {
233		a.eventHandlerList = append(a.eventHandlerList, eh)
234	}
235}
236
237// DeregisterEventHandler removes an EventHandler and prevents more invocations
238func (a *Agent) DeregisterEventHandler(eh EventHandler) {
239	a.eventHandlersLock.Lock()
240	defer a.eventHandlersLock.Unlock()
241
242	delete(a.eventHandlers, eh)
243	a.eventHandlerList = nil
244	for eh := range a.eventHandlers {
245		a.eventHandlerList = append(a.eventHandlerList, eh)
246	}
247}
248
249// eventLoop listens to events from Serf and fans out to event handlers
250func (a *Agent) eventLoop() {
251	serfShutdownCh := a.serf.ShutdownCh()
252	for {
253		select {
254		case e := <-a.eventCh:
255			a.logger.Printf("[INFO] agent: Received event: %s", e.String())
256			a.eventHandlersLock.Lock()
257			handlers := a.eventHandlerList
258			a.eventHandlersLock.Unlock()
259			for _, eh := range handlers {
260				eh.HandleEvent(e)
261			}
262
263		case <-serfShutdownCh:
264			a.logger.Printf("[WARN] agent: Serf shutdown detected, quitting")
265			a.Shutdown()
266			return
267
268		case <-a.shutdownCh:
269			return
270		}
271	}
272}
273
274// InstallKey initiates a query to install a new key on all members
275func (a *Agent) InstallKey(key string) (*serf.KeyResponse, error) {
276	a.logger.Print("[INFO] agent: Initiating key installation")
277	manager := a.serf.KeyManager()
278	return manager.InstallKey(key)
279}
280
281// UseKey sends a query instructing all members to switch primary keys
282func (a *Agent) UseKey(key string) (*serf.KeyResponse, error) {
283	a.logger.Print("[INFO] agent: Initiating primary key change")
284	manager := a.serf.KeyManager()
285	return manager.UseKey(key)
286}
287
288// RemoveKey sends a query to all members to remove a key from the keyring
289func (a *Agent) RemoveKey(key string) (*serf.KeyResponse, error) {
290	a.logger.Print("[INFO] agent: Initiating key removal")
291	manager := a.serf.KeyManager()
292	return manager.RemoveKey(key)
293}
294
295// ListKeys sends a query to all members to return a list of their keys
296func (a *Agent) ListKeys() (*serf.KeyResponse, error) {
297	a.logger.Print("[INFO] agent: Initiating key listing")
298	manager := a.serf.KeyManager()
299	return manager.ListKeys()
300}
301
302// SetTags is used to update the tags. The agent will make sure to
303// persist tags if necessary before gossiping to the cluster.
304func (a *Agent) SetTags(tags map[string]string) error {
305	// Update the tags file if we have one
306	if a.agentConf.TagsFile != "" {
307		if err := a.writeTagsFile(tags); err != nil {
308			a.logger.Printf("[ERR] agent: %s", err)
309			return err
310		}
311	}
312
313	// Set the tags in Serf, start gossiping out
314	return a.serf.SetTags(tags)
315}
316
317// loadTagsFile will load agent tags out of a file and set them in the
318// current serf configuration.
319func (a *Agent) loadTagsFile(tagsFile string) error {
320	// Avoid passing tags and using a tags file at the same time
321	if len(a.agentConf.Tags) > 0 {
322		return fmt.Errorf("Tags config not allowed while using tag files")
323	}
324
325	if _, err := os.Stat(tagsFile); err == nil {
326		tagData, err := ioutil.ReadFile(tagsFile)
327		if err != nil {
328			return fmt.Errorf("Failed to read tags file: %s", err)
329		}
330		if err := json.Unmarshal(tagData, &a.conf.Tags); err != nil {
331			return fmt.Errorf("Failed to decode tags file: %s", err)
332		}
333		a.logger.Printf("[INFO] agent: Restored %d tag(s) from %s",
334			len(a.conf.Tags), tagsFile)
335	}
336
337	// Success!
338	return nil
339}
340
341// writeTagsFile will write the current tags to the configured tags file.
342func (a *Agent) writeTagsFile(tags map[string]string) error {
343	encoded, err := json.MarshalIndent(tags, "", "  ")
344	if err != nil {
345		return fmt.Errorf("Failed to encode tags: %s", err)
346	}
347
348	// Use 0600 for permissions, in case tag data is sensitive
349	if err = ioutil.WriteFile(a.agentConf.TagsFile, encoded, 0600); err != nil {
350		return fmt.Errorf("Failed to write tags file: %s", err)
351	}
352
353	// Success!
354	return nil
355}
356
// MarshalTags converts a tag map into a slice of "key=value" strings.
// The order of the result follows map iteration and is unspecified;
// a nil or empty map yields a nil slice.
func MarshalTags(tags map[string]string) []string {
	var pairs []string
	for key, val := range tags {
		pairs = append(pairs, fmt.Sprintf("%s=%s", key, val))
	}
	return pairs
}
366
// UnmarshalTags parses "key=value" strings into a tag map. Only the first '='
// splits key from value, so values may contain '='. An entry with no '=' or
// with an empty key is rejected with an error (and a nil map); when a key
// repeats, the last value wins.
func UnmarshalTags(tags []string) (map[string]string, error) {
	parsed := make(map[string]string)
	for _, tag := range tags {
		sep := strings.Index(tag, "=")
		// sep < 0: no separator at all; sep == 0: empty key.
		if sep <= 0 {
			return nil, fmt.Errorf("Invalid tag: '%s'", tag)
		}
		parsed[tag[:sep]] = tag[sep+1:]
	}
	return parsed, nil
}
380
381// loadKeyringFile will load a keyring out of a file
382func (a *Agent) loadKeyringFile(keyringFile string) error {
383	// Avoid passing an encryption key and a keyring file at the same time
384	if len(a.agentConf.EncryptKey) > 0 {
385		return fmt.Errorf("Encryption key not allowed while using a keyring")
386	}
387
388	if _, err := os.Stat(keyringFile); err != nil {
389		return err
390	}
391
392	// Read in the keyring file data
393	keyringData, err := ioutil.ReadFile(keyringFile)
394	if err != nil {
395		return fmt.Errorf("Failed to read keyring file: %s", err)
396	}
397
398	// Decode keyring JSON
399	keys := make([]string, 0)
400	if err := json.Unmarshal(keyringData, &keys); err != nil {
401		return fmt.Errorf("Failed to decode keyring file: %s", err)
402	}
403
404	// Decode base64 values
405	keysDecoded := make([][]byte, len(keys))
406	for i, key := range keys {
407		keyBytes, err := base64.StdEncoding.DecodeString(key)
408		if err != nil {
409			return fmt.Errorf("Failed to decode key from keyring: %s", err)
410		}
411		keysDecoded[i] = keyBytes
412	}
413
414	// Guard against empty keyring file
415	if len(keysDecoded) == 0 {
416		return fmt.Errorf("Keyring file contains no keys")
417	}
418
419	// Create the keyring
420	keyring, err := memberlist.NewKeyring(keysDecoded, keysDecoded[0])
421	if err != nil {
422		return fmt.Errorf("Failed to restore keyring: %s", err)
423	}
424	a.conf.MemberlistConfig.Keyring = keyring
425	a.logger.Printf("[INFO] agent: Restored keyring with %d keys from %s",
426		len(keys), keyringFile)
427
428	// Success!
429	return nil
430}
431
432// Stats is used to get various runtime information and stats
433func (a *Agent) Stats() map[string]map[string]string {
434	local := a.serf.LocalMember()
435	event_handlers := make(map[string]string)
436
437	// Convert event handlers from a string slice to a string map
438	for _, script := range a.agentConf.EventScripts() {
439		script_filter := fmt.Sprintf("%s:%s", script.EventFilter.Event, script.EventFilter.Name)
440		event_handlers[script_filter] = script.Script
441	}
442
443	output := map[string]map[string]string{
444		"agent": map[string]string{
445			"name": local.Name,
446		},
447		"runtime":        runtimeStats(),
448		"serf":           a.serf.Stats(),
449		"tags":           local.Tags,
450		"event_handlers": event_handlers,
451	}
452	return output
453}
454