1package serf 2 3import ( 4 "bytes" 5 "encoding/base64" 6 "encoding/json" 7 "errors" 8 "fmt" 9 "io/ioutil" 10 "log" 11 "math/rand" 12 "net" 13 "strconv" 14 "sync" 15 "time" 16 17 "github.com/armon/go-metrics" 18 "github.com/hashicorp/go-msgpack/codec" 19 "github.com/hashicorp/memberlist" 20 "github.com/hashicorp/serf/coordinate" 21) 22 23// These are the protocol versions that Serf can _understand_. These are 24// Serf-level protocol versions that are passed down as the delegate 25// version to memberlist below. 26const ( 27 ProtocolVersionMin uint8 = 2 28 ProtocolVersionMax = 4 29) 30 31const ( 32 // Used to detect if the meta data is tags 33 // or if it is a raw role 34 tagMagicByte uint8 = 255 35) 36 37var ( 38 // FeatureNotSupported is returned if a feature cannot be used 39 // due to an older protocol version being used. 40 FeatureNotSupported = fmt.Errorf("Feature not supported") 41) 42 43func init() { 44 // Seed the random number generator 45 rand.Seed(time.Now().UnixNano()) 46} 47 48// Serf is a single node that is part of a single cluster that gets 49// events about joins/leaves/failures/etc. It is created with the Create 50// method. 51// 52// All functions on the Serf structure are safe to call concurrently. 53type Serf struct { 54 // The clocks for different purposes. These MUST be the first things 55 // in this struct due to Golang issue #599. 
56 clock LamportClock 57 eventClock LamportClock 58 queryClock LamportClock 59 60 broadcasts *memberlist.TransmitLimitedQueue 61 config *Config 62 failedMembers []*memberState 63 leftMembers []*memberState 64 memberlist *memberlist.Memberlist 65 memberLock sync.RWMutex 66 members map[string]*memberState 67 68 // Circular buffers for recent intents, used 69 // in case we get the intent before the relevant event 70 recentLeave []nodeIntent 71 recentLeaveIndex int 72 recentJoin []nodeIntent 73 recentJoinIndex int 74 75 eventBroadcasts *memberlist.TransmitLimitedQueue 76 eventBuffer []*userEvents 77 eventJoinIgnore bool 78 eventMinTime LamportTime 79 eventLock sync.RWMutex 80 81 queryBroadcasts *memberlist.TransmitLimitedQueue 82 queryBuffer []*queries 83 queryMinTime LamportTime 84 queryResponse map[LamportTime]*QueryResponse 85 queryLock sync.RWMutex 86 87 logger *log.Logger 88 joinLock sync.Mutex 89 stateLock sync.Mutex 90 state SerfState 91 shutdownCh chan struct{} 92 93 snapshotter *Snapshotter 94 keyManager *KeyManager 95 96 coordClient *coordinate.Client 97 coordCache map[string]*coordinate.Coordinate 98 coordCacheLock sync.RWMutex 99} 100 101// SerfState is the state of the Serf instance. 102type SerfState int 103 104const ( 105 SerfAlive SerfState = iota 106 SerfLeaving 107 SerfLeft 108 SerfShutdown 109) 110 111func (s SerfState) String() string { 112 switch s { 113 case SerfAlive: 114 return "alive" 115 case SerfLeaving: 116 return "leaving" 117 case SerfLeft: 118 return "left" 119 case SerfShutdown: 120 return "shutdown" 121 default: 122 return "unknown" 123 } 124} 125 126// Member is a single member of the Serf cluster. 127type Member struct { 128 Name string 129 Addr net.IP 130 Port uint16 131 Tags map[string]string 132 Status MemberStatus 133 134 // The minimum, maximum, and current values of the protocol versions 135 // and delegate (Serf) protocol versions that each member can understand 136 // or is speaking. 
137 ProtocolMin uint8 138 ProtocolMax uint8 139 ProtocolCur uint8 140 DelegateMin uint8 141 DelegateMax uint8 142 DelegateCur uint8 143} 144 145// MemberStatus is the state that a member is in. 146type MemberStatus int 147 148const ( 149 StatusNone MemberStatus = iota 150 StatusAlive 151 StatusLeaving 152 StatusLeft 153 StatusFailed 154) 155 156func (s MemberStatus) String() string { 157 switch s { 158 case StatusNone: 159 return "none" 160 case StatusAlive: 161 return "alive" 162 case StatusLeaving: 163 return "leaving" 164 case StatusLeft: 165 return "left" 166 case StatusFailed: 167 return "failed" 168 default: 169 panic(fmt.Sprintf("unknown MemberStatus: %d", s)) 170 } 171} 172 173// memberState is used to track members that are no longer active due to 174// leaving, failing, partitioning, etc. It tracks the member along with 175// when that member was marked as leaving. 176type memberState struct { 177 Member 178 statusLTime LamportTime // lamport clock time of last received message 179 leaveTime time.Time // wall clock time of leave 180} 181 182// nodeIntent is used to buffer intents for out-of-order deliveries 183type nodeIntent struct { 184 LTime LamportTime 185 Node string 186} 187 188// userEvent is used to buffer events to prevent re-delivery 189type userEvent struct { 190 Name string 191 Payload []byte 192} 193 194func (ue *userEvent) Equals(other *userEvent) bool { 195 if ue.Name != other.Name { 196 return false 197 } 198 if bytes.Compare(ue.Payload, other.Payload) != 0 { 199 return false 200 } 201 return true 202} 203 204// userEvents stores all the user events at a specific time 205type userEvents struct { 206 LTime LamportTime 207 Events []userEvent 208} 209 210// queries stores all the query ids at a specific time 211type queries struct { 212 LTime LamportTime 213 QueryIDs []uint32 214} 215 216const ( 217 UserEventSizeLimit = 512 // Maximum byte size for event name and payload 218 snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot 219) 220 
// Create creates a new Serf instance, starting all the background tasks
// to maintain cluster membership information.
//
// After calling this function, the configuration should no longer be used
// or modified by the caller.
func Create(conf *Config) (*Serf, error) {
	conf.Init()

	// Reject protocol versions outside the range this build understands.
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	serf := &Serf{
		config:        conf,
		logger:        log.New(conf.LogOutput, "", log.LstdFlags),
		members:       make(map[string]*memberState),
		queryResponse: make(map[LamportTime]*QueryResponse),
		shutdownCh:    make(chan struct{}),
		state:         SerfAlive,
	}

	// Check that the meta data length is okay
	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
		return nil, fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", memberlist.MetaMaxSize)
	}

	// Check if serf member event coalescing is enabled. Each coalescer
	// below wraps conf.EventCh, so the ordering of these wrappings
	// determines the event pipeline.
	if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
		c := &memberEventCoalescer{
			lastEvents:   make(map[string]EventType),
			latestEvents: make(map[string]coalesceEvent),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.CoalescePeriod, conf.QuiescentPeriod, c)
	}

	// Check if user event coalescing is enabled
	if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
		c := &userEventCoalescer{
			events: make(map[string]*latestUserEvents),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
	}

	// Listen for internal Serf queries. This is setup before the snapshotter, since
	// we want to capture the query-time, but the internal listener does not passthrough
	// the queries
	outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh)
	if err != nil {
		return nil, fmt.Errorf("Failed to setup serf query handler: %v", err)
	}
	conf.EventCh = outCh

	// Set up network coordinate client.
	if !conf.DisableCoordinates {
		serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig())
		if err != nil {
			return nil, fmt.Errorf("Failed to create coordinate client: %v", err)
		}
	}

	// Try access the snapshot. On success the snapshot supplies the
	// previously known alive nodes and the last clock values, which are
	// restored further below.
	var oldClock, oldEventClock, oldQueryClock LamportTime
	var prev []*PreviousNode
	if conf.SnapshotPath != "" {
		eventCh, snap, err := NewSnapshotter(
			conf.SnapshotPath,
			snapshotSizeLimit,
			conf.RejoinAfterLeave,
			serf.logger,
			&serf.clock,
			serf.coordClient,
			conf.EventCh,
			serf.shutdownCh)
		if err != nil {
			return nil, fmt.Errorf("Failed to setup snapshot: %v", err)
		}
		serf.snapshotter = snap
		conf.EventCh = eventCh
		prev = snap.AliveNodes()
		oldClock = snap.LastClock()
		oldEventClock = snap.LastEventClock()
		oldQueryClock = snap.LastQueryClock()
		serf.eventMinTime = oldEventClock + 1
		serf.queryMinTime = oldQueryClock + 1
	}

	// Set up the coordinate cache. We do this after we read the snapshot to
	// make sure we get a good initial value from there, if we got one.
	if !conf.DisableCoordinates {
		serf.coordCache = make(map[string]*coordinate.Coordinate)
		serf.coordCache[conf.NodeName] = serf.coordClient.GetCoordinate()
	}

	// Setup the various broadcast queues, which we use to send our own
	// custom broadcasts along the gossip channel.
	serf.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}

	// Create the buffer for recent intents
	serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer)
	serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)

	// Create a buffer for events and queries
	serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
	serf.queryBuffer = make([]*queries, conf.QueryBuffer)

	// Ensure our lamport clock is at least 1, so that the default
	// join LTime of 0 does not cause issues
	serf.clock.Increment()
	serf.eventClock.Increment()
	serf.queryClock.Increment()

	// Restore the clock from snap if we have one
	serf.clock.Witness(oldClock)
	serf.eventClock.Witness(oldEventClock)
	serf.queryClock.Witness(oldQueryClock)

	// Modify the memberlist configuration with keys that we set
	conf.MemberlistConfig.Events = &eventDelegate{serf: serf}
	conf.MemberlistConfig.Conflict = &conflictDelegate{serf: serf}
	conf.MemberlistConfig.Delegate = &delegate{serf: serf}
	conf.MemberlistConfig.DelegateProtocolVersion = conf.ProtocolVersion
	conf.MemberlistConfig.DelegateProtocolMin = ProtocolVersionMin
	conf.MemberlistConfig.DelegateProtocolMax = ProtocolVersionMax
	conf.MemberlistConfig.Name = conf.NodeName
	conf.MemberlistConfig.ProtocolVersion = ProtocolVersionMap[conf.ProtocolVersion]
	if !conf.DisableCoordinates {
		conf.MemberlistConfig.Ping = &pingDelegate{serf: serf}
	}

	// Setup a merge delegate if necessary
	if conf.Merge != nil {
		md := &mergeDelegate{serf: serf}
		conf.MemberlistConfig.Merge = md
		conf.MemberlistConfig.Alive = md
	}

	// Create the underlying memberlist that will manage membership
	// and failure detection for the Serf instance.
	memberlist, err := memberlist.Create(conf.MemberlistConfig)
	if err != nil {
		return nil, fmt.Errorf("Failed to create memberlist: %v", err)
	}

	serf.memberlist = memberlist

	// Create a key manager for handling all encryption key changes
	serf.keyManager = &KeyManager{serf: serf}

	// Start the background tasks. See the documentation above each method
	// for more information on their role.
	go serf.handleReap()
	go serf.handleReconnect()
	go serf.checkQueueDepth("Intent", serf.broadcasts)
	go serf.checkQueueDepth("Event", serf.eventBroadcasts)
	go serf.checkQueueDepth("Query", serf.queryBroadcasts)

	// Attempt to re-join the cluster if we have known nodes
	if len(prev) != 0 {
		go serf.handleRejoin(prev)
	}

	return serf, nil
}

