package serf

import (
	"bytes"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/memberlist"
	"github.com/hashicorp/serf/coordinate"
)

// These are the protocol versions that Serf can _understand_. These are
// Serf-level protocol versions that are passed down as the delegate
// version to memberlist below.
const (
	ProtocolVersionMin uint8 = 2
	ProtocolVersionMax       = 4
)

const (
	// Used to detect if the meta data is tags
	// or if it is a raw role
	tagMagicByte uint8 = 255
)

var (
	// FeatureNotSupported is returned if a feature cannot be used
	// due to an older protocol version being used.
	FeatureNotSupported = fmt.Errorf("Feature not supported")
)

func init() {
	// Seed the random number generator
	rand.Seed(time.Now().UnixNano())
}

// Serf is a single node that is part of a single cluster that gets
// events about joins/leaves/failures/etc. It is created with the Create
// method.
//
// All functions on the Serf structure are safe to call concurrently.
type Serf struct {
	// The clocks for different purposes. These MUST be the first things
	// in this struct due to Golang issue #599.
	clock      LamportClock
	eventClock LamportClock
	queryClock LamportClock

	broadcasts    *memberlist.TransmitLimitedQueue
	config        *Config
	failedMembers []*memberState
	leftMembers   []*memberState
	memberlist    *memberlist.Memberlist
	memberLock    sync.RWMutex
	members       map[string]*memberState

	// Circular buffers for recent intents, used
	// in case we get the intent before the relevant event
	recentLeave      []nodeIntent
	recentLeaveIndex int
	recentJoin       []nodeIntent
	recentJoinIndex  int

	eventBroadcasts *memberlist.TransmitLimitedQueue
	eventBuffer     []*userEvents
	eventJoinIgnore bool
	eventMinTime    LamportTime
	eventLock       sync.RWMutex

	queryBroadcasts *memberlist.TransmitLimitedQueue
	queryBuffer     []*queries
	queryMinTime    LamportTime
	queryResponse   map[LamportTime]*QueryResponse
	queryLock       sync.RWMutex

	logger     *log.Logger
	joinLock   sync.Mutex
	stateLock  sync.Mutex
	state      SerfState
	shutdownCh chan struct{}

	snapshotter *Snapshotter
	keyManager  *KeyManager

	coordClient    *coordinate.Client
	coordCache     map[string]*coordinate.Coordinate
	coordCacheLock sync.RWMutex
}

// SerfState is the state of the Serf instance.
type SerfState int

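// A Serf instance starts out SerfAlive. A graceful Leave moves it to
// SerfLeaving and then SerfLeft once the leave completes, while Shutdown
// moves it to SerfShutdown, which is terminal, from any state.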
const (
	SerfAlive SerfState = iota
	SerfLeaving
	SerfLeft
	SerfShutdown
)

func (s SerfState) String() string {
	switch s {
	case SerfAlive:
		return "alive"
	case SerfLeaving:
		return "leaving"
	case SerfLeft:
		return "left"
	case SerfShutdown:
		return "shutdown"
	default:
		return "unknown"
	}
}

// Member is a single member of the Serf cluster.
type Member struct {
	Name   string
	Addr   net.IP
	Port   uint16
	Tags   map[string]string
	Status MemberStatus

	// The minimum, maximum, and current values of the protocol versions
	// and delegate (Serf) protocol versions that each member can understand
	// or is speaking.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}

// MemberStatus is the state that a member is in.
type MemberStatus int

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
)

func (s MemberStatus) String() string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusLeaving:
		return "leaving"
	case StatusLeft:
		return "left"
	case StatusFailed:
		return "failed"
	default:
		panic(fmt.Sprintf("unknown MemberStatus: %d", s))
	}
}

// memberState is used to track members that are no longer active due to
// leaving, failing, partitioning, etc. It tracks the member along with
// when that member was marked as leaving.
type memberState struct {
	Member
	statusLTime LamportTime // lamport clock time of last received message
	leaveTime   time.Time   // wall clock time of leave
}

// nodeIntent is used to buffer intents for out-of-order deliveries
type nodeIntent struct {
	LTime LamportTime
	Node  string
}

// userEvent is used to buffer events to prevent re-delivery
type userEvent struct {
	Name    string
	Payload []byte
}

func (ue *userEvent) Equals(other *userEvent) bool {
	if ue.Name != other.Name {
		return false
	}
	if !bytes.Equal(ue.Payload, other.Payload) {
		return false
	}
	return true
}

// userEvents stores all the user events at a specific time
type userEvents struct {
	LTime  LamportTime
	Events []userEvent
}

// queries stores all the query ids at a specific time
type queries struct {
	LTime    LamportTime
	QueryIDs []uint32
}

const (
	UserEventSizeLimit = 512        // Maximum byte size for event name and payload
	snapshotSizeLimit  = 128 * 1024 // Maximum 128 KB snapshot
)

// Create creates a new Serf instance, starting all the background tasks
// to maintain cluster membership information.
//
// After calling this function, the configuration should no longer be used
// or modified by the caller.
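//
// A minimal usage sketch (illustrative only; assumes the package's
// DefaultConfig helper):
//
//	conf := serf.DefaultConfig()
//	conf.NodeName = "node-a"
//	s, err := serf.Create(conf)
//	if err != nil {
//		// handle error
//	}
//	defer s.Shutdown()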
func Create(conf *Config) (*Serf, error) {
	conf.Init()
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	serf := &Serf{
		config:        conf,
		logger:        log.New(conf.LogOutput, "", log.LstdFlags),
		members:       make(map[string]*memberState),
		queryResponse: make(map[LamportTime]*QueryResponse),
		shutdownCh:    make(chan struct{}),
		state:         SerfAlive,
	}

	// Check that the meta data length is okay
	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
		return nil, fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", memberlist.MetaMaxSize)
	}

	// Check if serf member event coalescing is enabled
	if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
		c := &memberEventCoalescer{
			lastEvents:   make(map[string]EventType),
			latestEvents: make(map[string]coalesceEvent),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.CoalescePeriod, conf.QuiescentPeriod, c)
	}

	// Check if user event coalescing is enabled
	if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
		c := &userEventCoalescer{
			events: make(map[string]*latestUserEvents),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
	}

	// Listen for internal Serf queries. This is set up before the snapshotter,
	// since we want to capture the query time, but the internal listener does
	// not pass through the queries
	outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh)
	if err != nil {
		return nil, fmt.Errorf("Failed to setup serf query handler: %v", err)
	}
	conf.EventCh = outCh

	// Set up network coordinate client.
	if !conf.DisableCoordinates {
		serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig())
		if err != nil {
			return nil, fmt.Errorf("Failed to create coordinate client: %v", err)
		}
	}

	// Try to access the snapshot
	var oldClock, oldEventClock, oldQueryClock LamportTime
	var prev []*PreviousNode
	if conf.SnapshotPath != "" {
		eventCh, snap, err := NewSnapshotter(
			conf.SnapshotPath,
			snapshotSizeLimit,
			conf.RejoinAfterLeave,
			serf.logger,
			&serf.clock,
			serf.coordClient,
			conf.EventCh,
			serf.shutdownCh)
		if err != nil {
			return nil, fmt.Errorf("Failed to setup snapshot: %v", err)
		}
		serf.snapshotter = snap
		conf.EventCh = eventCh
		prev = snap.AliveNodes()
		oldClock = snap.LastClock()
		oldEventClock = snap.LastEventClock()
		oldQueryClock = snap.LastQueryClock()
		serf.eventMinTime = oldEventClock + 1
		serf.queryMinTime = oldQueryClock + 1
	}

	// Set up the coordinate cache. We do this after we read the snapshot to
	// make sure we get a good initial value from there, if we got one.
	if !conf.DisableCoordinates {
		serf.coordCache = make(map[string]*coordinate.Coordinate)
		serf.coordCache[conf.NodeName] = serf.coordClient.GetCoordinate()
	}

	// Setup the various broadcast queues, which we use to send our own
	// custom broadcasts along the gossip channel.
	serf.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return len(serf.members)
		},
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}

	// Create the buffer for recent intents
	serf.recentJoin = make([]nodeIntent, conf.RecentIntentBuffer)
	serf.recentLeave = make([]nodeIntent, conf.RecentIntentBuffer)

	// Create a buffer for events and queries
	serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
	serf.queryBuffer = make([]*queries, conf.QueryBuffer)

	// Ensure our lamport clock is at least 1, so that the default
	// join LTime of 0 does not cause issues
	serf.clock.Increment()
	serf.eventClock.Increment()
	serf.queryClock.Increment()

	// Restore the clock from snap if we have one
	serf.clock.Witness(oldClock)
	serf.eventClock.Witness(oldEventClock)
	serf.queryClock.Witness(oldQueryClock)

	// Modify the memberlist configuration with keys that we set
	conf.MemberlistConfig.Events = &eventDelegate{serf: serf}
	conf.MemberlistConfig.Conflict = &conflictDelegate{serf: serf}
	conf.MemberlistConfig.Delegate = &delegate{serf: serf}
	conf.MemberlistConfig.DelegateProtocolVersion = conf.ProtocolVersion
	conf.MemberlistConfig.DelegateProtocolMin = ProtocolVersionMin
	conf.MemberlistConfig.DelegateProtocolMax = ProtocolVersionMax
	conf.MemberlistConfig.Name = conf.NodeName
	conf.MemberlistConfig.ProtocolVersion = ProtocolVersionMap[conf.ProtocolVersion]
	if !conf.DisableCoordinates {
		conf.MemberlistConfig.Ping = &pingDelegate{serf: serf}
	}

	// Setup a merge delegate if necessary
	if conf.Merge != nil {
		md := &mergeDelegate{serf: serf}
		conf.MemberlistConfig.Merge = md
		conf.MemberlistConfig.Alive = md
	}

	// Create the underlying memberlist that will manage membership
	// and failure detection for the Serf instance.
	memberlist, err := memberlist.Create(conf.MemberlistConfig)
	if err != nil {
		return nil, fmt.Errorf("Failed to create memberlist: %v", err)
	}

	serf.memberlist = memberlist

	// Create a key manager for handling all encryption key changes
	serf.keyManager = &KeyManager{serf: serf}

	// Start the background tasks. See the documentation above each method
	// for more information on their role.
	go serf.handleReap()
	go serf.handleReconnect()
	go serf.checkQueueDepth("Intent", serf.broadcasts)
	go serf.checkQueueDepth("Event", serf.eventBroadcasts)
	go serf.checkQueueDepth("Query", serf.queryBroadcasts)

	// Attempt to re-join the cluster if we have known nodes
	if len(prev) != 0 {
		go serf.handleRejoin(prev)
	}

	return serf, nil
}

// ProtocolVersion returns the current protocol version in use by Serf.
// This is the Serf protocol version, not the memberlist protocol version.
func (s *Serf) ProtocolVersion() uint8 {
	return s.config.ProtocolVersion
}

// EncryptionEnabled is a predicate that determines whether or not encryption
// is enabled, which can be possible in one of 2 cases:
//   - Single encryption key passed at agent start (no persistence)
//   - Keyring file provided at agent start
func (s *Serf) EncryptionEnabled() bool {
	return s.config.MemberlistConfig.Keyring != nil
}

// KeyManager returns the key manager for the current Serf instance.
func (s *Serf) KeyManager() *KeyManager {
	return s.keyManager
}

// UserEvent is used to broadcast a custom user event with a given
// name and payload. The events must be fairly small, and if the
// size limit is exceeded an error will be returned. If coalesce is enabled,
// nodes are allowed to coalesce this event. Coalescing is only available
// starting in v0.2
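//
// For example (illustrative only):
//
//	err := s.UserEvent("deploy", []byte("v1.2.3"), false)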
func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
	// Check the size limit
	if len(name)+len(payload) > UserEventSizeLimit {
		return fmt.Errorf("user event exceeds limit of %d bytes", UserEventSizeLimit)
	}

	// Create a message
	msg := messageUserEvent{
		LTime:   s.eventClock.Time(),
		Name:    name,
		Payload: payload,
		CC:      coalesce,
	}
	s.eventClock.Increment()

	// Process update locally
	s.handleUserEvent(&msg)

	// Start broadcasting the event
	raw, err := encodeMessage(messageUserEventType, &msg)
	if err != nil {
		return err
	}
	s.eventBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return nil
}

// Query is used to broadcast a new query. The query must be fairly small,
// and an error will be returned if the size limit is exceeded. This is only
// available with protocol version 4 and newer. Query parameters are optional,
// and if not provided, a sane set of defaults will be used.
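//
// For example (illustrative only):
//
//	resp, err := s.Query("load", nil, nil)
//	if err != nil {
//		// handle error
//	}
//	for r := range resp.ResponseCh() {
//		// r.From is the responding node, r.Payload its answer
//	}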
func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) {
	// Check that the latest protocol is in use
	if s.ProtocolVersion() < 4 {
		return nil, FeatureNotSupported
	}

	// Provide default parameters if none given
	if params == nil {
		params = s.DefaultQueryParams()
	} else if params.Timeout == 0 {
		params.Timeout = s.DefaultQueryTimeout()
	}

	// Get the local node
	local := s.memberlist.LocalNode()

	// Encode the filters
	filters, err := params.encodeFilters()
	if err != nil {
		return nil, fmt.Errorf("Failed to format filters: %v", err)
	}

	// Setup the flags
	var flags uint32
	if params.RequestAck {
		flags |= queryFlagAck
	}

	// Create a message
	q := messageQuery{
		LTime:   s.queryClock.Time(),
		ID:      uint32(rand.Int31()),
		Addr:    local.Addr,
		Port:    local.Port,
		Filters: filters,
		Flags:   flags,
		Timeout: params.Timeout,
		Name:    name,
		Payload: payload,
	}

	// Encode the query
	raw, err := encodeMessage(messageQueryType, &q)
	if err != nil {
		return nil, err
	}

	// Check the size
	if len(raw) > s.config.QuerySizeLimit {
		return nil, fmt.Errorf("query exceeds limit of %d bytes", s.config.QuerySizeLimit)
	}

	// Register QueryResponse to track acks and responses
	resp := newQueryResponse(s.memberlist.NumMembers(), &q)
	s.registerQueryResponse(params.Timeout, resp)

	// Process query locally
	s.handleQuery(&q)

	// Start broadcasting the event
	s.queryBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return resp, nil
}

// registerQueryResponse is used to set up the listeners for the query,
// and to schedule closing the query after the timeout.
func (s *Serf) registerQueryResponse(timeout time.Duration, resp *QueryResponse) {
	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Map the LTime to the QueryResponse. This is necessarily 1-to-1,
	// since we increment the time for each new query.
	s.queryResponse[resp.lTime] = resp

	// Setup a timer to close the response and deregister after the timeout
	time.AfterFunc(timeout, func() {
		s.queryLock.Lock()
		delete(s.queryResponse, resp.lTime)
		resp.Close()
		s.queryLock.Unlock()
	})
}

// SetTags is used to dynamically update the tags associated with
// the local node. This will propagate the change to the rest of
// the cluster. Blocks until the message is broadcast out.
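//
// For example (illustrative only):
//
//	err := s.SetTags(map[string]string{"role": "web", "dc": "east"})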
func (s *Serf) SetTags(tags map[string]string) error {
	// Check that the meta data length is okay
	if len(s.encodeTags(tags)) > memberlist.MetaMaxSize {
		return fmt.Errorf("Encoded length of tags exceeds limit of %d bytes",
			memberlist.MetaMaxSize)
	}

	// Update the config
	s.config.Tags = tags

	// Trigger a memberlist update
	return s.memberlist.UpdateNode(s.config.BroadcastTimeout)
}

// Join joins an existing Serf cluster. Returns the number of nodes
// successfully contacted. The returned error will be non-nil only in the
// case that no nodes could be contacted. If ignoreOld is true, then any
// user messages sent prior to the join will be ignored.
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
	// Do a quick state check
	if s.State() != SerfAlive {
		return 0, fmt.Errorf("Serf can't Join after Leave or Shutdown")
	}

	// Hold the joinLock, this is to make eventJoinIgnore safe
	s.joinLock.Lock()
	defer s.joinLock.Unlock()

	// Ignore any events from a potential join. This is safe since we hold
	// the joinLock and nobody else can be doing a Join
	if ignoreOld {
		s.eventJoinIgnore = true
		defer func() {
			s.eventJoinIgnore = false
		}()
	}

	// Have memberlist attempt to join
	num, err := s.memberlist.Join(existing)

	// If we joined any nodes, broadcast the join message
	if num > 0 {
		// Start broadcasting the update
		if err := s.broadcastJoin(s.clock.Time()); err != nil {
			return num, err
		}
	}

	return num, err
}

// broadcastJoin broadcasts a new join intent with a
// given clock value. It is used on either join, or if
// we need to refute an older leave intent. Cannot be called
// with the memberLock held.
func (s *Serf) broadcastJoin(ltime LamportTime) error {
	// Construct message to update our lamport clock
	msg := messageJoin{
		LTime: ltime,
		Node:  s.config.NodeName,
	}
	s.clock.Witness(ltime)

	// Process update locally
	s.handleNodeJoinIntent(&msg)

	// Start broadcasting the update
	if err := s.broadcast(messageJoinType, &msg, nil); err != nil {
		s.logger.Printf("[WARN] serf: Failed to broadcast join intent: %v", err)
		return err
	}
	return nil
}

// Leave gracefully exits the cluster. It is safe to call this multiple
// times.
func (s *Serf) Leave() error {
	// Check the current state
	s.stateLock.Lock()
	if s.state == SerfLeft {
		s.stateLock.Unlock()
		return nil
	} else if s.state == SerfLeaving {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave already in progress")
	} else if s.state == SerfShutdown {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave called after Shutdown")
	}
	s.state = SerfLeaving
	s.stateLock.Unlock()

	// If we have a snapshot, mark we are leaving
	if s.snapshotter != nil {
		s.snapshotter.Leave()
	}

	// Construct the message for the graceful leave
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  s.config.NodeName,
	}
	s.clock.Increment()

	// Process the leave locally
	s.handleNodeLeaveIntent(&msg)

	// Only broadcast the leave message if there is at least one
	// other node alive.
	if s.hasAliveMembers() {
		notifyCh := make(chan struct{})
		if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
			return err
		}

		select {
		case <-notifyCh:
		case <-time.After(s.config.BroadcastTimeout):
			return errors.New("timeout while waiting for graceful leave")
		}
	}

	// Attempt the memberlist leave
	err := s.memberlist.Leave(s.config.BroadcastTimeout)
	if err != nil {
		return err
	}

	// Transition to Left only if we are not already shutdown
	s.stateLock.Lock()
	if s.state != SerfShutdown {
		s.state = SerfLeft
	}
	s.stateLock.Unlock()
	return nil
}

// hasAliveMembers is called to check for any alive members other than
// ourself.
func (s *Serf) hasAliveMembers() bool {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	hasAlive := false
	for _, m := range s.members {
		// Skip ourself, we want to know if OTHER members are alive
		if m.Name == s.config.NodeName {
			continue
		}

		if m.Status == StatusAlive {
			hasAlive = true
			break
		}
	}
	return hasAlive
}

// LocalMember returns the Member information for the local node
func (s *Serf) LocalMember() Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()
	return s.members[s.config.NodeName].Member
}

// Members returns a point-in-time snapshot of the members of this cluster.
func (s *Serf) Members() []Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	members := make([]Member, 0, len(s.members))
	for _, m := range s.members {
		members = append(members, m.Member)
	}

	return members
}

// RemoveFailedNode forcibly removes a failed node from the cluster
// immediately, instead of waiting for the reaper to eventually reclaim it.
// This also has the effect that Serf will no longer attempt to reconnect
// to this node.
func (s *Serf) RemoveFailedNode(node string) error {
	// Construct the message to broadcast
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  node,
	}
	s.clock.Increment()

	// Process our own event
	s.handleNodeLeaveIntent(&msg)

	// If we have no members, then we don't need to broadcast
	if !s.hasAliveMembers() {
		return nil
	}

	// Broadcast the remove
	notifyCh := make(chan struct{})
	if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
		return err
	}

	// Wait for the broadcast
	select {
	case <-notifyCh:
	case <-time.After(s.config.BroadcastTimeout):
		return fmt.Errorf("timed out broadcasting node removal")
	}

	return nil
}

// Shutdown forcefully shuts down the Serf instance, stopping all network
// activity and background maintenance associated with the instance.
//
// This is not a graceful shutdown, and should be preceded by a call
// to Leave. Otherwise, other nodes in the cluster will detect this node's
// exit as a node failure.
//
// It is safe to call this method multiple times.
func (s *Serf) Shutdown() error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.state == SerfShutdown {
		return nil
	}

	if s.state != SerfLeft {
		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
	}

	s.state = SerfShutdown
	close(s.shutdownCh)

	err := s.memberlist.Shutdown()
	if err != nil {
		return err
	}

	// Wait for the snapshotter to finish if we have one
	if s.snapshotter != nil {
		s.snapshotter.Wait()
	}

	return nil
}

// ShutdownCh returns a channel that can be used to wait for
// Serf to shutdown.
func (s *Serf) ShutdownCh() <-chan struct{} {
	return s.shutdownCh
}

// Memberlist is used to get access to the underlying Memberlist instance
func (s *Serf) Memberlist() *memberlist.Memberlist {
	return s.memberlist
}

// State is the current state of this Serf instance.
func (s *Serf) State() SerfState {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()
	return s.state
}

// broadcast takes a Serf message type, encodes it for the wire, and queues
// the broadcast. If a notify channel is given, this channel will be closed
// when the broadcast is sent.
func (s *Serf) broadcast(t messageType, msg interface{}, notify chan<- struct{}) error {
	raw, err := encodeMessage(t, msg)
	if err != nil {
		return err
	}

	s.broadcasts.QueueBroadcast(&broadcast{
		msg:    raw,
		notify: notify,
	})
	return nil
}

// handleNodeJoin is called when a node join event is received
// from memberlist.
func (s *Serf) handleNodeJoin(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	var oldStatus MemberStatus
	member, ok := s.members[n.Name]
	if !ok {
		oldStatus = StatusNone
		member = &memberState{
			Member: Member{
				Name:   n.Name,
				Addr:   net.IP(n.Addr),
				Port:   n.Port,
				Tags:   s.decodeTags(n.Meta),
				Status: StatusAlive,
			},
		}

		// Check if we have a join intent and use the LTime
		if join := recentIntent(s.recentJoin, n.Name); join != nil {
			member.statusLTime = join.LTime
		}

		// Check if we have a leave intent
		if leave := recentIntent(s.recentLeave, n.Name); leave != nil {
			if leave.LTime > member.statusLTime {
				member.Status = StatusLeaving
				member.statusLTime = leave.LTime
			}
		}

		s.members[n.Name] = member
	} else {
		oldStatus = member.Status
		member.Status = StatusAlive
		member.leaveTime = time.Time{}
		member.Addr = net.IP(n.Addr)
		member.Port = n.Port
		member.Tags = s.decodeTags(n.Meta)
	}

	// Update the protocol versions every time we get an event
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// If node was previously in a failed state, then clean up some
	// internal accounting.
	// TODO(mitchellh): needs tests to verify not reaped
	if oldStatus == StatusFailed || oldStatus == StatusLeft {
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = removeOldMember(s.leftMembers, member.Name)
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "join"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s",
		member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberJoin,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeave is called when a node leave event is received
// from memberlist.
func (s *Serf) handleNodeLeave(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is supposedly
		// leaving. Just ignore it completely.
		return
	}

	switch member.Status {
	case StatusLeaving:
		member.Status = StatusLeft
		member.leaveTime = time.Now()
		s.leftMembers = append(s.leftMembers, member)
	case StatusAlive:
		member.Status = StatusFailed
		member.leaveTime = time.Now()
		s.failedMembers = append(s.failedMembers, member)
	default:
		// Unknown state that it was in? Just don't do anything
		s.logger.Printf("[WARN] serf: Bad state when leave: %d", member.Status)
		return
	}

	// Send an event along
	event := EventMemberLeave
	eventStr := "EventMemberLeave"
	if member.Status != StatusLeft {
		event = EventMemberFailed
		eventStr = "EventMemberFailed"
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1)

	s.logger.Printf("[INFO] serf: %s: %s %s",
		eventStr, member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    event,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeUpdate is called when a node meta data update
// has taken place
func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is updating.
		// Just ignore it completely.
		return
	}

	// Update the member attributes
	member.Addr = net.IP(n.Addr)
	member.Port = n.Port
	member.Tags = s.decodeTags(n.Meta)

	// Snag the latest versions. NOTE - the current memberlist code will NOT
	// fire an update event if the metadata (for Serf, tags) stays the same
	// and only the protocol versions change. If we make any Serf-level
	// protocol changes where we want to get this event under those
	// circumstances, we will need to update memberlist to do a check of
	// versions as well as the metadata.
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "update"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberUpdate,
			Members: []Member{member.Member},
		}
	}
}

// handleNodeLeaveIntent is called when an intent to leave is received.
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {
	// Witness a potentially newer time
	s.clock.Witness(leaveMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[leaveMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentLeave, leaveMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentLeave[s.recentLeaveIndex] = nodeIntent{
			LTime: leaveMsg.LTime,
			Node:  leaveMsg.Node,
		}
		s.recentLeaveIndex = (s.recentLeaveIndex + 1) % len(s.recentLeave)
		return true
	}

	// If the message is old, then it is irrelevant and we can skip it
	if leaveMsg.LTime <= member.statusLTime {
		return false
	}

	// Refute us leaving if we are in the alive state
	// Must be done in another goroutine since we have the memberLock
	if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
		s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
		go s.broadcastJoin(s.clock.Time())
		return false
	}

	// State transition depends on current state
	switch member.Status {
	case StatusAlive:
		member.Status = StatusLeaving
		member.statusLTime = leaveMsg.LTime
		return true
	case StatusFailed:
		member.Status = StatusLeft
		member.statusLTime = leaveMsg.LTime

		// Remove from the failed list and add to the left list. We add
		// to the left list so that when we do a sync, other nodes will
		// remove it from their failed list.
		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = append(s.leftMembers, member)

		// We must push a message indicating the node has now
		// left to allow higher-level applications to handle the
		// graceful leave.
		s.logger.Printf("[INFO] serf: EventMemberLeave (forced): %s %s",
			member.Member.Name, member.Member.Addr)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberLeave,
				Members: []Member{member.Member},
			}
		}
		return true
	default:
		return false
	}
}

// handleNodeJoinIntent is called when a node broadcasts a
// join message to set the lamport time of its join
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
	// Witness a potentially newer time
	s.clock.Witness(joinMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[joinMsg.Node]
	if !ok {
		// If we've already seen this message don't rebroadcast
		if recentIntent(s.recentJoin, joinMsg.Node) != nil {
			return false
		}

		// We don't know this member so store it in a buffer for now
		s.recentJoin[s.recentJoinIndex] = nodeIntent{LTime: joinMsg.LTime, Node: joinMsg.Node}
		s.recentJoinIndex = (s.recentJoinIndex + 1) % len(s.recentJoin)
		return true
	}

	// Check if this time is newer than what we have
	if joinMsg.LTime <= member.statusLTime {
		return false
	}

	// Update the LTime
	member.statusLTime = joinMsg.LTime

	// If we are in the leaving state, we should go back to alive,
	// since the leaving message must have been for an older time
	if member.Status == StatusLeaving {
		member.Status = StatusAlive
	}
	return true
}

// handleUserEvent is called when a user event broadcast is
// received. Returns whether the message should be rebroadcast.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// Witness a potentially newer time
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	// Ignore if it is before our minimum event time
	if eventMsg.LTime < s.eventMinTime {
		return false
	}

	// Check if this message is too old
	curTime := s.eventClock.Time()
	if curTime > LamportTime(len(s.eventBuffer)) &&
		eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	// Check if we've already seen this
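	// (the event buffer is a ring keyed by LTime modulo its length; a slot
	// holding events from a different LTime is simply reinitialized below)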
	idx := eventMsg.LTime % LamportTime(len(s.eventBuffer))
	seen := s.eventBuffer[idx]
	userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
	if seen != nil && seen.LTime == eventMsg.LTime {
		for _, previous := range seen.Events {
			if previous.Equals(&userEvent) {
				return false
			}
		}
	} else {
		seen = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[idx] = seen
	}

	// Add to recent events
	seen.Events = append(seen.Events, userEvent)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	if s.config.EventCh != nil {
		s.config.EventCh <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}

// handleQuery is called when a query broadcast is
// received. Returns whether the message should be rebroadcast.
func (s *Serf) handleQuery(query *messageQuery) bool {
	// Witness a potentially newer time
	s.queryClock.Witness(query.LTime)

	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Ignore if it is before our minimum query time
	if query.LTime < s.queryMinTime {
		return false
	}

	// Check if this message is too old
	curTime := s.queryClock.Time()
	if curTime > LamportTime(len(s.queryBuffer)) &&
		query.LTime < curTime-LamportTime(len(s.queryBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old query %s from time %d (current: %d)",
			query.Name,
			query.LTime,
			s.queryClock.Time())
		return false
	}

	// Check if we've already seen this
	idx := query.LTime % LamportTime(len(s.queryBuffer))
	seen := s.queryBuffer[idx]
	if seen != nil && seen.LTime == query.LTime {
		for _, previous := range seen.QueryIDs {
			if previous == query.ID {
				// Seen this ID already
				return false
			}
		}
	} else {
		seen = &queries{LTime: query.LTime}
		s.queryBuffer[idx] = seen
	}

	// Add to recent queries
	seen.QueryIDs = append(seen.QueryIDs, query.ID)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "queries"}, 1)
	metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1)

	// Check if we should rebroadcast, this may be disabled by a flag
	rebroadcast := true
	if query.NoBroadcast() {
		rebroadcast = false
	}

	// Filter the query
	if !s.shouldProcessQuery(query.Filters) {
		// Even if we don't process it further, we should rebroadcast,
		// since it is the first time we've seen this.
		return rebroadcast
	}

	// Send ack if requested, without waiting for client to Respond()
	if query.Ack() {
		ack := messageQueryResponse{
			LTime: query.LTime,
			ID:    query.ID,
			From:  s.config.NodeName,
			Flags: queryFlagAck,
		}
		raw, err := encodeMessage(messageQueryResponseType, &ack)
		if err != nil {
			s.logger.Printf("[ERR] serf: failed to format ack: %v", err)
		} else {
			addr := net.UDPAddr{IP: query.Addr, Port: int(query.Port)}
			if err := s.memberlist.SendTo(&addr, raw); err != nil {
				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
			}
		}
	}

	if s.config.EventCh != nil {
		s.config.EventCh <- &Query{
			LTime:    query.LTime,
			Name:     query.Name,
			Payload:  query.Payload,
			serf:     s,
			id:       query.ID,
			addr:     query.Addr,
			port:     query.Port,
			deadline: time.Now().Add(query.Timeout),
		}
	}
	return rebroadcast
}

// handleQueryResponse is called when a query response is
// received.
func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
	// Look for a corresponding QueryResponse
	s.queryLock.RLock()
	query, ok := s.queryResponse[resp.LTime]
	s.queryLock.RUnlock()
	if !ok {
		s.logger.Printf("[WARN] serf: reply for non-running query (LTime: %d, ID: %d) From: %s",
			resp.LTime, resp.ID, resp.From)
		return
	}

	// Verify the ID matches
	if query.id != resp.ID {
		s.logger.Printf("[WARN] serf: query reply ID mismatch (Local: %d, Response: %d)",
			query.id, resp.ID)
		return
	}

	// Check if the query is closed
	if query.Finished() {
		return
	}

	// Process each type of response
	if resp.Ack() {
		metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
		select {
		case query.ackCh <- resp.From:
		default:
			s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
		}
	} else {
		metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
		select {
		case query.respCh <- NodeResponse{From: resp.From, Payload: resp.Payload}:
		default:
			s.logger.Printf("[WARN] serf: Failed to deliver query response, dropping")
		}
	}
}

// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
	// Log a basic warning if the node is not us...
	if existing.Name != s.config.NodeName {
		s.logger.Printf("[WARN] serf: Name conflict for '%s' both %s:%d and %s:%d are claiming",
			existing.Name, existing.Addr, existing.Port, other.Addr, other.Port)
		return
	}

	// The current node is conflicting! This is an error
	s.logger.Printf("[ERR] serf: Node name conflicts with another node at %s:%d. Names must be unique! (Resolution enabled: %v)",
		other.Addr, other.Port, s.config.EnableNameConflictResolution)

	// If automatic resolution is enabled, kick off the resolution
	if s.config.EnableNameConflictResolution {
		go s.resolveNodeConflict()
	}
}

// resolveNodeConflict is used to determine which node should remain during
// a name conflict. This is done by running an internal query.
func (s *Serf) resolveNodeConflict() {
	// Get the local node
	local := s.memberlist.LocalNode()

	// Start a name resolution query
	qName := internalQueryName(conflictQuery)
	payload := []byte(s.config.NodeName)
	resp, err := s.Query(qName, payload, nil)
	if err != nil {
		s.logger.Printf("[ERR] serf: Failed to start name resolution query: %v", err)
		return
	}

	// Counter to determine winner
	var responses, matching int

	// Gather responses
	respCh := resp.ResponseCh()
	for r := range respCh {
		// Decode the response
		if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageConflictResponseType {
			s.logger.Printf("[ERR] serf: Invalid conflict query response type: %v", r.Payload)
			continue
		}
		var member Member
		if err := decodeMessage(r.Payload[1:], &member); err != nil {
			s.logger.Printf("[ERR] serf: Failed to decode conflict query response: %v", err)
			continue
		}

		// Update the counters
		responses++
		if bytes.Equal(member.Addr, local.Addr) && member.Port == local.Port {
			matching++
		}
	}

	// Query over, determine if we should live
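	// A strict majority of matching responses is required; e.g. with 5
	// responses we must match at least 3 to keep our name.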
	majority := (responses / 2) + 1
	if matching >= majority {
		s.logger.Printf("[INFO] serf: majority in name conflict resolution [%d / %d]",
			matching, responses)
		return
	}

	// Since we lost the vote, we need to exit
	s.logger.Printf("[WARN] serf: minority in name conflict resolution, quitting [%d / %d]",
		matching, responses)
	if err := s.Shutdown(); err != nil {
		s.logger.Printf("[ERR] serf: Failed to shutdown: %v", err)
	}
}

// handleReap periodically reaps the list of failed and left members.
func (s *Serf) handleReap() {
	for {
		select {
		case <-time.After(s.config.ReapInterval):
			s.memberLock.Lock()
			s.failedMembers = s.reap(s.failedMembers, s.config.ReconnectTimeout)
			s.leftMembers = s.reap(s.leftMembers, s.config.TombstoneTimeout)
			s.memberLock.Unlock()
		case <-s.shutdownCh:
			return
		}
	}
}

// handleReconnect attempts to reconnect to recently failed nodes
// on configured intervals.
func (s *Serf) handleReconnect() {
	for {
		select {
		case <-time.After(s.config.ReconnectInterval):
			s.reconnect()
		case <-s.shutdownCh:
			return
		}
	}
}

// reap is called with a list of old members and a timeout, and removes
// members that have exceeded the timeout. The members are removed from
// both the old list and the members map itself. Locking is left to the caller.
func (s *Serf) reap(old []*memberState, timeout time.Duration) []*memberState {
	now := time.Now()
	n := len(old)
	for i := 0; i < n; i++ {
		m := old[i]

		// Skip if the timeout is not yet reached
		if now.Sub(m.leaveTime) <= timeout {
			continue
		}

		// Delete from the list
		old[i], old[n-1] = old[n-1], nil
		old = old[:n-1]
		n--
		i--

		// Delete from members
		delete(s.members, m.Name)

		// Tell the coordinate client the node has gone away and delete
		// its cached coordinates.
		if !s.config.DisableCoordinates {
			s.coordClient.ForgetNode(m.Name)

			s.coordCacheLock.Lock()
			delete(s.coordCache, m.Name)
			s.coordCacheLock.Unlock()
		}

		// Send an event along
		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberReap,
				Members: []Member{m.Member},
			}
		}
	}

	return old
}

// reconnect attempts to reconnect to recently failed nodes.
func (s *Serf) reconnect() {
	s.memberLock.RLock()

	// Nothing to do if there are no failed members
	n := len(s.failedMembers)
	if n == 0 {
		s.memberLock.RUnlock()
		return
	}

	// Probability we should attempt to reconnect is given
	// by num failed / (num members - num failed - num left)
	// This means that we probabilistically expect the cluster
	// to attempt to connect to each failed member once per
	// reconnect interval
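	// For example, with 2 failed members and 8 alive ones, a reconnect is
	// attempted on a given tick with probability 2/8 = 0.25.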
	numFailed := float32(len(s.failedMembers))
	numAlive := float32(len(s.members) - len(s.failedMembers) - len(s.leftMembers))
	if numAlive == 0 {
		numAlive = 1 // guard against zero divide
	}
	prob := numFailed / numAlive
	if rand.Float32() > prob {
		s.memberLock.RUnlock()
		s.logger.Printf("[DEBUG] serf: forgoing reconnect for random throttling")
		return
	}

	// Select a random member to try and join
	idx := int(rand.Uint32() % uint32(n))
	mem := s.failedMembers[idx]
	s.memberLock.RUnlock()

	// Format the addr
	addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
	s.logger.Printf("[INFO] serf: attempting reconnect to %v %s", mem.Name, addr.String())

	// Attempt to join at the memberlist level
	s.memberlist.Join([]string{addr.String()})
}

// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
	for {
		select {
		case <-time.After(time.Second):
			numq := queue.NumQueued()
			metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
			if numq >= s.config.QueueDepthWarning {
				s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
			}
			if numq > s.config.MaxQueueDepth {
				s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
					name, numq, s.config.MaxQueueDepth)
				queue.Prune(s.config.MaxQueueDepth)
			}
		case <-s.shutdownCh:
			return
		}
	}
}

// removeOldMember is used to remove an old member from a list of old
// members.
func removeOldMember(old []*memberState, name string) []*memberState {
	for i, m := range old {
		if m.Name == name {
			n := len(old)
			old[i], old[n-1] = old[n-1], nil
			return old[:n-1]
		}
	}

	return old
}

// recentIntent checks the recent intent buffer for a matching
// entry for a given node, and either returns the message or nil
func recentIntent(recent []nodeIntent, node string) (intent *nodeIntent) {
	for i := 0; i < len(recent); i++ {
		// Break fast if we hit a zero entry
		if recent[i].LTime == 0 {
			break
		}

		// Check for a node match
		if recent[i].Node == node {
			// Take the most recent entry
			if intent == nil || recent[i].LTime > intent.LTime {
				intent = &recent[i]
			}
		}
	}
	return
}

// handleRejoin attempts to reconnect to previously known alive nodes
func (s *Serf) handleRejoin(previous []*PreviousNode) {
	for _, prev := range previous {
		// Do not attempt to join ourself
		if prev.Name == s.config.NodeName {
			continue
		}

		s.logger.Printf("[INFO] serf: Attempting re-join to previously known node: %s", prev)
		_, err := s.memberlist.Join([]string{prev.Addr})
		if err == nil {
			s.logger.Printf("[INFO] serf: Re-joined to previously known node: %s", prev)
			return
		}
	}
	s.logger.Printf("[WARN] serf: Failed to re-join any previously known node")
}

// encodeTags is used to encode a tag map
func (s *Serf) encodeTags(tags map[string]string) []byte {
	// Support role-only backwards compatibility
	if s.ProtocolVersion() < 3 {
		role := tags["role"]
		return []byte(role)
	}

	// Use a magic byte prefix and msgpack encode the tags
	var buf bytes.Buffer
	buf.WriteByte(tagMagicByte)
	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
	if err := enc.Encode(tags); err != nil {
		panic(fmt.Sprintf("Failed to encode tags: %v", err))
	}
	return buf.Bytes()
}

// decodeTags is used to decode a tag map
func (s *Serf) decodeTags(buf []byte) map[string]string {
	tags := make(map[string]string)

	// Backwards compatibility mode
	if len(buf) == 0 || buf[0] != tagMagicByte {
		tags["role"] = string(buf)
		return tags
	}

	// Decode the tags
	r := bytes.NewReader(buf[1:])
	dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
	if err := dec.Decode(&tags); err != nil {
		s.logger.Printf("[ERR] serf: Failed to decode tags: %v", err)
	}
	return tags
}

// Stats is used to provide operator debugging information
func (s *Serf) Stats() map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	stats := map[string]string{
		"members":      toString(uint64(len(s.members))),
		"failed":       toString(uint64(len(s.failedMembers))),
		"left":         toString(uint64(len(s.leftMembers))),
		"member_time":  toString(uint64(s.clock.Time())),
		"event_time":   toString(uint64(s.eventClock.Time())),
		"query_time":   toString(uint64(s.queryClock.Time())),
		"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
		"event_queue":  toString(uint64(s.eventBroadcasts.NumQueued())),
		"query_queue":  toString(uint64(s.queryBroadcasts.NumQueued())),
		"encrypted":    fmt.Sprintf("%v", s.EncryptionEnabled()),
	}
	return stats
}

// writeKeyringFile will serialize the current keyring and save it to a file.
func (s *Serf) writeKeyringFile() error {
	if len(s.config.KeyringFile) == 0 {
		return nil
	}

	keyring := s.config.MemberlistConfig.Keyring
	keysRaw := keyring.GetKeys()
	keysEncoded := make([]string, len(keysRaw))

	for i, key := range keysRaw {
		keysEncoded[i] = base64.StdEncoding.EncodeToString(key)
	}

	encodedKeys, err := json.MarshalIndent(keysEncoded, "", "  ")
	if err != nil {
		return fmt.Errorf("Failed to encode keys: %s", err)
	}

	// Use 0600 for permissions because key data is sensitive
	if err = ioutil.WriteFile(s.config.KeyringFile, encodedKeys, 0600); err != nil {
		return fmt.Errorf("Failed to write keyring file: %s", err)
	}

	// Success!
	return nil
}

// GetCoordinate returns the network coordinate of the local node.
func (s *Serf) GetCoordinate() (*coordinate.Coordinate, error) {
	if !s.config.DisableCoordinates {
		return s.coordClient.GetCoordinate(), nil
	}

	return nil, fmt.Errorf("Coordinates are disabled")
}

// GetCachedCoordinate returns the network coordinate for the node with the given
// name. This will only be valid if DisableCoordinates is set to false.
func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) {
	if !s.config.DisableCoordinates {
		s.coordCacheLock.RLock()
		defer s.coordCacheLock.RUnlock()
		if coord, ok = s.coordCache[name]; ok {
			return coord, true
		}

		return nil, false
	}

	return nil, false
}