1package agent 2 3import ( 4 "flag" 5 "fmt" 6 "io" 7 "log" 8 "net" 9 "os" 10 "os/signal" 11 "runtime" 12 "strings" 13 "syscall" 14 "time" 15 16 "github.com/armon/go-metrics" 17 gsyslog "github.com/hashicorp/go-syslog" 18 "github.com/hashicorp/logutils" 19 "github.com/hashicorp/memberlist" 20 "github.com/hashicorp/serf/serf" 21 "github.com/mitchellh/cli" 22) 23 24const ( 25 // gracefulTimeout controls how long we wait before forcefully terminating 26 gracefulTimeout = 3 * time.Second 27 28 // minRetryInterval applies a lower bound to the join retry interval 29 minRetryInterval = time.Second 30 31 // minBroadcastTimeout applies a lower bound to the broadcast timeout interval 32 minBroadcastTimeout = time.Second 33) 34 35// Command is a Command implementation that runs a Serf agent. 36// The command will not end unless a shutdown message is sent on the 37// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly 38// exit. 39type Command struct { 40 Ui cli.Ui 41 ShutdownCh <-chan struct{} 42 args []string 43 scriptHandler *ScriptEventHandler 44 logFilter *logutils.LevelFilter 45 logger *log.Logger 46} 47 48var _ cli.Command = &Command{} 49 50// readConfig is responsible for setup of our configuration using 51// the command line and any file configs 52func (c *Command) readConfig() *Config { 53 var cmdConfig Config 54 var configFiles []string 55 var tags []string 56 var retryInterval string 57 var broadcastTimeout string 58 var disableCompression bool 59 60 cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError) 61 cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } 62 cmdFlags.StringVar(&cmdConfig.BindAddr, "bind", "", "address to bind listeners to") 63 cmdFlags.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "address to advertise to cluster") 64 cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-file", 65 "json file to read config from") 66 cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-dir", 67 "directory of json files to read") 68 cmdFlags.StringVar(&cmdConfig.EncryptKey, "encrypt", "", "encryption key") 69 cmdFlags.StringVar(&cmdConfig.KeyringFile, "keyring-file", "", "path to the keyring file") 70 cmdFlags.Var((*AppendSliceValue)(&cmdConfig.EventHandlers), "event-handler", 71 "command to execute when events occur") 72 cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join", 73 "address of agent to join on startup") 74 cmdFlags.BoolVar(&cmdConfig.ReplayOnJoin, "replay", false, 75 "replay events for startup join") 76 cmdFlags.StringVar(&cmdConfig.LogLevel, "log-level", "", "log level") 77 cmdFlags.StringVar(&cmdConfig.NodeName, "node", "", "node name") 78 cmdFlags.IntVar(&cmdConfig.Protocol, "protocol", -1, "protocol version") 79 cmdFlags.StringVar(&cmdConfig.Role, "role", "", "role name") 80 cmdFlags.StringVar(&cmdConfig.RPCAddr, "rpc-addr", "", 81 "address to bind RPC listener to") 82 cmdFlags.StringVar(&cmdConfig.Profile, "profile", "", "timing profile to use (lan, wan, local)") 83 cmdFlags.StringVar(&cmdConfig.SnapshotPath, "snapshot", "", "path to the snapshot file") 84 cmdFlags.Var((*AppendSliceValue)(&tags), "tag", 85 "tag pair, specified as key=value") 86 cmdFlags.StringVar(&cmdConfig.Discover, "discover", "", "mDNS discovery name") 87 cmdFlags.StringVar(&cmdConfig.Interface, "iface", "", "interface to bind to") 88 cmdFlags.StringVar(&cmdConfig.TagsFile, "tags-file", "", "tag persistence file") 89 cmdFlags.BoolVar(&cmdConfig.EnableSyslog, "syslog", false, 90 "enable logging to syslog facility") 91 cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join", 92 "address of agent to join on startup with retry") 93 cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0, "maximum retry join attempts") 94 cmdFlags.StringVar(&retryInterval, "retry-interval", "", "retry join interval") 95 cmdFlags.BoolVar(&cmdConfig.RejoinAfterLeave, "rejoin", false, 96 "enable re-joining after a previous leave") 97 98 cmdFlags.BoolVar( 99 &disableCompression, 100 "disable-compression", 101 false, 102 "disable message compression for broadcasting events", 103 ) 104 105 cmdFlags.StringVar(&broadcastTimeout, "broadcast-timeout", "", "timeout for broadcast messages") 106 if err := cmdFlags.Parse(c.args); err != nil { 107 return nil 108 } 109 110 cmdConfig.EnableCompression = !disableCompression 111 112 // Parse any command line tag values 113 tagValues, err := UnmarshalTags(tags) 114 if err != nil { 115 c.Ui.Error(fmt.Sprintf("Error: %s", err)) 116 return nil 117 } 118 cmdConfig.Tags = tagValues 119 120 // Decode the retry interval if given 121 if retryInterval != "" { 122 dur, err := time.ParseDuration(retryInterval) 123 if err != nil { 124 c.Ui.Error(fmt.Sprintf("Error: %s", err)) 125 return nil 126 } 127 cmdConfig.RetryInterval = dur 128 } 129 130 // Decode the broadcast timeout if given 131 if broadcastTimeout != "" { 132 dur, err := time.ParseDuration(broadcastTimeout) 133 if err != nil { 134 c.Ui.Error(fmt.Sprintf("Error: %s", err)) 135 return nil 136 } 137 cmdConfig.BroadcastTimeout = dur 138 } 139 140 config := DefaultConfig() 141 if len(configFiles) > 0 { 142 fileConfig, err := ReadConfigPaths(configFiles) 143 if err != nil { 144 c.Ui.Error(err.Error()) 145 return nil 146 } 147 148 config = MergeConfig(config, fileConfig) 149 } 150 151 config = MergeConfig(config, &cmdConfig) 152 153 if config.NodeName == "" { 154 hostname, err := os.Hostname() 155 if err != nil { 156 c.Ui.Error(fmt.Sprintf("Error determining hostname: %s", err)) 157 return nil 158 } 159 config.NodeName = hostname 160 } 161 162 eventScripts := config.EventScripts() 163 for _, script := range eventScripts { 164 if !script.Valid() { 165 c.Ui.Error(fmt.Sprintf("Invalid event script: %s", script.String())) 166 return nil 167 } 168 } 169 170 // Check for a valid interface 171 if _, err := config.NetworkInterface(); err != nil { 172 c.Ui.Error(fmt.Sprintf("Invalid network interface: %s", err)) 173 return nil 174 } 175 176 // Backward compatibility hack for 'Role' 177 if config.Role != "" { 178 c.Ui.Output("Deprecation warning: 'Role' has been replaced with 'Tags'") 179 config.Tags["role"] = config.Role 180 } 181 182 // Check for sane retry interval 183 if config.RetryInterval < minRetryInterval { 184 config.RetryInterval = minRetryInterval 185 c.Ui.Output(fmt.Sprintf("Warning: 'RetryInterval' is too low. Setting to %v", config.RetryInterval)) 186 } 187 188 // Check for sane broadcast timeout 189 if config.BroadcastTimeout < minBroadcastTimeout { 190 config.BroadcastTimeout = minBroadcastTimeout 191 c.Ui.Output(fmt.Sprintf("Warning: 'BroadcastTimeout' is too low. Setting to %v", 192 config.BroadcastTimeout)) 193 } 194 195 // Check snapshot file is provided if we have RejoinAfterLeave 196 if config.RejoinAfterLeave && config.SnapshotPath == "" { 197 c.Ui.Output("Warning: 'RejoinAfterLeave' enabled without snapshot file") 198 } 199 200 return config 201} 202 203// setupAgent is used to create the agent we use 204func (c *Command) setupAgent(config *Config, logOutput io.Writer) *Agent { 205 bindIP, bindPort, err := config.AddrParts(config.BindAddr) 206 if err != nil { 207 c.Ui.Error(fmt.Sprintf("Invalid bind address: %s", err)) 208 return nil 209 } 210 211 // Check if we have an interface 212 if iface, _ := config.NetworkInterface(); iface != nil { 213 addrs, err := iface.Addrs() 214 if err != nil { 215 c.Ui.Error(fmt.Sprintf("Failed to get interface addresses: %s", err)) 216 return nil 217 } 218 if len(addrs) == 0 { 219 c.Ui.Error(fmt.Sprintf("Interface '%s' has no addresses", config.Interface)) 220 return nil 221 } 222 223 // If there is no bind IP, pick an address 224 if bindIP == "0.0.0.0" { 225 found := false 226 for _, a := range addrs { 227 var addrIP net.IP 228 if runtime.GOOS == "windows" { 229 // Waiting for https://github.com/golang/go/issues/5395 to use IPNet only 230 addr, ok := a.(*net.IPAddr) 231 if !ok { 232 continue 233 } 234 addrIP = addr.IP 235 } else { 236 addr, ok := a.(*net.IPNet) 237 if !ok { 238 continue 239 } 240 addrIP = addr.IP 241 } 242 243 // Skip self-assigned IPs 244 if addrIP.IsLinkLocalUnicast() { 245 continue 246 } 247 248 // Found an IP 249 found = true 250 bindIP = addrIP.String() 251 c.Ui.Output(fmt.Sprintf("Using interface '%s' address '%s'", 252 config.Interface, bindIP)) 253 254 // Update the configuration 255 bindAddr := &net.TCPAddr{ 256 IP: net.ParseIP(bindIP), 257 Port: bindPort, 258 } 259 config.BindAddr = bindAddr.String() 260 break 261 } 262 if !found { 263 c.Ui.Error(fmt.Sprintf("Failed to find usable address for interface '%s'", config.Interface)) 264 return nil 265 } 266 267 } else { 268 // If there is a bind IP, ensure it is available 269 found := false 270 for _, a := range addrs { 271 addr, ok := a.(*net.IPNet) 272 if !ok { 273 continue 274 } 275 if addr.IP.String() == bindIP { 276 found = true 277 break 278 } 279 } 280 if !found { 281 c.Ui.Error(fmt.Sprintf("Interface '%s' has no '%s' address", 282 config.Interface, bindIP)) 283 return nil 284 } 285 } 286 } 287 288 var advertiseIP string 289 var advertisePort int 290 if config.AdvertiseAddr != "" { 291 advertiseIP, advertisePort, err = config.AddrParts(config.AdvertiseAddr) 292 if err != nil { 293 c.Ui.Error(fmt.Sprintf("Invalid advertise address: %s", err)) 294 return nil 295 } 296 } 297 298 encryptKey, err := config.EncryptBytes() 299 if err != nil { 300 c.Ui.Error(fmt.Sprintf("Invalid encryption key: %s", err)) 301 return nil 302 } 303 304 serfConfig := serf.DefaultConfig() 305 switch config.Profile { 306 case "lan": 307 serfConfig.MemberlistConfig = memberlist.DefaultLANConfig() 308 case "wan": 309 serfConfig.MemberlistConfig = memberlist.DefaultWANConfig() 310 case "local": 311 serfConfig.MemberlistConfig = memberlist.DefaultLocalConfig() 312 default: 313 c.Ui.Error(fmt.Sprintf("Unknown profile: %s", config.Profile)) 314 return nil 315 } 316 317 serfConfig.MemberlistConfig.BindAddr = bindIP 318 serfConfig.MemberlistConfig.BindPort = bindPort 319 serfConfig.MemberlistConfig.AdvertiseAddr = advertiseIP 320 serfConfig.MemberlistConfig.AdvertisePort = advertisePort 321 serfConfig.MemberlistConfig.SecretKey = encryptKey 322 serfConfig.NodeName = config.NodeName 323 serfConfig.Tags = config.Tags 324 serfConfig.SnapshotPath = config.SnapshotPath 325 serfConfig.ProtocolVersion = uint8(config.Protocol) 326 serfConfig.CoalescePeriod = 3 * time.Second 327 serfConfig.QuiescentPeriod = time.Second 328 serfConfig.QueryResponseSizeLimit = config.QueryResponseSizeLimit 329 serfConfig.QuerySizeLimit = config.QuerySizeLimit 330 serfConfig.UserEventSizeLimit = config.UserEventSizeLimit 331 serfConfig.UserCoalescePeriod = 3 * time.Second 332 serfConfig.UserQuiescentPeriod = time.Second 333 if config.ReconnectInterval != 0 { 334 serfConfig.ReconnectInterval = config.ReconnectInterval 335 } 336 if config.ReconnectTimeout != 0 { 337 serfConfig.ReconnectTimeout = config.ReconnectTimeout 338 } 339 if config.TombstoneTimeout != 0 { 340 serfConfig.TombstoneTimeout = config.TombstoneTimeout 341 } 342 serfConfig.EnableNameConflictResolution = !config.DisableNameResolution 343 if config.KeyringFile != "" { 344 serfConfig.KeyringFile = config.KeyringFile 345 } 346 serfConfig.RejoinAfterLeave = config.RejoinAfterLeave 347 if config.BroadcastTimeout != 0 { 348 serfConfig.BroadcastTimeout = config.BroadcastTimeout 349 } 350 351 // Start Serf 352 c.Ui.Output("Starting Serf agent...") 353 agent, err := Create(config, serfConfig, logOutput) 354 if err != nil { 355 c.Ui.Error(fmt.Sprintf("Failed to setup the Serf agent: %v", err)) 356 return nil 357 } 358 return agent 359} 360 361// setupLoggers is used to setup the logGate, logWriter, and our logOutput 362func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Writer) { 363 // Setup logging. First create the gated log writer, which will 364 // store logs until we're ready to show them. Then create the level 365 // filter, filtering logs of the specified level. 366 logGate := &GatedWriter{ 367 Writer: &cli.UiWriter{Ui: c.Ui}, 368 } 369 370 c.logFilter = LevelFilter() 371 c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel)) 372 c.logFilter.Writer = logGate 373 if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) { 374 c.Ui.Error(fmt.Sprintf( 375 "Invalid log level: %s. Valid log levels are: %v", 376 c.logFilter.MinLevel, c.logFilter.Levels)) 377 return nil, nil, nil 378 } 379 380 // Check if syslog is enabled 381 var syslog io.Writer 382 if config.EnableSyslog { 383 l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "serf") 384 if err != nil { 385 c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err)) 386 return nil, nil, nil 387 } 388 syslog = &SyslogWrapper{l, c.logFilter} 389 } 390 391 // Create a log writer, and wrap a logOutput around it 392 logWriter := NewLogWriter(512) 393 var logOutput io.Writer 394 if syslog != nil { 395 logOutput = io.MultiWriter(c.logFilter, logWriter, syslog) 396 } else { 397 logOutput = io.MultiWriter(c.logFilter, logWriter) 398 } 399 400 // Create a logger 401 c.logger = log.New(logOutput, "", log.LstdFlags) 402 return logGate, logWriter, logOutput 403} 404 405// startAgent is used to start the agent and IPC 406func (c *Command) startAgent(config *Config, agent *Agent, 407 logWriter *logWriter, logOutput io.Writer) *AgentIPC { 408 // Add the script event handlers 409 c.scriptHandler = &ScriptEventHandler{ 410 SelfFunc: func() serf.Member { return agent.Serf().LocalMember() }, 411 Scripts: config.EventScripts(), 412 Logger: log.New(logOutput, "", log.LstdFlags), 413 } 414 agent.RegisterEventHandler(c.scriptHandler) 415 416 // Start the agent after the handler is registered 417 if err := agent.Start(); err != nil { 418 c.Ui.Error(fmt.Sprintf("Failed to start the Serf agent: %v", err)) 419 return nil 420 } 421 422 // Parse the bind address information 423 bindIP, bindPort, err := config.AddrParts(config.BindAddr) 424 bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort} 425 426 // Start the discovery layer 427 if config.Discover != "" { 428 // Use the advertise addr and port 429 local := agent.Serf().Memberlist().LocalNode() 430 431 // Get the bind interface if any 432 iface, _ := config.NetworkInterface() 433 434 _, err := NewAgentMDNS(agent, logOutput, config.ReplayOnJoin, 435 config.NodeName, config.Discover, iface, local.Addr, int(local.Port)) 436 if err != nil { 437 c.Ui.Error(fmt.Sprintf("Error starting mDNS listener: %s", err)) 438 return nil 439 440 } 441 } 442 443 // Setup the RPC listener 444 rpcListener, err := net.Listen("tcp", config.RPCAddr) 445 if err != nil { 446 c.Ui.Error(fmt.Sprintf("Error starting RPC listener: %s", err)) 447 return nil 448 } 449 450 // Start the IPC layer 451 c.Ui.Output("Starting Serf agent RPC...") 452 ipc := NewAgentIPC(agent, config.RPCAuthKey, rpcListener, logOutput, logWriter) 453 454 c.Ui.Output("Serf agent running!") 455 c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) 456 c.Ui.Info(fmt.Sprintf(" Bind addr: '%s'", bindAddr.String())) 457 458 if config.AdvertiseAddr != "" { 459 advertiseIP, advertisePort, _ := config.AddrParts(config.AdvertiseAddr) 460 advertiseAddr := (&net.TCPAddr{IP: net.ParseIP(advertiseIP), Port: advertisePort}).String() 461 c.Ui.Info(fmt.Sprintf("Advertise addr: '%s'", advertiseAddr)) 462 } 463 464 c.Ui.Info(fmt.Sprintf(" RPC addr: '%s'", config.RPCAddr)) 465 c.Ui.Info(fmt.Sprintf(" Encrypted: %#v", agent.serf.EncryptionEnabled())) 466 c.Ui.Info(fmt.Sprintf(" Snapshot: %v", config.SnapshotPath != "")) 467 c.Ui.Info(fmt.Sprintf(" Profile: %s", config.Profile)) 468 c.Ui.Info(fmt.Sprintf("Message Compression Enabled: %v", config.EnableCompression)) 469 470 if config.Discover != "" { 471 c.Ui.Info(fmt.Sprintf(" mDNS cluster: %s", config.Discover)) 472 } 473 return ipc 474} 475 476// startupJoin is invoked to handle any joins specified to take place at start time 477func (c *Command) startupJoin(config *Config, agent *Agent) error { 478 if len(config.StartJoin) == 0 { 479 return nil 480 } 481 482 c.Ui.Output(fmt.Sprintf("Joining cluster...(replay: %v)", config.ReplayOnJoin)) 483 n, err := agent.Join(config.StartJoin, config.ReplayOnJoin) 484 if err != nil { 485 return err 486 } 487 488 c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n)) 489 return nil 490} 491 492// retryJoin is invoked to handle joins with retries. This runs until at least a 493// single successful join or RetryMaxAttempts is reached 494func (c *Command) retryJoin(config *Config, agent *Agent, errCh chan struct{}) { 495 // Quit fast if there is no nodes to join 496 if len(config.RetryJoin) == 0 { 497 return 498 } 499 500 // Track the number of join attempts 501 attempt := 0 502 for { 503 // Try to perform the join 504 c.logger.Printf("[INFO] agent: Joining cluster...(replay: %v)", config.ReplayOnJoin) 505 n, err := agent.Join(config.RetryJoin, config.ReplayOnJoin) 506 if err == nil { 507 c.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n) 508 return 509 } 510 511 // Check if the maximum attempts has been exceeded 512 attempt++ 513 if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts { 514 c.logger.Printf("[ERR] agent: maximum retry join attempts made, exiting") 515 close(errCh) 516 return 517 } 518 519 // Log the failure and sleep 520 c.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, config.RetryInterval) 521 time.Sleep(config.RetryInterval) 522 } 523} 524 525func (c *Command) Run(args []string) int { 526 c.Ui = &cli.PrefixedUi{ 527 OutputPrefix: "==> ", 528 InfoPrefix: " ", 529 ErrorPrefix: "==> ", 530 Ui: c.Ui, 531 } 532 533 // Parse our configs 534 c.args = args 535 config := c.readConfig() 536 if config == nil { 537 return 1 538 } 539 540 // Setup the log outputs 541 logGate, logWriter, logOutput := c.setupLoggers(config) 542 if logWriter == nil { 543 return 1 544 } 545 546 /* 547 Setup telemetry 548 Aggregate on 10 second intervals for 1 minute. Expose the 549 metrics over stderr when there is a SIGUSR1 received. 550 */ 551 inm := metrics.NewInmemSink(10*time.Second, time.Minute) 552 metrics.DefaultInmemSignal(inm) 553 metricsConf := metrics.DefaultConfig("serf-agent") 554 555 // Configure the statsite sink 556 var fanout metrics.FanoutSink 557 if config.StatsiteAddr != "" { 558 sink, err := metrics.NewStatsiteSink(config.StatsiteAddr) 559 if err != nil { 560 c.Ui.Error(fmt.Sprintf("Failed to start statsite sink. Got: %s", err)) 561 return 1 562 } 563 fanout = append(fanout, sink) 564 } 565 566 // Configure the statsd sink 567 if config.StatsdAddr != "" { 568 sink, err := metrics.NewStatsdSink(config.StatsdAddr) 569 if err != nil { 570 c.Ui.Error(fmt.Sprintf("Failed to start statsd sink. Got: %s", err)) 571 return 1 572 } 573 fanout = append(fanout, sink) 574 } 575 576 // Initialize the global sink 577 if len(fanout) > 0 { 578 fanout = append(fanout, inm) 579 metrics.NewGlobal(metricsConf, fanout) 580 } else { 581 metricsConf.EnableHostname = false 582 metrics.NewGlobal(metricsConf, inm) 583 } 584 585 // Setup serf 586 agent := c.setupAgent(config, logOutput) 587 if agent == nil { 588 return 1 589 } 590 defer agent.Shutdown() 591 592 // Start the agent 593 ipc := c.startAgent(config, agent, logWriter, logOutput) 594 if ipc == nil { 595 return 1 596 } 597 defer ipc.Shutdown() 598 599 // Join startup nodes if specified 600 if err := c.startupJoin(config, agent); err != nil { 601 c.Ui.Error(err.Error()) 602 return 1 603 } 604 605 // Enable log streaming 606 c.Ui.Info("") 607 c.Ui.Output("Log data will now stream in as it occurs:\n") 608 logGate.Flush() 609 610 // Start the retry joins 611 retryJoinCh := make(chan struct{}) 612 go c.retryJoin(config, agent, retryJoinCh) 613 614 // Wait for exit 615 return c.handleSignals(config, agent, retryJoinCh) 616} 617 618// handleSignals blocks until we get an exit-causing signal 619func (c *Command) handleSignals(config *Config, agent *Agent, retryJoin chan struct{}) int { 620 signalCh := make(chan os.Signal, 4) 621 signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) 622 623 // Wait for a signal 624WAIT: 625 var sig os.Signal 626 select { 627 case s := <-signalCh: 628 sig = s 629 case <-c.ShutdownCh: 630 sig = os.Interrupt 631 case <-retryJoin: 632 // Retry join failed! 633 return 1 634 case <-agent.ShutdownCh(): 635 // Agent is already shutdown! 636 return 0 637 } 638 c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig)) 639 640 // Check if this is a SIGHUP 641 if sig == syscall.SIGHUP { 642 config = c.handleReload(config, agent) 643 goto WAIT 644 } 645 646 // Check if we should do a graceful leave 647 graceful := false 648 if sig == os.Interrupt && !config.SkipLeaveOnInt { 649 graceful = true 650 } else if sig == syscall.SIGTERM && config.LeaveOnTerm { 651 graceful = true 652 } 653 654 // Bail fast if not doing a graceful leave 655 if !graceful { 656 return 1 657 } 658 659 // Attempt a graceful leave 660 gracefulCh := make(chan struct{}) 661 c.Ui.Output("Gracefully shutting down agent...") 662 go func() { 663 if err := agent.Leave(); err != nil { 664 c.Ui.Error(fmt.Sprintf("Error: %s", err)) 665 return 666 } 667 close(gracefulCh) 668 }() 669 670 // Wait for leave or another signal 671 select { 672 case <-signalCh: 673 return 1 674 case <-time.After(gracefulTimeout): 675 return 1 676 case <-gracefulCh: 677 return 0 678 } 679} 680 681// handleReload is invoked when we should reload our configs, e.g. SIGHUP 682func (c *Command) handleReload(config *Config, agent *Agent) *Config { 683 c.Ui.Output("Reloading configuration...") 684 newConf := c.readConfig() 685 if newConf == nil { 686 c.Ui.Error(fmt.Sprintf("Failed to reload configs")) 687 return config 688 } 689 690 // Change the log level 691 minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel)) 692 if ValidateLevelFilter(minLevel, c.logFilter) { 693 c.logFilter.SetMinLevel(minLevel) 694 } else { 695 c.Ui.Error(fmt.Sprintf( 696 "Invalid log level: %s. Valid log levels are: %v", 697 minLevel, c.logFilter.Levels)) 698 699 // Keep the current log level 700 newConf.LogLevel = config.LogLevel 701 } 702 703 // Change the event handlers 704 c.scriptHandler.UpdateScripts(newConf.EventScripts()) 705 706 // Update the tags in serf 707 if err := agent.SetTags(newConf.Tags); err != nil { 708 c.Ui.Error(fmt.Sprintf("Failed to update tags: %v", err)) 709 return newConf 710 } 711 712 return newConf 713} 714 715func (c *Command) Synopsis() string { 716 return "Runs a Serf agent" 717} 718 719func (c *Command) Help() string { 720 helpText := ` 721Usage: serf agent [options] 722 723 Starts the Serf agent and runs until an interrupt is received. The 724 agent represents a single node in a cluster. 725 726Options: 727 728 -bind=0.0.0.0:7946 Address to bind network listeners to. To use an IPv6 729 address, specify [::1] or [::1]:7946. 730 -iface Network interface to bind to. Can be used instead of 731 -bind if the interface is known but not the address. 732 If both are provided, then Serf verifies that the 733 interface has the bind address that is provided. This 734 flag also sets the multicast device used for -discover. 735 -advertise=0.0.0.0 Address to advertise to the other cluster members 736 -config-file=foo Path to a JSON file to read configuration from. 737 This can be specified multiple times. 738 -config-dir=foo Path to a directory to read configuration files 739 from. This will read every file ending in ".json" 740 as configuration in this directory in alphabetical 741 order. 742 -discover=cluster A cluster name used to discovery peers. On 743 networks that support multicast, this can be used to have 744 peers join each other without an explicit join. 745 -encrypt=foo Key for encrypting network traffic within Serf. 746 Must be a base64-encoded 32-byte key. 747 -keyring-file The keyring file is used to store encryption keys used 748 by Serf. As encryption keys are changed, the content of 749 this file is updated so that the same keys may be used 750 during later agent starts. 751 -event-handler=foo Script to execute when events occur. This can 752 be specified multiple times. See the event scripts 753 section below for more info. 754 -join=addr An initial agent to join with. This flag can be 755 specified multiple times. 756 -log-level=info Log level of the agent. 757 -node=hostname Name of this node. Must be unique in the cluster 758 -profile=[lan|wan|local] Profile is used to control the timing profiles used in Serf. 759 The default if not provided is lan. 760 -protocol=n Serf protocol version to use. This defaults to 761 the latest version, but can be set back for upgrades. 762 -rejoin Ignores a previous leave and attempts to rejoin the cluster. 763 Only works if provided along with a snapshot file. 764 -retry-join=addr An agent to join with. This flag be specified multiple times. 765 Does not exit on failure like -join, used to retry until success. 766 -retry-interval=30s Sets the interval on which a node will attempt to retry joining 767 nodes provided by -retry-join. Defaults to 30s. 768 -retry-max=0 Limits the number of retry events. Defaults to 0 for unlimited. 769 -disable-compression Disable message compression for broadcasting events. Enabled by default. 770 -role=foo The role of this node, if any. This can be used 771 by event scripts to differentiate different types 772 of nodes that may be part of the same cluster. 773 '-role' is deprecated in favor of '-tag role=foo'. 774 -rpc-addr=127.0.0.1:7373 Address to bind the RPC listener. 775 -snapshot=path/to/file The snapshot file is used to store alive nodes and 776 event information so that Serf can rejoin a cluster 777 and avoid event replay on restart. 778 -tag key=value Tag can be specified multiple times to attach multiple 779 key/value tag pairs to the given node. 780 -tags-file=/path/to/file The tags file is used to persist tag data. As an agent's 781 tags are changed, the tags file will be updated. Tags 782 can be reloaded during later agent starts. This option 783 is incompatible with the '-tag' option and requires there 784 be no tags in the agent configuration file, if given. 785 -syslog When provided, logs will also be sent to syslog. 786 -broadcast-timeout=5s Sets the broadcast timeout, which is the max time allowed for 787 responses to events including leave and force remove messages. 788 Defaults to 5s. 789 790Event handlers: 791 792 For more information on what event handlers are, please read the 793 Serf documentation. This section will document how to configure them 794 on the command-line. There are three methods of specifying an event 795 handler: 796 797 - The value can be a plain script, such as "event.sh". In this case, 798 Serf will send all events to this script, and you'll be responsible 799 for differentiating between them based on the SERF_EVENT. 800 801 - The value can be in the format of "TYPE=SCRIPT", such as 802 "member-join=join.sh". With this format, Serf will only send events 803 of that type to that script. 804 805 - The value can be in the format of "user:EVENT=SCRIPT", such as 806 "user:deploy=deploy.sh". This means that Serf will only invoke this 807 script in the case of user events named "deploy". 808` 809 return strings.TrimSpace(helpText) 810} 811