// ProtocolVersion returns the current protocol version in use by Serf.
// This is the Serf protocol version, not the memberlist protocol version.
func (s *Serf) ProtocolVersion() uint8 {
	return s.config.ProtocolVersion
}

// EncryptionEnabled is a predicate that determines whether or not encryption
// is enabled, which can be possible in one of 2 cases:
//   - Single encryption key passed at agent start (no persistence)
//   - Keyring file provided at agent start
func (s *Serf) EncryptionEnabled() bool {
	return s.config.MemberlistConfig.Keyring != nil
}

// KeyManager returns the key manager for the current Serf instance.
func (s *Serf) KeyManager() *KeyManager {
	return s.keyManager
}

// UserEvent is used to broadcast a custom user event with a given
// name and payload. The events must be fairly small, and if the
// size limit is exceeded an error will be returned. If coalesce is enabled,
// nodes are allowed to coalesce this event. Coalescing is only available
// starting in v0.2
func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
	// Check the size limit
	if len(name)+len(payload) > UserEventSizeLimit {
		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
	}

	// Create a message
	msg := messageUserEvent{
		LTime:   s.eventClock.Time(),
		Name:    name,
		Payload: payload,
		CC:      coalesce,
	}
	s.eventClock.Increment()

	// Process update locally
	s.handleUserEvent(&msg)

	// Start broadcasting the event
	raw, err := encodeMessage(messageUserEventType, &msg)
	if err != nil {
		return err
	}
	s.eventBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return nil
}

