1package agent
2
3import (
4	"encoding/base64"
5	"encoding/json"
6	"fmt"
7	"io"
8	"net"
9	"os"
10	"path/filepath"
11	"sort"
12	"strings"
13	"time"
14
15	"github.com/hashicorp/serf/serf"
16	"github.com/mitchellh/mapstructure"
17)
18
// DefaultBindPort is the port used for Serf communication whenever an
// address is supplied without an explicit port.
const (
	DefaultBindPort int = 7946
)
21
22// DefaultConfig contains the defaults for configurations.
23func DefaultConfig() *Config {
24	return &Config{
25		DisableCoordinates:     false,
26		Tags:                   make(map[string]string),
27		BindAddr:               "0.0.0.0",
28		AdvertiseAddr:          "",
29		LogLevel:               "INFO",
30		RPCAddr:                "127.0.0.1:7373",
31		Protocol:               serf.ProtocolVersionMax,
32		ReplayOnJoin:           false,
33		Profile:                "lan",
34		RetryInterval:          30 * time.Second,
35		SyslogFacility:         "LOCAL0",
36		QueryResponseSizeLimit: 1024,
37		QuerySizeLimit:         1024,
38		UserEventSizeLimit:     512,
39		BroadcastTimeout:       5 * time.Second,
40	}
41}
42
// dirEnts is a list of directory entries. Its Len/Less/Swap methods implement
// sort.Interface, ordering entries by Name(), so that ReadConfigPaths can
// process the *.json files of a config directory in lexical order.
type dirEnts []os.FileInfo
44
// Config is the configuration that can be set for an Agent. Some of these
// configurations are exposed as command-line flags to `serf agent`, whereas
// many of the more advanced configurations can only be set by creating
// a configuration file.
//
// The `mapstructure` tag on each field is the key used for that field in a
// JSON configuration file (see DecodeConfig). Fields tagged
// `mapstructure:"-"` are never read from the file directly; DecodeConfig
// fills them by parsing the matching *Raw string with time.ParseDuration.
type Config struct {
	// All the configurations in this section are identical to their
	// Serf counterparts. See the documentation for serf.Config for
	// more info. Role is the legacy way of labelling a node; it has been
	// superseded by Tags (see the special 'role' tag below).
	NodeName           string `mapstructure:"node_name"`
	Role               string `mapstructure:"role"`
	DisableCoordinates bool   `mapstructure:"disable_coordinates"`

	// Tags are used to attach key/value metadata to a node. They have
	// replaced 'Role' as a more flexible metadata mechanism. The 'role'
	// key is special and is kept for backwards compatibility with Role.
	Tags map[string]string `mapstructure:"tags"`

	// TagsFile is the path to a file where Serf can store its tags. Tag
	// persistence is desirable since tags may be set or deleted while the
	// agent is running. Tags can be reloaded from this file on later starts.
	TagsFile string `mapstructure:"tags_file"`

	// BindAddr is the address that the Serf agent's communication ports
	// will bind to. Serf will use this address to bind to for both TCP
	// and UDP connections. If no port is present in the address, the default
	// port (DefaultBindPort) will be used. DefaultConfig sets "0.0.0.0".
	BindAddr string `mapstructure:"bind"`

	// AdvertiseAddr is the address that the Serf agent will advertise to
	// other members of the cluster. Can be used for basic NAT traversal
	// where both the internal ip:port and external ip:port are known.
	AdvertiseAddr string `mapstructure:"advertise"`

	// EncryptKey is the secret key to use for encrypting communication
	// traffic for Serf. The secret key must be exactly 32 bytes, base64
	// encoded. The easiest way to do this on Unix machines is this command:
	// "head -c32 /dev/urandom | base64". If this is not specified, the
	// traffic will not be encrypted. EncryptBytes returns the decoded key.
	// NOTE(review): the key length is not validated in this file; confirm
	// where it is enforced.
	EncryptKey string `mapstructure:"encrypt_key"`

	// KeyringFile is the path to a file containing a serialized keyring.
	// The keyring is used to facilitate encryption. If left blank, the
	// keyring will not be persisted to a file.
	KeyringFile string `mapstructure:"keyring_file"`

	// LogLevel is the level of the logs to output. DefaultConfig sets
	// "INFO". This can be updated during a reload.
	LogLevel string `mapstructure:"log_level"`

	// RPCAddr is the address and port to listen on for the agent's RPC
	// interface. DefaultConfig sets "127.0.0.1:7373".
	RPCAddr string `mapstructure:"rpc_addr"`

	// RPCAuthKey is a key that can be set to optionally require that
	// RPC clients provide an authentication key. This is meant to be
	// a very simple authentication control.
	RPCAuthKey string `mapstructure:"rpc_auth"`

	// Protocol is the Serf protocol version to use. DefaultConfig sets
	// serf.ProtocolVersionMax.
	Protocol int `mapstructure:"protocol"`

	// ReplayOnJoin tells Serf to replay past user events
	// when joining based on a `StartJoin`.
	ReplayOnJoin bool `mapstructure:"replay_on_join"`

	// QueryResponseSizeLimit and QuerySizeLimit limit the inbound and
	// outbound payload sizes for queries, respectively. These must fit
	// in a UDP packet with some additional overhead, so tuning these
	// past the default values of 1024 will depend on your network
	// configuration.
	QueryResponseSizeLimit int `mapstructure:"query_response_size_limit"`
	QuerySizeLimit         int `mapstructure:"query_size_limit"`

	// UserEventSizeLimit is the maximum size, in bytes, of a user event's
	// `name` + `payload`. It is best kept relatively small, since the event
	// is gossiped through the cluster. DefaultConfig sets 512.
	UserEventSizeLimit int `mapstructure:"user_event_size_limit"`

	// StartJoin is a list of addresses to attempt to join when the
	// agent starts. If Serf is unable to communicate with any of these
	// addresses, then the agent will error and exit.
	StartJoin []string `mapstructure:"start_join"`

	// EventHandlers is a list of event handlers that will be invoked.
	// Each entry is expanded with ParseEventScript (see EventScripts).
	// These can be updated during a reload.
	EventHandlers []string `mapstructure:"event_handlers"`

	// Profile is used to select a timing profile for Serf. The supported choices
	// are "wan", "lan", and "local". The default is "lan".
	Profile string `mapstructure:"profile"`

	// SnapshotPath is used to allow Serf to snapshot important transactional
	// state to make a more graceful recovery possible. This enables auto
	// re-joining a cluster on failure and avoids old message replay.
	SnapshotPath string `mapstructure:"snapshot_path"`

	// LeaveOnTerm controls if Serf does a graceful leave when receiving
	// the TERM signal. Defaults false. This can be changed on reload.
	LeaveOnTerm bool `mapstructure:"leave_on_terminate"`

	// SkipLeaveOnInt controls if Serf skips a graceful leave when receiving
	// the INT signal. Defaults false. This can be changed on reload.
	SkipLeaveOnInt bool `mapstructure:"skip_leave_on_interrupt"`

	// Discover is used to setup an mDNS Discovery name. When this is set, the
	// agent will setup an mDNS responder and periodically run an mDNS query
	// to look for peers. For peers on a network that supports multicast, this
	// allows Serf agents to join each other with zero configuration.
	Discover string `mapstructure:"discover"`

	// Interface is used to provide a binding interface to use. It can be
	// used instead of providing a bind address, as Serf will discover the
	// address of the provided interface. It is also used to set the multicast
	// device used with `-discover`. NetworkInterface looks it up by name.
	Interface string `mapstructure:"interface"`

	// ReconnectIntervalRaw is the reconnect interval as a duration string
	// (e.g. "30s"); DecodeConfig parses it into ReconnectInterval. This
	// interval controls how often we attempt to connect to a failed node.
	ReconnectIntervalRaw string        `mapstructure:"reconnect_interval"`
	ReconnectInterval    time.Duration `mapstructure:"-"`

	// ReconnectTimeoutRaw is the reconnect timeout as a duration string;
	// DecodeConfig parses it into ReconnectTimeout. This timeout controls
	// for how long we attempt to connect to a failed node before removing
	// it from the cluster.
	ReconnectTimeoutRaw string        `mapstructure:"reconnect_timeout"`
	ReconnectTimeout    time.Duration `mapstructure:"-"`

	// TombstoneTimeoutRaw is the tombstone timeout as a duration string;
	// DecodeConfig parses it into TombstoneTimeout. This timeout controls
	// for how long we remember a left node before removing it from the cluster.
	TombstoneTimeoutRaw string        `mapstructure:"tombstone_timeout"`
	TombstoneTimeout    time.Duration `mapstructure:"-"`

	// By default Serf will attempt to resolve name conflicts. This is done by
	// determining which node the majority believe to be the proper node, and
	// by having the minority node shutdown. If you want to disable this behavior,
	// then this flag can be set to true.
	DisableNameResolution bool `mapstructure:"disable_name_resolution"`

	// EnableSyslog is used to also tee all the logs over to syslog. Only supported
	// on Linux and OSX. Other platforms will generate an error.
	EnableSyslog bool `mapstructure:"enable_syslog"`

	// SyslogFacility is used to control which syslog facility messages are
	// sent to. Defaults to LOCAL0.
	SyslogFacility string `mapstructure:"syslog_facility"`

	// RetryJoin is a list of addresses to attempt to join when the
	// agent starts. Serf will continue to retry the join until it
	// succeeds or RetryMaxAttempts is reached.
	RetryJoin []string `mapstructure:"retry_join"`

	// RetryMaxAttempts is used to limit the maximum attempts made
	// by RetryJoin to reach other nodes. If this is 0, then no limit
	// is imposed, and Serf will continue to try forever. Defaults to 0.
	RetryMaxAttempts int `mapstructure:"retry_max_attempts"`

	// RetryIntervalRaw is the retry interval as a duration string;
	// DecodeConfig parses it into RetryInterval. This interval controls how
	// often we retry the join for RetryJoin. DefaultConfig sets 30 seconds.
	RetryIntervalRaw string        `mapstructure:"retry_interval"`
	RetryInterval    time.Duration `mapstructure:"-"`

	// RejoinAfterLeave controls our interaction with the snapshot file.
	// When set to false (default), a leave causes a Serf to not rejoin
	// the cluster until an explicit join is received. If this is set to
	// true, we ignore the leave, and rejoin the cluster on start. This
	// only has an effect if the snapshot file is enabled.
	RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"`

	// EnableCompression specifies whether message compression is enabled
	// by `github.com/hashicorp/memberlist` when broadcasting events.
	EnableCompression bool `mapstructure:"enable_compression"`

	// StatsiteAddr is the address of a statsite instance. If provided,
	// metrics will be streamed to that instance.
	StatsiteAddr string `mapstructure:"statsite_addr"`

	// StatsdAddr is the address of a statsd instance. If provided,
	// metrics will be sent to that instance.
	StatsdAddr string `mapstructure:"statsd_addr"`

	// BroadcastTimeoutRaw is the broadcast timeout as a duration string;
	// DecodeConfig parses it into BroadcastTimeout. This timeout controls
	// the timeout for broadcast events. DefaultConfig sets 5 seconds.
	BroadcastTimeoutRaw string        `mapstructure:"broadcast_timeout"`
	BroadcastTimeout    time.Duration `mapstructure:"-"`

	// ValidateNodeNames controls whether node names are restricted to
	// alphanumeric characters, dashes and '.' characters, with a maximum
	// length of 128 characters.
	ValidateNodeNames bool `mapstructure:"validate_node_names"`
}
236
237// BindAddrParts returns the parts of the BindAddr that should be
238// used to configure Serf.
239func (c *Config) AddrParts(address string) (string, int, error) {
240	checkAddr := address
241
242START:
243	_, _, err := net.SplitHostPort(checkAddr)
244	if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
245		checkAddr = fmt.Sprintf("%s:%d", checkAddr, DefaultBindPort)
246		goto START
247	}
248	if err != nil {
249		return "", 0, err
250	}
251
252	// Get the address
253	addr, err := net.ResolveTCPAddr("tcp", checkAddr)
254	if err != nil {
255		return "", 0, err
256	}
257
258	return addr.IP.String(), addr.Port, nil
259}
260
261// EncryptBytes returns the encryption key configured.
262func (c *Config) EncryptBytes() ([]byte, error) {
263	return base64.StdEncoding.DecodeString(c.EncryptKey)
264}
265
266// EventScripts returns the list of EventScripts associated with this
267// configuration and specified by the "event_handlers" configuration.
268func (c *Config) EventScripts() []EventScript {
269	result := make([]EventScript, 0, len(c.EventHandlers))
270	for _, v := range c.EventHandlers {
271		part := ParseEventScript(v)
272		result = append(result, part...)
273	}
274	return result
275}
276
277// Networkinterface is used to get the associated network
278// interface from the configured value
279func (c *Config) NetworkInterface() (*net.Interface, error) {
280	if c.Interface == "" {
281		return nil, nil
282	}
283	return net.InterfaceByName(c.Interface)
284}
285
286// DecodeConfig reads the configuration from the given reader in JSON
287// format and decodes it into a proper Config structure.
288func DecodeConfig(r io.Reader) (*Config, error) {
289	var raw interface{}
290	dec := json.NewDecoder(r)
291	if err := dec.Decode(&raw); err != nil {
292		return nil, err
293	}
294
295	// Decode
296	var md mapstructure.Metadata
297	var result Config
298	msdec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
299		Metadata:    &md,
300		Result:      &result,
301		ErrorUnused: true,
302	})
303	if err != nil {
304		return nil, err
305	}
306
307	if err := msdec.Decode(raw); err != nil {
308		return nil, err
309	}
310
311	// Decode the time values
312	if result.ReconnectIntervalRaw != "" {
313		dur, err := time.ParseDuration(result.ReconnectIntervalRaw)
314		if err != nil {
315			return nil, err
316		}
317		result.ReconnectInterval = dur
318	}
319
320	if result.ReconnectTimeoutRaw != "" {
321		dur, err := time.ParseDuration(result.ReconnectTimeoutRaw)
322		if err != nil {
323			return nil, err
324		}
325		result.ReconnectTimeout = dur
326	}
327
328	if result.TombstoneTimeoutRaw != "" {
329		dur, err := time.ParseDuration(result.TombstoneTimeoutRaw)
330		if err != nil {
331			return nil, err
332		}
333		result.TombstoneTimeout = dur
334	}
335
336	if result.RetryIntervalRaw != "" {
337		dur, err := time.ParseDuration(result.RetryIntervalRaw)
338		if err != nil {
339			return nil, err
340		}
341		result.RetryInterval = dur
342	}
343
344	if result.BroadcastTimeoutRaw != "" {
345		dur, err := time.ParseDuration(result.BroadcastTimeoutRaw)
346		if err != nil {
347			return nil, err
348		}
349		result.BroadcastTimeout = dur
350	}
351
352	return &result, nil
353}
354
// containsKey reports whether key occurs (by exact string equality) anywhere
// in keys. A nil or empty keys slice never contains anything.
func containsKey(keys []string, key string) bool {
	found := false
	for i := 0; i < len(keys) && !found; i++ {
		found = keys[i] == key
	}
	return found
}
365
366// MergeConfig merges two configurations together to make a single new
367// configuration.
368func MergeConfig(a, b *Config) *Config {
369	var result Config = *a
370
371	if b.NodeName != "" {
372		result.NodeName = b.NodeName
373	}
374	if b.Role != "" {
375		result.Role = b.Role
376	}
377	if b.DisableCoordinates == true {
378		result.DisableCoordinates = true
379	}
380	if b.Tags != nil {
381		if result.Tags == nil {
382			result.Tags = make(map[string]string)
383		}
384		for name, value := range b.Tags {
385			result.Tags[name] = value
386		}
387	}
388	if b.BindAddr != "" {
389		result.BindAddr = b.BindAddr
390	}
391	if b.AdvertiseAddr != "" {
392		result.AdvertiseAddr = b.AdvertiseAddr
393	}
394	if b.EncryptKey != "" {
395		result.EncryptKey = b.EncryptKey
396	}
397	if b.LogLevel != "" {
398		result.LogLevel = b.LogLevel
399	}
400	if b.Protocol > 0 {
401		result.Protocol = b.Protocol
402	}
403	if b.RPCAddr != "" {
404		result.RPCAddr = b.RPCAddr
405	}
406	if b.RPCAuthKey != "" {
407		result.RPCAuthKey = b.RPCAuthKey
408	}
409	if b.ReplayOnJoin != false {
410		result.ReplayOnJoin = b.ReplayOnJoin
411	}
412	if b.Profile != "" {
413		result.Profile = b.Profile
414	}
415	if b.SnapshotPath != "" {
416		result.SnapshotPath = b.SnapshotPath
417	}
418	if b.LeaveOnTerm == true {
419		result.LeaveOnTerm = true
420	}
421	if b.SkipLeaveOnInt == true {
422		result.SkipLeaveOnInt = true
423	}
424	if b.Discover != "" {
425		result.Discover = b.Discover
426	}
427	if b.Interface != "" {
428		result.Interface = b.Interface
429	}
430	if b.ReconnectInterval != 0 {
431		result.ReconnectInterval = b.ReconnectInterval
432	}
433	if b.ReconnectTimeout != 0 {
434		result.ReconnectTimeout = b.ReconnectTimeout
435	}
436	if b.TombstoneTimeout != 0 {
437		result.TombstoneTimeout = b.TombstoneTimeout
438	}
439	if b.DisableNameResolution {
440		result.DisableNameResolution = true
441	}
442	if b.TagsFile != "" {
443		result.TagsFile = b.TagsFile
444	}
445	if b.KeyringFile != "" {
446		result.KeyringFile = b.KeyringFile
447	}
448	if b.EnableSyslog {
449		result.EnableSyslog = true
450	}
451	if b.RetryMaxAttempts != 0 {
452		result.RetryMaxAttempts = b.RetryMaxAttempts
453	}
454	if b.RetryInterval != 0 {
455		result.RetryInterval = b.RetryInterval
456	}
457	if b.RejoinAfterLeave {
458		result.RejoinAfterLeave = true
459	}
460	if b.SyslogFacility != "" {
461		result.SyslogFacility = b.SyslogFacility
462	}
463	if b.StatsiteAddr != "" {
464		result.StatsiteAddr = b.StatsiteAddr
465	}
466	if b.StatsdAddr != "" {
467		result.StatsdAddr = b.StatsdAddr
468	}
469	if b.QueryResponseSizeLimit != 0 {
470		result.QueryResponseSizeLimit = b.QueryResponseSizeLimit
471	}
472	if b.QuerySizeLimit != 0 {
473		result.QuerySizeLimit = b.QuerySizeLimit
474	}
475	if b.UserEventSizeLimit != 0 {
476		result.UserEventSizeLimit = b.UserEventSizeLimit
477	}
478	if b.BroadcastTimeout != 0 {
479		result.BroadcastTimeout = b.BroadcastTimeout
480	}
481	result.EnableCompression = b.EnableCompression
482
483	// Copy the event handlers
484	result.EventHandlers = make([]string, 0, len(a.EventHandlers)+len(b.EventHandlers))
485	result.EventHandlers = append(result.EventHandlers, a.EventHandlers...)
486	result.EventHandlers = append(result.EventHandlers, b.EventHandlers...)
487
488	// Copy the start join addresses
489	result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))
490	result.StartJoin = append(result.StartJoin, a.StartJoin...)
491	result.StartJoin = append(result.StartJoin, b.StartJoin...)
492
493	// Copy the retry join addresses
494	result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
495	result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
496	result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
497
498	return &result
499}
500
501// ReadConfigPaths reads the paths in the given order to load configurations.
502// The paths can be to files or directories. If the path is a directory,
503// we read one directory deep and read any files ending in ".json" as
504// configuration files.
505func ReadConfigPaths(paths []string) (*Config, error) {
506	result := new(Config)
507	for _, path := range paths {
508		f, err := os.Open(path)
509		if err != nil {
510			return nil, fmt.Errorf("Error reading '%s': %s", path, err)
511		}
512
513		fi, err := f.Stat()
514		if err != nil {
515			f.Close()
516			return nil, fmt.Errorf("Error reading '%s': %s", path, err)
517		}
518
519		if !fi.IsDir() {
520			config, err := DecodeConfig(f)
521			f.Close()
522
523			if err != nil {
524				return nil, fmt.Errorf("Error decoding '%s': %s", path, err)
525			}
526
527			result = MergeConfig(result, config)
528			continue
529		}
530
531		contents, err := f.Readdir(-1)
532		f.Close()
533		if err != nil {
534			return nil, fmt.Errorf("Error reading '%s': %s", path, err)
535		}
536
537		// Sort the contents, ensures lexical order
538		sort.Sort(dirEnts(contents))
539
540		for _, fi := range contents {
541			// Don't recursively read contents
542			if fi.IsDir() {
543				continue
544			}
545
546			// If it isn't a JSON file, ignore it
547			if !strings.HasSuffix(fi.Name(), ".json") {
548				continue
549			}
550
551			subpath := filepath.Join(path, fi.Name())
552			f, err := os.Open(subpath)
553			if err != nil {
554				return nil, fmt.Errorf("Error reading '%s': %s", subpath, err)
555			}
556
557			config, err := DecodeConfig(f)
558			f.Close()
559
560			if err != nil {
561				return nil, fmt.Errorf("Error decoding '%s': %s", subpath, err)
562			}
563
564			result = MergeConfig(result, config)
565		}
566	}
567
568	return result, nil
569}
570
571// Implement the sort interface for dirEnts
572func (d dirEnts) Len() int {
573	return len(d)
574}
575
576func (d dirEnts) Less(i, j int) bool {
577	return d[i].Name() < d[j].Name()
578}
579
580func (d dirEnts) Swap(i, j int) {
581	d[i], d[j] = d[j], d[i]
582}
583