1package agent 2 3import ( 4 "encoding/base64" 5 "encoding/json" 6 "fmt" 7 "io" 8 "net" 9 "os" 10 "path/filepath" 11 "sort" 12 "strings" 13 "time" 14 15 "github.com/hashicorp/serf/serf" 16 "github.com/mitchellh/mapstructure" 17) 18 19// This is the default port that we use for Serf communication 20const DefaultBindPort int = 7946 21 22// DefaultConfig contains the defaults for configurations. 23func DefaultConfig() *Config { 24 return &Config{ 25 DisableCoordinates: false, 26 Tags: make(map[string]string), 27 BindAddr: "0.0.0.0", 28 AdvertiseAddr: "", 29 LogLevel: "INFO", 30 RPCAddr: "127.0.0.1:7373", 31 Protocol: serf.ProtocolVersionMax, 32 ReplayOnJoin: false, 33 Profile: "lan", 34 RetryInterval: 30 * time.Second, 35 SyslogFacility: "LOCAL0", 36 QueryResponseSizeLimit: 1024, 37 QuerySizeLimit: 1024, 38 UserEventSizeLimit: 512, 39 BroadcastTimeout: 5 * time.Second, 40 } 41} 42 43type dirEnts []os.FileInfo 44 45// Config is the configuration that can be set for an Agent. Some of these 46// configurations are exposed as command-line flags to `serf agent`, whereas 47// many of the more advanced configurations can only be set by creating 48// a configuration file. 49type Config struct { 50 // All the configurations in this section are identical to their 51 // Serf counterparts. See the documentation for Serf.Config for 52 // more info. 53 NodeName string `mapstructure:"node_name"` 54 Role string `mapstructure:"role"` 55 DisableCoordinates bool `mapstructure:"disable_coordinates"` 56 57 // Tags are used to attach key/value metadata to a node. They have 58 // replaced 'Role' as a more flexible meta data mechanism. For compatibility, 59 // the 'role' key is special, and is used for backwards compatibility. 60 Tags map[string]string `mapstructure:"tags"` 61 62 // TagsFile is the path to a file where Serf can store its tags. Tag 63 // persistence is desirable since tags may be set or deleted while the 64 // agent is running. Tags can be reloaded from this file on later starts. 65 TagsFile string `mapstructure:"tags_file"` 66 67 // BindAddr is the address that the Serf agent's communication ports 68 // will bind to. Serf will use this address to bind to for both TCP 69 // and UDP connections. If no port is present in the address, the default 70 // port will be used. 71 BindAddr string `mapstructure:"bind"` 72 73 // AdvertiseAddr is the address that the Serf agent will advertise to 74 // other members of the cluster. Can be used for basic NAT traversal 75 // where both the internal ip:port and external ip:port are known. 76 AdvertiseAddr string `mapstructure:"advertise"` 77 78 // EncryptKey is the secret key to use for encrypting communication 79 // traffic for Serf. The secret key must be exactly 32-bytes, base64 80 // encoded. The easiest way to do this on Unix machines is this command: 81 // "head -c32 /dev/urandom | base64". If this is not specified, the 82 // traffic will not be encrypted. 83 EncryptKey string `mapstructure:"encrypt_key"` 84 85 // KeyringFile is the path to a file containing a serialized keyring. 86 // The keyring is used to facilitate encryption. If left blank, the 87 // keyring will not be persisted to a file. 88 KeyringFile string `mapstructure:"keyring_file"` 89 90 // LogLevel is the level of the logs to output. 91 // This can be updated during a reload. 92 LogLevel string `mapstructure:"log_level"` 93 94 // RPCAddr is the address and port to listen on for the agent's RPC 95 // interface. 96 RPCAddr string `mapstructure:"rpc_addr"` 97 98 // RPCAuthKey is a key that can be set to optionally require that 99 // RPC's provide an authentication key. This is meant to be 100 // a very simple authentication control 101 RPCAuthKey string `mapstructure:"rpc_auth"` 102 103 // Protocol is the Serf protocol version to use. 104 Protocol int `mapstructure:"protocol"` 105 106 // ReplayOnJoin tells Serf to replay past user events 107 // when joining based on a `StartJoin`. 108 ReplayOnJoin bool `mapstructure:"replay_on_join"` 109 110 // QueryResponseSizeLimit and QuerySizeLimit limit the inbound and 111 // outbound payload sizes for queries, respectively. These must fit 112 // in a UDP packet with some additional overhead, so tuning these 113 // past the default values of 1024 will depend on your network 114 // configuration. 115 QueryResponseSizeLimit int `mapstructure:"query_response_size_limit"` 116 QuerySizeLimit int `mapstructure:"query_size_limit"` 117 118 // UserEventSizeLimit is maximum byte size limit of user event `name` + `payload` in bytes. 119 // It's optimal to be relatively small, since it's going to be gossiped through the cluster. 120 UserEventSizeLimit int `mapstructure:"user_event_size_limit"` 121 122 // StartJoin is a list of addresses to attempt to join when the 123 // agent starts. If Serf is unable to communicate with any of these 124 // addresses, then the agent will error and exit. 125 StartJoin []string `mapstructure:"start_join"` 126 127 // EventHandlers is a list of event handlers that will be invoked. 128 // These can be updated during a reload. 129 EventHandlers []string `mapstructure:"event_handlers"` 130 131 // Profile is used to select a timing profile for Serf. The supported choices 132 // are "wan", "lan", and "local". The default is "lan" 133 Profile string `mapstructure:"profile"` 134 135 // SnapshotPath is used to allow Serf to snapshot important transactional 136 // state to make a more graceful recovery possible. This enables auto 137 // re-joining a cluster on failure and avoids old message replay. 138 SnapshotPath string `mapstructure:"snapshot_path"` 139 140 // LeaveOnTerm controls if Serf does a graceful leave when receiving 141 // the TERM signal. Defaults false. This can be changed on reload. 142 LeaveOnTerm bool `mapstructure:"leave_on_terminate"` 143 144 // SkipLeaveOnInt controls if Serf skips a graceful leave when receiving 145 // the INT signal. Defaults false. This can be changed on reload. 146 SkipLeaveOnInt bool `mapstructure:"skip_leave_on_interrupt"` 147 148 // Discover is used to setup an mDNS Discovery name. When this is set, the 149 // agent will setup an mDNS responder and periodically run an mDNS query 150 // to look for peers. For peers on a network that supports multicast, this 151 // allows Serf agents to join each other with zero configuration. 152 Discover string `mapstructure:"discover"` 153 154 // Interface is used to provide a binding interface to use. It can be 155 // used instead of providing a bind address, as Serf will discover the 156 // address of the provided interface. It is also used to set the multicast 157 // device used with `-discover`. 158 Interface string `mapstructure:"interface"` 159 160 // ReconnectIntervalRaw is the string reconnect interval time. This interval 161 // controls how often we attempt to connect to a failed node. 162 ReconnectIntervalRaw string `mapstructure:"reconnect_interval"` 163 ReconnectInterval time.Duration `mapstructure:"-"` 164 165 // ReconnectTimeoutRaw is the string reconnect timeout. This timeout controls 166 // for how long we attempt to connect to a failed node before removing 167 // it from the cluster. 168 ReconnectTimeoutRaw string `mapstructure:"reconnect_timeout"` 169 ReconnectTimeout time.Duration `mapstructure:"-"` 170 171 // TombstoneTimeoutRaw is the string tombstone timeout. This timeout controls 172 // for how long we remember a left node before removing it from the cluster. 173 TombstoneTimeoutRaw string `mapstructure:"tombstone_timeout"` 174 TombstoneTimeout time.Duration `mapstructure:"-"` 175 176 // By default Serf will attempt to resolve name conflicts. This is done by 177 // determining which node the majority believe to be the proper node, and 178 // by having the minority node shutdown. If you want to disable this behavior, 179 // then this flag can be set to true. 180 DisableNameResolution bool `mapstructure:"disable_name_resolution"` 181 182 // EnableSyslog is used to also tee all the logs over to syslog. Only supported 183 // on linux and OSX. Other platforms will generate an error. 184 EnableSyslog bool `mapstructure:"enable_syslog"` 185 186 // SyslogFacility is used to control which syslog facility messages are 187 // sent to. Defaults to LOCAL0. 188 SyslogFacility string `mapstructure:"syslog_facility"` 189 190 // RetryJoin is a list of addresses to attempt to join when the 191 // agent starts. Serf will continue to retry the join until it 192 // succeeds or RetryMaxAttempts is reached. 193 RetryJoin []string `mapstructure:"retry_join"` 194 195 // RetryMaxAttempts is used to limit the maximum attempts made 196 // by RetryJoin to reach other nodes. If this is 0, then no limit 197 // is imposed, and Serf will continue to try forever. Defaults to 0. 198 RetryMaxAttempts int `mapstructure:"retry_max_attempts"` 199 200 // RetryIntervalRaw is the string retry interval. This interval 201 // controls how often we retry the join for RetryJoin. This defaults 202 // to 30 seconds. 203 RetryIntervalRaw string `mapstructure:"retry_interval"` 204 RetryInterval time.Duration `mapstructure:"-"` 205 206 // RejoinAfterLeave controls our interaction with the snapshot file. 207 // When set to false (default), a leave causes a Serf to not rejoin 208 // the cluster until an explicit join is received. If this is set to 209 // true, we ignore the leave, and rejoin the cluster on start. This 210 // only has an affect if the snapshot file is enabled. 211 RejoinAfterLeave bool `mapstructure:"rejoin_after_leave"` 212 213 // EnableCompression specifies whether message compression is enabled 214 // by `github.com/hashicorp/memberlist` when broadcasting events. 215 EnableCompression bool `mapstructure:"enable_compression"` 216 217 // StatsiteAddr is the address of a statsite instance. If provided, 218 // metrics will be streamed to that instance. 219 StatsiteAddr string `mapstructure:"statsite_addr"` 220 221 // StatsdAddr is the address of a statsd instance. If provided, 222 // metrics will be sent to that instance. 223 StatsdAddr string `mapstructure:"statsd_addr"` 224 225 // BroadcastTimeoutRaw is the string retry interval. This interval 226 // controls the timeout for broadcast events. This defaults to 227 // 5 seconds. 228 BroadcastTimeoutRaw string `mapstructure:"broadcast_timeout"` 229 BroadcastTimeout time.Duration `mapstructure:"-"` 230 231 // ValidateNodeNames controls whether nodenames only 232 // contain alphanumeric, dashes and '.'characters 233 // and sets maximum length to 128 characters 234 ValidateNodeNames bool `mapstructure:"validate_node_names"` 235} 236 237// BindAddrParts returns the parts of the BindAddr that should be 238// used to configure Serf. 239func (c *Config) AddrParts(address string) (string, int, error) { 240 checkAddr := address 241 242START: 243 _, _, err := net.SplitHostPort(checkAddr) 244 if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { 245 checkAddr = fmt.Sprintf("%s:%d", checkAddr, DefaultBindPort) 246 goto START 247 } 248 if err != nil { 249 return "", 0, err 250 } 251 252 // Get the address 253 addr, err := net.ResolveTCPAddr("tcp", checkAddr) 254 if err != nil { 255 return "", 0, err 256 } 257 258 return addr.IP.String(), addr.Port, nil 259} 260 261// EncryptBytes returns the encryption key configured. 262func (c *Config) EncryptBytes() ([]byte, error) { 263 return base64.StdEncoding.DecodeString(c.EncryptKey) 264} 265 266// EventScripts returns the list of EventScripts associated with this 267// configuration and specified by the "event_handlers" configuration. 268func (c *Config) EventScripts() []EventScript { 269 result := make([]EventScript, 0, len(c.EventHandlers)) 270 for _, v := range c.EventHandlers { 271 part := ParseEventScript(v) 272 result = append(result, part...) 273 } 274 return result 275} 276 277// Networkinterface is used to get the associated network 278// interface from the configured value 279func (c *Config) NetworkInterface() (*net.Interface, error) { 280 if c.Interface == "" { 281 return nil, nil 282 } 283 return net.InterfaceByName(c.Interface) 284} 285 286// DecodeConfig reads the configuration from the given reader in JSON 287// format and decodes it into a proper Config structure. 288func DecodeConfig(r io.Reader) (*Config, error) { 289 var raw interface{} 290 dec := json.NewDecoder(r) 291 if err := dec.Decode(&raw); err != nil { 292 return nil, err 293 } 294 295 // Decode 296 var md mapstructure.Metadata 297 var result Config 298 msdec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ 299 Metadata: &md, 300 Result: &result, 301 ErrorUnused: true, 302 }) 303 if err != nil { 304 return nil, err 305 } 306 307 if err := msdec.Decode(raw); err != nil { 308 return nil, err 309 } 310 311 // Decode the time values 312 if result.ReconnectIntervalRaw != "" { 313 dur, err := time.ParseDuration(result.ReconnectIntervalRaw) 314 if err != nil { 315 return nil, err 316 } 317 result.ReconnectInterval = dur 318 } 319 320 if result.ReconnectTimeoutRaw != "" { 321 dur, err := time.ParseDuration(result.ReconnectTimeoutRaw) 322 if err != nil { 323 return nil, err 324 } 325 result.ReconnectTimeout = dur 326 } 327 328 if result.TombstoneTimeoutRaw != "" { 329 dur, err := time.ParseDuration(result.TombstoneTimeoutRaw) 330 if err != nil { 331 return nil, err 332 } 333 result.TombstoneTimeout = dur 334 } 335 336 if result.RetryIntervalRaw != "" { 337 dur, err := time.ParseDuration(result.RetryIntervalRaw) 338 if err != nil { 339 return nil, err 340 } 341 result.RetryInterval = dur 342 } 343 344 if result.BroadcastTimeoutRaw != "" { 345 dur, err := time.ParseDuration(result.BroadcastTimeoutRaw) 346 if err != nil { 347 return nil, err 348 } 349 result.BroadcastTimeout = dur 350 } 351 352 return &result, nil 353} 354 355// containsKey is used to check if a slice of string keys contains 356// another key 357func containsKey(keys []string, key string) bool { 358 for _, k := range keys { 359 if k == key { 360 return true 361 } 362 } 363 return false 364} 365 366// MergeConfig merges two configurations together to make a single new 367// configuration. 368func MergeConfig(a, b *Config) *Config { 369 var result Config = *a 370 371 if b.NodeName != "" { 372 result.NodeName = b.NodeName 373 } 374 if b.Role != "" { 375 result.Role = b.Role 376 } 377 if b.DisableCoordinates == true { 378 result.DisableCoordinates = true 379 } 380 if b.Tags != nil { 381 if result.Tags == nil { 382 result.Tags = make(map[string]string) 383 } 384 for name, value := range b.Tags { 385 result.Tags[name] = value 386 } 387 } 388 if b.BindAddr != "" { 389 result.BindAddr = b.BindAddr 390 } 391 if b.AdvertiseAddr != "" { 392 result.AdvertiseAddr = b.AdvertiseAddr 393 } 394 if b.EncryptKey != "" { 395 result.EncryptKey = b.EncryptKey 396 } 397 if b.LogLevel != "" { 398 result.LogLevel = b.LogLevel 399 } 400 if b.Protocol > 0 { 401 result.Protocol = b.Protocol 402 } 403 if b.RPCAddr != "" { 404 result.RPCAddr = b.RPCAddr 405 } 406 if b.RPCAuthKey != "" { 407 result.RPCAuthKey = b.RPCAuthKey 408 } 409 if b.ReplayOnJoin != false { 410 result.ReplayOnJoin = b.ReplayOnJoin 411 } 412 if b.Profile != "" { 413 result.Profile = b.Profile 414 } 415 if b.SnapshotPath != "" { 416 result.SnapshotPath = b.SnapshotPath 417 } 418 if b.LeaveOnTerm == true { 419 result.LeaveOnTerm = true 420 } 421 if b.SkipLeaveOnInt == true { 422 result.SkipLeaveOnInt = true 423 } 424 if b.Discover != "" { 425 result.Discover = b.Discover 426 } 427 if b.Interface != "" { 428 result.Interface = b.Interface 429 } 430 if b.ReconnectInterval != 0 { 431 result.ReconnectInterval = b.ReconnectInterval 432 } 433 if b.ReconnectTimeout != 0 { 434 result.ReconnectTimeout = b.ReconnectTimeout 435 } 436 if b.TombstoneTimeout != 0 { 437 result.TombstoneTimeout = b.TombstoneTimeout 438 } 439 if b.DisableNameResolution { 440 result.DisableNameResolution = true 441 } 442 if b.TagsFile != "" { 443 result.TagsFile = b.TagsFile 444 } 445 if b.KeyringFile != "" { 446 result.KeyringFile = b.KeyringFile 447 } 448 if b.EnableSyslog { 449 result.EnableSyslog = true 450 } 451 if b.RetryMaxAttempts != 0 { 452 result.RetryMaxAttempts = b.RetryMaxAttempts 453 } 454 if b.RetryInterval != 0 { 455 result.RetryInterval = b.RetryInterval 456 } 457 if b.RejoinAfterLeave { 458 result.RejoinAfterLeave = true 459 } 460 if b.SyslogFacility != "" { 461 result.SyslogFacility = b.SyslogFacility 462 } 463 if b.StatsiteAddr != "" { 464 result.StatsiteAddr = b.StatsiteAddr 465 } 466 if b.StatsdAddr != "" { 467 result.StatsdAddr = b.StatsdAddr 468 } 469 if b.QueryResponseSizeLimit != 0 { 470 result.QueryResponseSizeLimit = b.QueryResponseSizeLimit 471 } 472 if b.QuerySizeLimit != 0 { 473 result.QuerySizeLimit = b.QuerySizeLimit 474 } 475 if b.UserEventSizeLimit != 0 { 476 result.UserEventSizeLimit = b.UserEventSizeLimit 477 } 478 if b.BroadcastTimeout != 0 { 479 result.BroadcastTimeout = b.BroadcastTimeout 480 } 481 result.EnableCompression = b.EnableCompression 482 483 // Copy the event handlers 484 result.EventHandlers = make([]string, 0, len(a.EventHandlers)+len(b.EventHandlers)) 485 result.EventHandlers = append(result.EventHandlers, a.EventHandlers...) 486 result.EventHandlers = append(result.EventHandlers, b.EventHandlers...) 487 488 // Copy the start join addresses 489 result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) 490 result.StartJoin = append(result.StartJoin, a.StartJoin...) 491 result.StartJoin = append(result.StartJoin, b.StartJoin...) 492 493 // Copy the retry join addresses 494 result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin)) 495 result.RetryJoin = append(result.RetryJoin, a.RetryJoin...) 496 result.RetryJoin = append(result.RetryJoin, b.RetryJoin...) 497 498 return &result 499} 500 501// ReadConfigPaths reads the paths in the given order to load configurations. 502// The paths can be to files or directories. If the path is a directory, 503// we read one directory deep and read any files ending in ".json" as 504// configuration files. 505func ReadConfigPaths(paths []string) (*Config, error) { 506 result := new(Config) 507 for _, path := range paths { 508 f, err := os.Open(path) 509 if err != nil { 510 return nil, fmt.Errorf("Error reading '%s': %s", path, err) 511 } 512 513 fi, err := f.Stat() 514 if err != nil { 515 f.Close() 516 return nil, fmt.Errorf("Error reading '%s': %s", path, err) 517 } 518 519 if !fi.IsDir() { 520 config, err := DecodeConfig(f) 521 f.Close() 522 523 if err != nil { 524 return nil, fmt.Errorf("Error decoding '%s': %s", path, err) 525 } 526 527 result = MergeConfig(result, config) 528 continue 529 } 530 531 contents, err := f.Readdir(-1) 532 f.Close() 533 if err != nil { 534 return nil, fmt.Errorf("Error reading '%s': %s", path, err) 535 } 536 537 // Sort the contents, ensures lexical order 538 sort.Sort(dirEnts(contents)) 539 540 for _, fi := range contents { 541 // Don't recursively read contents 542 if fi.IsDir() { 543 continue 544 } 545 546 // If it isn't a JSON file, ignore it 547 if !strings.HasSuffix(fi.Name(), ".json") { 548 continue 549 } 550 551 subpath := filepath.Join(path, fi.Name()) 552 f, err := os.Open(subpath) 553 if err != nil { 554 return nil, fmt.Errorf("Error reading '%s': %s", subpath, err) 555 } 556 557 config, err := DecodeConfig(f) 558 f.Close() 559 560 if err != nil { 561 return nil, fmt.Errorf("Error decoding '%s': %s", subpath, err) 562 } 563 564 result = MergeConfig(result, config) 565 } 566 } 567 568 return result, nil 569} 570 571// Implement the sort interface for dirEnts 572func (d dirEnts) Len() int { 573 return len(d) 574} 575 576func (d dirEnts) Less(i, j int) bool { 577 return d[i].Name() < d[j].Name() 578} 579 580func (d dirEnts) Swap(i, j int) { 581 d[i], d[j] = d[j], d[i] 582} 583