// Query is used to broadcast a new query. The query must be fairly small,
// and an error will be returned if the size limit is exceeded. This is only
// available with protocol version 4 and newer. Query parameters are optional,
// and if not provided, a sane set of defaults will be used.
func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) {
	// Check that the latest protocol is in use; queries were added in
	// Serf protocol version 4.
	if s.ProtocolVersion() < 4 {
		return nil, FeatureNotSupported
	}

	// Provide default parameters if none given
	if params == nil {
		params = s.DefaultQueryParams()
	} else if params.Timeout == 0 {
		params.Timeout = s.DefaultQueryTimeout()
	}

	// Get the local node, used as the return address for responses.
	local := s.memberlist.LocalNode()

	// Encode the filters
	filters, err := params.encodeFilters()
	if err != nil {
		return nil, fmt.Errorf("Failed to format filters: %v", err)
	}

	// Setup the flags
	var flags uint32
	if params.RequestAck {
		flags |= queryFlagAck
	}

	// Create a message. The random ID distinguishes queries that share
	// the same lamport time.
	q := messageQuery{
		LTime:   s.queryClock.Time(),
		ID:      uint32(rand.Int31()),
		Addr:    local.Addr,
		Port:    local.Port,
		Filters: filters,
		Flags:   flags,
		Timeout: params.Timeout,
		Name:    name,
		Payload: payload,
	}

	// Encode the query
	raw, err := encodeMessage(messageQueryType, &q)
	if err != nil {
		return nil, err
	}

	// Check the size after encoding, since the wire size is what matters.
	if len(raw) > s.config.QuerySizeLimit {
		return nil, fmt.Errorf("query exceeds limit of %d bytes", s.config.QuerySizeLimit)
	}

	// Register QueryResponse to track acks and responses. This must be
	// registered before handleQuery runs so responses are not dropped.
	resp := newQueryResponse(s.memberlist.NumMembers(), &q)
	s.registerQueryResponse(params.Timeout, resp)

	// Process query locally
	s.handleQuery(&q)

	// Start broadcasting the event
	s.queryBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return resp, nil
}

// registerQueryResponse is used to setup the listeners for the query,
// and to schedule closing the query after the timeout.
func (s *Serf) registerQueryResponse(timeout time.Duration, resp *QueryResponse) {
	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Map the LTime to the QueryResponse. This is necessarily 1-to-1,
	// since we increment the time for each new query.
	s.queryResponse[resp.lTime] = resp

	// Setup a timer to close the response and deregister after the timeout
	time.AfterFunc(timeout, func() {
		s.queryLock.Lock()
		delete(s.queryResponse, resp.lTime)
		resp.Close()
		s.queryLock.Unlock()
	})
}

// SetTags is used to dynamically update the tags associated with
// the local node. This will propagate the change to the rest of
// the cluster. Blocks until the message is broadcast out.
func (s *Serf) SetTags(tags map[string]string) error {
	// Check that the meta data length is okay before applying anything.
	if len(s.encodeTags(tags)) > memberlist.MetaMaxSize {
		return fmt.Errorf("Encoded length of tags exceeds limit of %d bytes",
			memberlist.MetaMaxSize)
	}

	// Update the config
	s.config.Tags = tags

	// Trigger a memberlist update, which gossips the new tags to peers.
	return s.memberlist.UpdateNode(s.config.BroadcastTimeout)
}

// Join joins an existing Serf cluster. Returns the number of nodes
// successfully contacted. The returned error will be non-nil only in the
// case that no nodes could be contacted. If ignoreOld is true, then any
// user messages sent prior to the join will be ignored.
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
	// Do a quick state check
	if s.State() != SerfAlive {
		return 0, fmt.Errorf("Serf can't Join after Leave or Shutdown")
	}

	// Hold the joinLock, this is to make eventJoinIgnore safe
	s.joinLock.Lock()
	defer s.joinLock.Unlock()

	// Ignore any events from a potential join. This is safe since we hold
	// the joinLock and nobody else can be doing a Join
	if ignoreOld {
		s.eventJoinIgnore = true
		defer func() {
			s.eventJoinIgnore = false
		}()
	}

	// Have memberlist attempt to join
	num, err := s.memberlist.Join(existing)

	// If we joined any nodes, broadcast the join message
	if num > 0 {
		// Start broadcasting the update
		if err := s.broadcastJoin(s.clock.Time()); err != nil {
			return num, err
		}
	}

	return num, err
}

