1package agent 2 3import ( 4 "encoding/base64" 5 "encoding/json" 6 "fmt" 7 "io" 8 "io/ioutil" 9 "log" 10 "os" 11 "strings" 12 "sync" 13 14 "github.com/hashicorp/memberlist" 15 "github.com/hashicorp/serf/serf" 16) 17 18// Agent starts and manages a Serf instance, adding some niceties 19// on top of Serf such as storing logs that you can later retrieve, 20// and invoking EventHandlers when events occur. 21type Agent struct { 22 // Stores the serf configuration 23 conf *serf.Config 24 25 // Stores the agent configuration 26 agentConf *Config 27 28 // eventCh is used for Serf to deliver events on 29 eventCh chan serf.Event 30 31 // eventHandlers is the registered handlers for events 32 eventHandlers map[EventHandler]struct{} 33 eventHandlerList []EventHandler 34 eventHandlersLock sync.Mutex 35 36 // logger instance wraps the logOutput 37 logger *log.Logger 38 39 // This is the underlying Serf we are wrapping 40 serf *serf.Serf 41 42 // shutdownCh is used for shutdowns 43 shutdown bool 44 shutdownCh chan struct{} 45 shutdownLock sync.Mutex 46} 47 48// Create creates a new agent, potentially returning an error 49func Create(agentConf *Config, conf *serf.Config, logOutput io.Writer) (*Agent, error) { 50 // Ensure we have a log sink 51 if logOutput == nil { 52 logOutput = os.Stderr 53 } 54 55 // Setup the underlying loggers 56 conf.MemberlistConfig.LogOutput = logOutput 57 conf.MemberlistConfig.EnableCompression = agentConf.EnableCompression 58 conf.LogOutput = logOutput 59 60 // Create a channel to listen for events from Serf 61 eventCh := make(chan serf.Event, 64) 62 conf.EventCh = eventCh 63 64 // Setup the agent 65 agent := &Agent{ 66 conf: conf, 67 agentConf: agentConf, 68 eventCh: eventCh, 69 eventHandlers: make(map[EventHandler]struct{}), 70 logger: log.New(logOutput, "", log.LstdFlags), 71 shutdownCh: make(chan struct{}), 72 } 73 74 // Restore agent tags from a tags file 75 if agentConf.TagsFile != "" { 76 if err := agent.loadTagsFile(agentConf.TagsFile); err != nil { 77 return nil, err 78 } 79 } 80 81 // Load in a keyring file if provided 82 if agentConf.KeyringFile != "" { 83 if err := agent.loadKeyringFile(agentConf.KeyringFile); err != nil { 84 return nil, err 85 } 86 } 87 88 return agent, nil 89} 90 91// Start is used to initiate the event listeners. It is separate from 92// create so that there isn't a race condition between creating the 93// agent and registering handlers 94func (a *Agent) Start() error { 95 a.logger.Printf("[INFO] agent: Serf agent starting") 96 97 // Create serf first 98 serf, err := serf.Create(a.conf) 99 if err != nil { 100 return fmt.Errorf("Error creating Serf: %s", err) 101 } 102 a.serf = serf 103 104 // Start event loop 105 go a.eventLoop() 106 return nil 107} 108 109// Leave prepares for a graceful shutdown of the agent and its processes 110func (a *Agent) Leave() error { 111 if a.serf == nil { 112 return nil 113 } 114 115 a.logger.Println("[INFO] agent: requesting graceful leave from Serf") 116 return a.serf.Leave() 117} 118 119// Shutdown closes this agent and all of its processes. Should be preceded 120// by a Leave for a graceful shutdown. 121func (a *Agent) Shutdown() error { 122 a.shutdownLock.Lock() 123 defer a.shutdownLock.Unlock() 124 125 if a.shutdown { 126 return nil 127 } 128 129 if a.serf == nil { 130 goto EXIT 131 } 132 133 a.logger.Println("[INFO] agent: requesting serf shutdown") 134 if err := a.serf.Shutdown(); err != nil { 135 return err 136 } 137 138EXIT: 139 a.logger.Println("[INFO] agent: shutdown complete") 140 a.shutdown = true 141 close(a.shutdownCh) 142 return nil 143} 144 145// ShutdownCh returns a channel that can be selected to wait 146// for the agent to perform a shutdown. 147func (a *Agent) ShutdownCh() <-chan struct{} { 148 return a.shutdownCh 149} 150 151// Returns the Serf agent of the running Agent. 152func (a *Agent) Serf() *serf.Serf { 153 return a.serf 154} 155 156// Returns the Serf config of the running Agent. 157func (a *Agent) SerfConfig() *serf.Config { 158 return a.conf 159} 160 161// Join asks the Serf instance to join. See the Serf.Join function. 162func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { 163 a.logger.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay) 164 ignoreOld := !replay 165 n, err = a.serf.Join(addrs, ignoreOld) 166 if n > 0 { 167 a.logger.Printf("[INFO] agent: joined: %d nodes", n) 168 } 169 if err != nil { 170 a.logger.Printf("[WARN] agent: error joining: %v", err) 171 } 172 return 173} 174 175// ForceLeave is used to eject a failed node from the cluster 176func (a *Agent) ForceLeave(node string) error { 177 a.logger.Printf("[INFO] agent: Force leaving node: %s", node) 178 err := a.serf.RemoveFailedNode(node) 179 if err != nil { 180 a.logger.Printf("[WARN] agent: failed to remove node: %v", err) 181 } 182 return err 183} 184 185// ForceLeavePrune completely removes a failed node from the 186// member list entirely 187func (a *Agent) ForceLeavePrune(node string) error { 188 a.logger.Printf("[INFO] agent: Force leaving node (prune): %s", node) 189 err := a.serf.RemoveFailedNodePrune(node) 190 if err != nil { 191 a.logger.Printf("[WARN] agent: failed to remove node (prune): %v", err) 192 } 193 return err 194} 195 196// UserEvent sends a UserEvent on Serf, see Serf.UserEvent. 197func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { 198 a.logger.Printf("[DEBUG] agent: Requesting user event send: %s. Coalesced: %#v. Payload: %#v", 199 name, coalesce, string(payload)) 200 err := a.serf.UserEvent(name, payload, coalesce) 201 if err != nil { 202 a.logger.Printf("[WARN] agent: failed to send user event: %v", err) 203 } 204 return err 205} 206 207// Query sends a Query on Serf, see Serf.Query. 208func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { 209 // Prevent the use of the internal prefix 210 if strings.HasPrefix(name, serf.InternalQueryPrefix) { 211 // Allow the special "ping" query 212 if name != serf.InternalQueryPrefix+"ping" || payload != nil { 213 return nil, fmt.Errorf("Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix) 214 } 215 } 216 a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v", 217 name, string(payload)) 218 resp, err := a.serf.Query(name, payload, params) 219 if err != nil { 220 a.logger.Printf("[WARN] agent: failed to start user query: %v", err) 221 } 222 return resp, err 223} 224 225// RegisterEventHandler adds an event handler to receive event notifications 226func (a *Agent) RegisterEventHandler(eh EventHandler) { 227 a.eventHandlersLock.Lock() 228 defer a.eventHandlersLock.Unlock() 229 230 a.eventHandlers[eh] = struct{}{} 231 a.eventHandlerList = nil 232 for eh := range a.eventHandlers { 233 a.eventHandlerList = append(a.eventHandlerList, eh) 234 } 235} 236 237// DeregisterEventHandler removes an EventHandler and prevents more invocations 238func (a *Agent) DeregisterEventHandler(eh EventHandler) { 239 a.eventHandlersLock.Lock() 240 defer a.eventHandlersLock.Unlock() 241 242 delete(a.eventHandlers, eh) 243 a.eventHandlerList = nil 244 for eh := range a.eventHandlers { 245 a.eventHandlerList = append(a.eventHandlerList, eh) 246 } 247} 248 249// eventLoop listens to events from Serf and fans out to event handlers 250func (a *Agent) eventLoop() { 251 serfShutdownCh := a.serf.ShutdownCh() 252 for { 253 select { 254 case e := <-a.eventCh: 255 a.logger.Printf("[INFO] agent: Received event: %s", e.String()) 256 a.eventHandlersLock.Lock() 257 handlers := a.eventHandlerList 258 a.eventHandlersLock.Unlock() 259 for _, eh := range handlers { 260 eh.HandleEvent(e) 261 } 262 263 case <-serfShutdownCh: 264 a.logger.Printf("[WARN] agent: Serf shutdown detected, quitting") 265 a.Shutdown() 266 return 267 268 case <-a.shutdownCh: 269 return 270 } 271 } 272} 273 274// InstallKey initiates a query to install a new key on all members 275func (a *Agent) InstallKey(key string) (*serf.KeyResponse, error) { 276 a.logger.Print("[INFO] agent: Initiating key installation") 277 manager := a.serf.KeyManager() 278 return manager.InstallKey(key) 279} 280 281// UseKey sends a query instructing all members to switch primary keys 282func (a *Agent) UseKey(key string) (*serf.KeyResponse, error) { 283 a.logger.Print("[INFO] agent: Initiating primary key change") 284 manager := a.serf.KeyManager() 285 return manager.UseKey(key) 286} 287 288// RemoveKey sends a query to all members to remove a key from the keyring 289func (a *Agent) RemoveKey(key string) (*serf.KeyResponse, error) { 290 a.logger.Print("[INFO] agent: Initiating key removal") 291 manager := a.serf.KeyManager() 292 return manager.RemoveKey(key) 293} 294 295// ListKeys sends a query to all members to return a list of their keys 296func (a *Agent) ListKeys() (*serf.KeyResponse, error) { 297 a.logger.Print("[INFO] agent: Initiating key listing") 298 manager := a.serf.KeyManager() 299 return manager.ListKeys() 300} 301 302// SetTags is used to update the tags. The agent will make sure to 303// persist tags if necessary before gossiping to the cluster. 304func (a *Agent) SetTags(tags map[string]string) error { 305 // Update the tags file if we have one 306 if a.agentConf.TagsFile != "" { 307 if err := a.writeTagsFile(tags); err != nil { 308 a.logger.Printf("[ERR] agent: %s", err) 309 return err 310 } 311 } 312 313 // Set the tags in Serf, start gossiping out 314 return a.serf.SetTags(tags) 315} 316 317// loadTagsFile will load agent tags out of a file and set them in the 318// current serf configuration. 319func (a *Agent) loadTagsFile(tagsFile string) error { 320 // Avoid passing tags and using a tags file at the same time 321 if len(a.agentConf.Tags) > 0 { 322 return fmt.Errorf("Tags config not allowed while using tag files") 323 } 324 325 if _, err := os.Stat(tagsFile); err == nil { 326 tagData, err := ioutil.ReadFile(tagsFile) 327 if err != nil { 328 return fmt.Errorf("Failed to read tags file: %s", err) 329 } 330 if err := json.Unmarshal(tagData, &a.conf.Tags); err != nil { 331 return fmt.Errorf("Failed to decode tags file: %s", err) 332 } 333 a.logger.Printf("[INFO] agent: Restored %d tag(s) from %s", 334 len(a.conf.Tags), tagsFile) 335 } 336 337 // Success! 338 return nil 339} 340 341// writeTagsFile will write the current tags to the configured tags file. 342func (a *Agent) writeTagsFile(tags map[string]string) error { 343 encoded, err := json.MarshalIndent(tags, "", " ") 344 if err != nil { 345 return fmt.Errorf("Failed to encode tags: %s", err) 346 } 347 348 // Use 0600 for permissions, in case tag data is sensitive 349 if err = ioutil.WriteFile(a.agentConf.TagsFile, encoded, 0600); err != nil { 350 return fmt.Errorf("Failed to write tags file: %s", err) 351 } 352 353 // Success! 354 return nil 355} 356 357// MarshalTags is a utility function which takes a map of tag key/value pairs 358// and returns the same tags as strings in 'key=value' format. 359func MarshalTags(tags map[string]string) []string { 360 var result []string 361 for name, value := range tags { 362 result = append(result, fmt.Sprintf("%s=%s", name, value)) 363 } 364 return result 365} 366 367// UnmarshalTags is a utility function which takes a slice of strings in 368// key=value format and returns them as a tag mapping. 369func UnmarshalTags(tags []string) (map[string]string, error) { 370 result := make(map[string]string) 371 for _, tag := range tags { 372 parts := strings.SplitN(tag, "=", 2) 373 if len(parts) != 2 || len(parts[0]) == 0 { 374 return nil, fmt.Errorf("Invalid tag: '%s'", tag) 375 } 376 result[parts[0]] = parts[1] 377 } 378 return result, nil 379} 380 381// loadKeyringFile will load a keyring out of a file 382func (a *Agent) loadKeyringFile(keyringFile string) error { 383 // Avoid passing an encryption key and a keyring file at the same time 384 if len(a.agentConf.EncryptKey) > 0 { 385 return fmt.Errorf("Encryption key not allowed while using a keyring") 386 } 387 388 if _, err := os.Stat(keyringFile); err != nil { 389 return err 390 } 391 392 // Read in the keyring file data 393 keyringData, err := ioutil.ReadFile(keyringFile) 394 if err != nil { 395 return fmt.Errorf("Failed to read keyring file: %s", err) 396 } 397 398 // Decode keyring JSON 399 keys := make([]string, 0) 400 if err := json.Unmarshal(keyringData, &keys); err != nil { 401 return fmt.Errorf("Failed to decode keyring file: %s", err) 402 } 403 404 // Decode base64 values 405 keysDecoded := make([][]byte, len(keys)) 406 for i, key := range keys { 407 keyBytes, err := base64.StdEncoding.DecodeString(key) 408 if err != nil { 409 return fmt.Errorf("Failed to decode key from keyring: %s", err) 410 } 411 keysDecoded[i] = keyBytes 412 } 413 414 // Guard against empty keyring file 415 if len(keysDecoded) == 0 { 416 return fmt.Errorf("Keyring file contains no keys") 417 } 418 419 // Create the keyring 420 keyring, err := memberlist.NewKeyring(keysDecoded, keysDecoded[0]) 421 if err != nil { 422 return fmt.Errorf("Failed to restore keyring: %s", err) 423 } 424 a.conf.MemberlistConfig.Keyring = keyring 425 a.logger.Printf("[INFO] agent: Restored keyring with %d keys from %s", 426 len(keys), keyringFile) 427 428 // Success! 429 return nil 430} 431 432// Stats is used to get various runtime information and stats 433func (a *Agent) Stats() map[string]map[string]string { 434 local := a.serf.LocalMember() 435 event_handlers := make(map[string]string) 436 437 // Convert event handlers from a string slice to a string map 438 for _, script := range a.agentConf.EventScripts() { 439 script_filter := fmt.Sprintf("%s:%s", script.EventFilter.Event, script.EventFilter.Name) 440 event_handlers[script_filter] = script.Script 441 } 442 443 output := map[string]map[string]string{ 444 "agent": map[string]string{ 445 "name": local.Name, 446 }, 447 "runtime": runtimeStats(), 448 "serf": a.serf.Stats(), 449 "tags": local.Tags, 450 "event_handlers": event_handlers, 451 } 452 return output 453} 454