// broadcastJoin broadcasts a new join intent with a
// given clock value. It is used on either join, or if
// we need to refute an older leave intent. Cannot be called
// with the memberLock held.
func (s *Serf) broadcastJoin(ltime LamportTime) error {
	// Construct message to update our lamport clock
	msg := messageJoin{
		LTime: ltime,
		Node:  s.config.NodeName,
	}
	s.clock.Witness(ltime)

	// Process update locally
	s.handleNodeJoinIntent(&msg)

	// Start broadcasting the update
	if err := s.broadcast(messageJoinType, &msg, nil); err != nil {
		s.logger.Printf("[WARN] serf: Failed to broadcast join intent: %v", err)
		return err
	}
	return nil
}

// Leave gracefully exits the cluster. It is safe to call this multiple
// times.
func (s *Serf) Leave() error {
	// Check the current state; Leave is idempotent after a completed
	// leave, but rejects concurrent or post-shutdown calls.
	s.stateLock.Lock()
	if s.state == SerfLeft {
		s.stateLock.Unlock()
		return nil
	} else if s.state == SerfLeaving {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave already in progress")
	} else if s.state == SerfShutdown {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave called after Shutdown")
	}
	s.state = SerfLeaving
	s.stateLock.Unlock()

	// If we have a snapshot, mark we are leaving
	if s.snapshotter != nil {
		s.snapshotter.Leave()
	}

	// Construct the message for the graceful leave
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  s.config.NodeName,
	}
	s.clock.Increment()

	// Process the leave locally
	s.handleNodeLeaveIntent(&msg)

	// Only broadcast the leave message if there is at least one
	// other node alive.
	if s.hasAliveMembers() {
		notifyCh := make(chan struct{})
		if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
			return err
		}

		// Wait until the broadcast actually goes out (notifyCh is
		// closed when sent) or give up after the configured timeout.
		select {
		case <-notifyCh:
		case <-time.After(s.config.BroadcastTimeout):
			return errors.New("timeout while waiting for graceful leave")
		}
	}

	// Attempt the memberlist leave
	err := s.memberlist.Leave(s.config.BroadcastTimeout)
	if err != nil {
		return err
	}

	// Transition to Left only if we not already shutdown
	s.stateLock.Lock()
	if s.state != SerfShutdown {
		s.state = SerfLeft
	}
	s.stateLock.Unlock()
	return nil
}

// hasAliveMembers is called to check for any alive members other than
// ourself.
693func (s *Serf) hasAliveMembers() bool { 694 s.memberLock.RLock() 695 defer s.memberLock.RUnlock() 696 697 hasAlive := false 698 for _, m := range s.members { 699 // Skip ourself, we want to know if OTHER members are alive 700 if m.Name == s.config.NodeName { 701 continue 702 } 703 704 if m.Status == StatusAlive { 705 hasAlive = true 706 break 707 } 708 } 709 return hasAlive 710} 711 712// LocalMember returns the Member information for the local node 713func (s *Serf) LocalMember() Member { 714 s.memberLock.RLock() 715 defer s.memberLock.RUnlock() 716 return s.members[s.config.NodeName].Member 717} 718 719// Members returns a point-in-time snapshot of the members of this cluster. 720func (s *Serf) Members() []Member { 721 s.memberLock.RLock() 722 defer s.memberLock.RUnlock() 723 724 members := make([]Member, 0, len(s.members)) 725 for _, m := range s.members { 726 members = append(members, m.Member) 727 } 728 729 return members 730} 731 732// RemoveFailedNode forcibly removes a failed node from the cluster 733// immediately, instead of waiting for the reaper to eventually reclaim it. 734// This also has the effect that Serf will no longer attempt to reconnect 735// to this node. 
func (s *Serf) RemoveFailedNode(node string) error {
	// Construct the message to broadcast
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  node,
	}
	s.clock.Increment()

	// Process our own event
	s.handleNodeLeaveIntent(&msg)

	// If we have no members, then we don't need to broadcast
	if !s.hasAliveMembers() {
		return nil
	}

	// Broadcast the remove
	notifyCh := make(chan struct{})
	if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
		return err
	}

	// Wait for the broadcast to be sent, bounded by the configured
	// broadcast timeout.
	select {
	case <-notifyCh:
	case <-time.After(s.config.BroadcastTimeout):
		return fmt.Errorf("timed out broadcasting node removal")
	}

	return nil
}

// Shutdown forcefully shuts down the Serf instance, stopping all network
// activity and background maintenance associated with the instance.
//
// This is not a graceful shutdown, and should be preceded by a call
// to Leave. Otherwise, other nodes in the cluster will detect this node's
// exit as a node failure.
//
// It is safe to call this method multiple times.
func (s *Serf) Shutdown() error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.state == SerfShutdown {
		return nil
	}

	if s.state != SerfLeft {
		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
	}

	s.state = SerfShutdown
	// Closing shutdownCh signals all background goroutines to stop.
	close(s.shutdownCh)

	err := s.memberlist.Shutdown()
	if err != nil {
		return err
	}

	// Wait for the snapshoter to finish if we have one
	if s.snapshotter != nil {
		s.snapshotter.Wait()
	}

	return nil
}

// ShutdownCh returns a channel that can be used to wait for
// Serf to shutdown.
func (s *Serf) ShutdownCh() <-chan struct{} {
	return s.shutdownCh
}

// Memberlist is used to get access to the underlying Memberlist instance
func (s *Serf) Memberlist() *memberlist.Memberlist {
	return s.memberlist
}

// State is the current state of this Serf instance.
func (s *Serf) State() SerfState {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()
	return s.state
}

// broadcast takes a Serf message type, encodes it for the wire, and queues
// the broadcast. If a notify channel is given, this channel will be closed
// when the broadcast is sent.
func (s *Serf) broadcast(t messageType, msg interface{}, notify chan<- struct{}) error {
	raw, err := encodeMessage(t, msg)
	if err != nil {
		return err
	}

	s.broadcasts.QueueBroadcast(&broadcast{
		msg:    raw,
		notify: notify,
	})
	return nil
}

// handleNodeJoin is called when a node join event is received
// from memberlist.
func (s *Serf) handleNodeJoin(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	var oldStatus MemberStatus
	member, ok := s.members[n.Name]
	if !ok {
		// First time we hear of this node: create its state entry.
		oldStatus = StatusNone
		member = &memberState{
			Member: Member{
				Name:   n.Name,
				Addr:   net.IP(n.Addr),
				Port:   n.Port,
				Tags:   s.decodeTags(n.Meta),
				Status: StatusAlive,
			},
		}

		// Check if we have a join intent and use the LTime
		if join := recentIntent(s.recentJoin, n.Name); join != nil {
			member.statusLTime = join.LTime
		}

		// Check if we have a leave intent
		if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
			if leave.LTime > member.statusLTime {
				member.Status = StatusLeaving
				member.statusLTime = leave.LTime
			}
		}

		s.members[n.Name] = member
	} else {
		// Known node re-joining: refresh its status and address info.
		oldStatus = member.Status
		member.Status = StatusAlive
		member.leaveTime = time.Time{}
		member.Addr = net.IP(n.Addr)
		member.Port = n.Port
		member.Tags = s.decodeTags(n.Meta)
	}

	// Update the protocol versions every time we get an event
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// If node was previously in a failed state, then clean up some
	// internal accounting.
	// TODO(mitchellh): needs tests to verify not reaped
	if oldStatus == StatusFailed || oldStatus == StatusLeft {
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "join"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s",
		member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberJoin,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeave is called when a node leave event is received
// from memberlist.
func (s *Serf) handleNodeLeave(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is supposedly
		// leaving. Just ignore it completely.
		return
	}

	// A Leaving member left gracefully; an Alive member that leaves
	// at the memberlist layer is considered failed.
	switch member.Status {
	case StatusLeaving:
		member.Status = StatusLeft
		member.leaveTime = time.Now()
		s.leftMembers = append(s.leftMembers, member)
	case StatusAlive:
		member.Status = StatusFailed
		member.leaveTime = time.Now()
		s.failedMembers = append(s.failedMembers, member)
	default:
		// Unknown state that it was in? Just don't do anything
		s.logger.Printf("[WARN] serf: Bad state when leave: %d", member.Status)
		return
	}

	// Send an event along
	event := EventMemberLeave
	eventStr := "EventMemberLeave"
	if member.Status != StatusLeft {
		event = EventMemberFailed
		eventStr = "EventMemberFailed"
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1)

	s.logger.Printf("[INFO] serf: %s: %s %s",
		eventStr, member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    event,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeUpdate is called when a node meta data update
// has taken place
func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is updating.
		// Just ignore it completely.
		return
	}

	// Update the member attributes
	member.Addr = net.IP(n.Addr)
	member.Port = n.Port
	member.Tags = s.decodeTags(n.Meta)

	// Snag the latest versions. NOTE - the current memberlist code will NOT
	// fire an update event if the metadata (for Serf, tags) stays the same
	// and only the protocol versions change. If we wake any Serf-level
	// protocol changes where we want to get this event under those
	// circumstances, we will need to update memberlist to do a check of
	// versions as well as the metadata.
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "update"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberUpdate,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeaveIntent is called when an intent to leave is received.
// Returns whether the message should be rebroadcast.
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
	// Witness a potentially newer time
	s.clock.Witness(leaveMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[leaveMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentLeave, leaveMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentLeave[s.recentLeaveIndex] = nodeIntent{
			LTime: leaveMsg.LTime,
			Node:  leaveMsg.Node,
		}
		s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
		return true
	}

	// If the message is old, then it is irrelevant and we can skip it
	if leaveMsg.LTime <= member.statusLTime {
		return false
	}

	// Refute us leaving if we are in the alive state
	// Must be done in another goroutine since we have the memberLock
	if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
		s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
		go s.broadcastJoin(s.clock.Time())
		return false
	}

	// State transition depends on current state
	switch member.Status {
	case StatusAlive:
		member.Status = StatusLeaving
		member.statusLTime = leaveMsg.LTime
		return true
	case StatusFailed:
		member.Status = StatusLeft
		member.statusLTime = leaveMsg.LTime

		// Remove from the failed list and add to the left list. We add
		// to the left list so that when we do a sync, other nodes will
		// remove it from their failed list.
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = append(s.leftMembers, member)

		// We must push a message indicating the node has now
		// left to allow higher-level applications to handle the
		// graceful leave.
		s.logger.Printf("[INFO] serf: EventMemberLeave (forced): %s %s",
			member.Member.Name, member.Member.Addr)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberLeave,
				Members: []Member{member.Member},
			}
		}
		return true
	default:
		return false
	}
}

// handleNodeJoinIntent is called when a node broadcasts a
// join message to set the lamport time of its join.
// Returns whether the message should be rebroadcast.
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
	// Witness a potentially newer time
	s.clock.Witness(joinMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[joinMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentJoin, joinMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
		s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
		return true
	}

	// Check if this time is newer than what we have
	if joinMsg.LTime <= member.statusLTime {
		return false
	}

	// Update the LTime
	member.statusLTime = joinMsg.LTime

	// If we are in the leaving state, we should go back to alive,
	// since the leaving message must have been for an older time
	if member.Status == StatusLeaving {
		member.Status = StatusAlive
	}
	return true
}

// handleUserEvent is called when a user event broadcast is
// received. Returns if the message should be rebroadcast.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// Witness a potentially newer time
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	// Ignore if it is before our minimum event time
	if eventMsg.LTime < s.eventMinTime {
		return false
	}

	// Check if this message is too old: events that have fallen out of
	// the circular buffer window cannot be deduplicated, so drop them.
	curTime := s.eventClock.Time()
	if curTime > LamportTime(len(s.eventBuffer)) &&
		eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	// Check if we've already seen this
	idx := eventMsg.LTime % LamportTime(len(s.eventBuffer))
	seen := s.eventBuffer[idx]
	userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
	if seen != nil && seen.LTime == eventMsg.LTime {
		for _, previous := range seen.Events {
			if previous.Equals(&userEvent) {
				return false
			}
		}
	} else {
		seen = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[idx] = seen
	}

	// Add to recent events
	seen.Events = append(seen.Events, userEvent)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	if s.config.EventCh != nil {
		s.config.EventCh <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}

// handleQuery is called
when a query broadcast is 1172// received. Returns if the message should be rebroadcast. 1173func (s *Serf) handleQuery(query *messageQuery) bool { 1174 // Witness a potentially newer time 1175 s.queryClock.Witness(query.LTime) 1176 1177 s.queryLock.Lock() 1178 defer s.queryLock.Unlock() 1179 1180 // Ignore if it is before our minimum query time 1181 if query.LTime < s.queryMinTime { 1182 return false 1183 } 1184 1185 // Check if this message is too old 1186 curTime := s.queryClock.Time() 1187 if curTime > LamportTime(len(s.queryBuffer)) && 1188 query.LTime < curTime-LamportTime(len(s.queryBuffer)) { 1189 s.logger.Printf( 1190 "[WARN] serf: received old query %s from time %d (current: %d)", 1191 query.Name, 1192 query.LTime, 1193 s.queryClock.Time()) 1194 return false 1195 } 1196 1197 // Check if we've already seen this 1198 idx := query.LTime % LamportTime(len(s.queryBuffer)) 1199 seen := s.queryBuffer[idx] 1200 if seen != nil && seen.LTime == query.LTime { 1201 for _, previous := range seen.QueryIDs { 1202 if previous == query.ID { 1203 // Seen this ID already 1204 return false 1205 } 1206 } 1207 } else { 1208 seen = &queries{LTime: query.LTime} 1209 s.queryBuffer[idx] = seen 1210 } 1211 1212 // Add to recent queries 1213 seen.QueryIDs = append(seen.QueryIDs, query.ID) 1214 1215 // Update some metrics 1216 metrics.IncrCounter([]string{"serf", "queries"}, 1) 1217 metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1) 1218 1219 // Check if we should rebroadcast, this may be disabled by a flag 1220 rebroadcast := true 1221 if query.NoBroadcast() { 1222 rebroadcast = false 1223 } 1224 1225 // Filter the query 1226 if !s.shouldProcessQuery(query.Filters) { 1227 // Even if we don't process it further, we should rebroadcast, 1228 // since it is the first time we've seen this. 
		return rebroadcast
	}

	// Send ack if requested, without waiting for client to Respond()
	if query.Ack() {
		ack := messageQueryResponse{
			LTime: query.LTime,
			ID:    query.ID,
			From:  s.config.NodeName,
			Flags: queryFlagAck,
		}
		raw, err := encodeMessage(messageQueryResponseType, &ack)
		if err != nil {
			s.logger.Printf("[ERR] serf: failed to format ack: %v", err)
		} else {
			// Ack goes directly back to the querying node over UDP
			addr := net.UDPAddr{IP: query.Addr, Port: int(query.Port)}
			if err := s.memberlist.SendTo(&addr, raw); err != nil {
				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
			}
		}
	}

	// Hand the query to the application so it can Respond()
	if s.config.EventCh != nil {
		s.config.EventCh <- &Query{
			LTime:    query.LTime,
			Name:     query.Name,
			Payload:  query.Payload,
			serf:     s,
			id:       query.ID,
			addr:     query.Addr,
			port:     query.Port,
			deadline: time.Now().Add(query.Timeout),
		}
	}
	return rebroadcast
}

// handleQueryResponse is called when a query response is
// received.
1268func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { 1269 // Look for a corresponding QueryResponse 1270 s.queryLock.RLock() 1271 query, ok := s.queryResponse[resp.LTime] 1272 s.queryLock.RUnlock() 1273 if !ok { 1274 s.logger.Printf("[WARN] serf: reply for non-running query (LTime: %d, ID: %d) From: %s", 1275 resp.LTime, resp.ID, resp.From) 1276 return 1277 } 1278 1279 // Verify the ID matches 1280 if query.id != resp.ID { 1281 s.logger.Printf("[WARN] serf: query reply ID mismatch (Local: %d, Response: %d)", 1282 query.id, resp.ID) 1283 return 1284 } 1285 1286 // Check if the query is closed 1287 if query.Finished() { 1288 return 1289 } 1290 1291 // Process each type of response 1292 if resp.Ack() { 1293 metrics.IncrCounter([]string{"serf", "query_acks"}, 1) 1294 select { 1295 case query.ackCh <- resp.From: 1296 default: 1297 s.logger.Printf("[WARN] serf: Failed to delivery query ack, dropping") 1298 } 1299 } else { 1300 metrics.IncrCounter([]string{"serf", "query_responses"}, 1) 1301 select { 1302 case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}: 1303 default: 1304 s.logger.Printf("[WARN] serf: Failed to delivery query response, dropping") 1305 } 1306 } 1307} 1308 1309// handleNodeConflict is invoked when a join detects a conflict over a name. 1310// This means two different nodes (IP/Port) are claiming the same name. Memberlist 1311// will reject the "new" node mapping, but we can still be notified 1312func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) { 1313 // Log a basic warning if the node is not us... 1314 if existing.Name != s.config.NodeName { 1315 s.logger.Printf("[WARN] serf: Name conflict for '%s' both %s:%d and %s:%d are claiming", 1316 existing.Name, existing.Addr, existing.Port, other.Addr, other.Port) 1317 return 1318 } 1319 1320 // The current node is conflicting! This is an error 1321 s.logger.Printf("[ERR] serf: Node name conflicts with another node at %s:%d. Names must be unique! 
(Resolution enabled: %v)", 1322 other.Addr, other.Port, s.config.EnableNameConflictResolution) 1323 1324 // If automatic resolution is enabled, kick off the resolution 1325 if s.config.EnableNameConflictResolution { 1326 go s.resolveNodeConflict() 1327 } 1328} 1329 1330// resolveNodeConflict is used to determine which node should remain during 1331// a name conflict. This is done by running an internal query. 1332func (s *Serf) resolveNodeConflict() { 1333 // Get the local node 1334 local := s.memberlist.LocalNode() 1335 1336 // Start a name resolution query 1337 qName := internalQueryName(conflictQuery) 1338 payload := []byte(s.config.NodeName) 1339 resp, err := s.Query(qName, payload, nil) 1340 if err != nil { 1341 s.logger.Printf("[ERR] serf: Failed to start name resolution query: %v", err) 1342 return 1343 } 1344 1345 // Counter to determine winner 1346 var responses, matching int 1347 1348 // Gather responses 1349 respCh := resp.ResponseCh() 1350 for r := range respCh { 1351 // Decode the response 1352 if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageConflictResponseType { 1353 s.logger.Printf("[ERR] serf: Invalid conflict query response type: %v", r.Payload) 1354 continue 1355 } 1356 var member Member 1357 if err := decodeMessage(r.Payload[1:], &member); err != nil { 1358 s.logger.Printf("[ERR] serf: Failed to decode conflict query response: %v", err) 1359 continue 1360 } 1361 1362 // Update the counters 1363 responses++ 1364 if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port { 1365 matching++ 1366 } 1367 } 1368 1369 // Query over, determine if we should live 1370 majority := (responses / 2) + 1 1371 if matching >= majority { 1372 s.logger.Printf("[INFO] serf: majority in name conflict resolution [%d / %d]", 1373 matching, responses) 1374 return 1375 } 1376 1377 // Since we lost the vote, we need to exit 1378 s.logger.Printf("[WARN] serf: minority in name conflict resolution, quiting [%d / %d]", 1379 matching, responses) 1380 if 
err := s.Shutdown(); err != nil { 1381 s.logger.Printf("[ERR] serf: Failed to shutdown: %v", err) 1382 } 1383} 1384 1385// handleReap periodically reaps the list of failed and left members. 1386func (s *Serf) handleReap() { 1387 for { 1388 select { 1389 case <-time.After(s.config.ReapInterval): 1390 s.memberLock.Lock() 1391 s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout) 1392 s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout) 1393 s.memberLock.Unlock() 1394 case <-s.shutdownCh: 1395 return 1396 } 1397 } 1398} 1399 1400// handleReconnect attempts to reconnect to recently failed nodes 1401// on configured intervals. 1402func (s *Serf) handleReconnect() { 1403 for { 1404 select { 1405 case <-time.After(s.config.ReconnectInterval): 1406 s.reconnect() 1407 case <-s.shutdownCh: 1408 return 1409 } 1410 } 1411} 1412 1413// reap is called with a list of old members and a timeout, and removes 1414// members that have exceeded the timeout. The members are removed from 1415// both the old list and the members itself. Locking is left to the caller. 1416func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState { 1417 now := time.Now() 1418 n := len(old) 1419 for i := 0; i < n; i++ { 1420 m := old[i] 1421 1422 // Skip if the timeout is not yet reached 1423 if now.Sub(m.leaveTime) <= timeout { 1424 continue 1425 } 1426 1427 // Delete from the list 1428 old[i], old[n-1] = old[n-1], nil 1429 old = old[:n-1] 1430 n-- 1431 i-- 1432 1433 // Delete from members 1434 delete(s.members, m.Name) 1435 1436 // Tell the coordinate client the node has gone away and delete 1437 // its cached coordinates. 
		if !s.config.DisableCoordinates {
			s.coordClient.ForgetNode(m.Name)

			s.coordCacheLock.Lock()
			delete(s.coordCache, m.Name)
			s.coordCacheLock.Unlock()
		}

		// Send an event along
		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberReap,
				Members: []Member{m.Member},
			}
		}
	}

	return old
}

// reconnect attempts to reconnect to recently failed nodes.
func (s *Serf) reconnect() {
	s.memberLock.RLock()

	// Nothing to do if there are no failed members
	n := len(s.failedMembers)
	if n == 0 {
		s.memberLock.RUnlock()
		return
	}

	// Probability we should attempt to reconnect is given
	// by num failed / (num members - num failed - num left)
	// This means that we probabilistically expect the cluster
	// to attempt to connect to each failed member once per
	// reconnect interval
	numFailed := float32(len(s.failedMembers))
	numAlive := float32(len(s.members) - len(s.failedMembers) - len(s.leftMembers))
	if numAlive == 0 {
		numAlive = 1 // guard against zero divide
	}
	prob := numFailed / numAlive
	if rand.Float32() > prob {
		s.memberLock.RUnlock()
		s.logger.Printf("[DEBUG] serf: forgoing reconnect for random throttling")
		return
	}

	// Select a random member to try and join
	idx := int(rand.Uint32() % uint32(n))
	mem := s.failedMembers[idx]
	s.memberLock.RUnlock()

	// Format the addr
	addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
	s.logger.Printf("[INFO] serf: attempting reconnect to %v %s", mem.Name, addr.String())

	// Attempt to join at the memberlist level; errors are intentionally
	// ignored here since another attempt will be made next interval.
	s.memberlist.Join([]string{addr.String()})
}

// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
	for {
		select {
		case <-time.After(time.Second):
			numq := queue.NumQueued()
			metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
			if numq >= s.config.QueueDepthWarning {
				s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
			}
			// Prune the queue down to the limit when it grows past it
			if numq > s.config.MaxQueueDepth {
				s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
					name, numq, s.config.MaxQueueDepth)
				queue.Prune(s.config.MaxQueueDepth)
			}
		case <-s.shutdownCh:
			return
		}
	}
}

// removeOldMember is used to remove an old member from a list of old
// members. Removal is swap-with-last, so the list order is not preserved.
func removeOldMember(old []*memberState, name string) []*memberState {
	for i, m := range old {
		if m.Name == name {
			n := len(old)
			old[i], old[n-1] = old[n-1], nil
			return old[:n-1]
		}
	}

	return old
}

// recentIntent checks the recent intent buffer for a matching
// entry for a given node, and either returns the message or nil.
// Note the returned pointer aliases the buffer entry.
func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) {
	for i := 0; i < len(recent); i++ {
		// Break fast if we hit a zero entry (buffer not yet filled)
		if recent[i].LTime == 0 {
			break
		}

		// Check for a node match
		if recent[i].Node == node {
			// Take the most recent entry
			if intent == nil || recent[i].LTime > intent.LTime {
				intent = &recent[i]
			}
		}
	}
	return
}

// handleRejoin attempts to reconnect to previously known alive nodes.
// It stops after the first successful join.
func (s *Serf) handleRejoin(previous []*PreviousNode) {
	for _, prev := range previous {
		// Do not attempt to join ourself
		if prev.Name == s.config.NodeName {
			continue
		}

		s.logger.Printf("[INFO] serf: Attempting re-join to previously known node: %s", prev)
		_, err := s.memberlist.Join([]string{prev.Addr})
		if err == nil {
			s.logger.Printf("[INFO] serf: Re-joined to previously known node: %s", prev)
			return
		}
	}
	s.logger.Printf("[WARN] serf: Failed to re-join any previously known node")
}

// encodeTags is used to encode a tag map
func (s *Serf) encodeTags(tags map[string]string) []byte {
	// Support role-only backwards compatibility: protocol versions
	// below 3 only understand a raw role string.
	if s.ProtocolVersion() < 3 {
		role := tags["role"]
		return []byte(role)
	}

	// Use a magic byte prefix and msgpack encode the tags.
	// A failure here indicates a programming error, hence the panic.
	var buf bytes.Buffer
	buf.WriteByte(tagMagicByte)
	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
	if err := enc.Encode(tags); err != nil {
		panic(fmt.Sprintf("Failed to encode tags: %v", err))
	}
	return buf.Bytes()
}

// decodeTags is used to decode a tag map
func (s *Serf) decodeTags(buf []byte) map[string]string {
	tags := make(map[string]string)

	// Backwards compatibility mode: without the magic byte prefix the
	// payload is a raw role string from an older protocol version.
	if len(buf) == 0 || buf[0] != tagMagicByte {
		tags["role"] = string(buf)
		return tags
	}

	// Decode the tags; on error an empty/partial map is returned
	r := bytes.NewReader(buf[1:])
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
	if err := dec.Decode(&tags); err != nil {
		s.logger.Printf("[ERR] serf: Failed to decode tags: %v", err)
	}
	return tags
}

// Stats is used to provide operator debugging information
// NOTE(review): s.members/failedMembers/leftMembers are read here without
// acquiring memberLock — confirm callers tolerate racy stats.
func (s *Serf) Stats() map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	stats := map[string]string{
		"members":      toString(uint64(len(s.members))),
		"failed":       toString(uint64(len(s.failedMembers))),
		"left":         toString(uint64(len(s.leftMembers))),
		"member_time":  toString(uint64(s.clock.Time())),
		"event_time":   toString(uint64(s.eventClock.Time())),
		"query_time":   toString(uint64(s.queryClock.Time())),
		"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
		"event_queue":  toString(uint64(s.eventBroadcasts.NumQueued())),
		"query_queue":  toString(uint64(s.queryBroadcasts.NumQueued())),
		"encrypted":    fmt.Sprintf("%v", s.EncryptionEnabled()),
	}
	return stats
}

// writeKeyringFile will serialize the current keyring and save it to a file.
// It is a no-op when no KeyringFile is configured.
func (s *Serf) writeKeyringFile() error {
	if len(s.config.KeyringFile) == 0 {
		return nil
	}

	keyring := s.config.MemberlistConfig.Keyring
	keysRaw := keyring.GetKeys()
	keysEncoded := make([]string, len(keysRaw))

	// Keys are stored base64-encoded in a JSON array
	for i, key := range keysRaw {
		keysEncoded[i] = base64.StdEncoding.EncodeToString(key)
	}

	encodedKeys, err := json.MarshalIndent(keysEncoded, "", "  ")
	if err != nil {
		return fmt.Errorf("Failed to encode keys: %s", err)
	}

	// Use 0600 for permissions because key data is sensitive
	if err = ioutil.WriteFile(s.config.KeyringFile, encodedKeys, 0600); err != nil {
		return fmt.Errorf("Failed to write keyring file: %s", err)
	}

	// Success!
	return nil
}

// GetCoordinate returns the network coordinate of the local node.
// An error is returned when coordinates are disabled via the config.
func (s *Serf) GetCoordinate() (*coordinate.Coordinate, error) {
	if !s.config.DisableCoordinates {
		return s.coordClient.GetCoordinate(), nil
	}

	return nil, fmt.Errorf("Coordinates are disabled")
}

// GetCachedCoordinate returns the network coordinate for the node with the given
// name. This will only be valid if DisableCoordinates is set to false.
func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) {
	if !s.config.DisableCoordinates {
		s.coordCacheLock.RLock()
		defer s.coordCacheLock.RUnlock()
		if coord, ok = s.coordCache[name]; ok {
			return coord, true
		}

		return nil, false
	}

	return nil, false
}