package memberlist

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
)

type nodeStateType int

const (
	stateAlive nodeStateType = iota
	stateSuspect
	stateDead
)

// Node represents a node in the cluster.
type Node struct {
	Name string
	Addr net.IP
	Port uint16
	Meta []byte // Metadata from the delegate for this node.
	PMin uint8  // Minimum protocol version this understands
	PMax uint8  // Maximum protocol version this understands
	PCur uint8  // Current version node is speaking
	DMin uint8  // Min protocol version for the delegate to understand
	DMax uint8  // Max protocol version for the delegate to understand
	DCur uint8  // Current version delegate is speaking
}

// nodeState is used to manage our state view of another node
type nodeState struct {
	Node
	Incarnation uint32        // Last known incarnation number
	State       nodeStateType // Current state
	StateChange time.Time     // Time last state change happened
}

// ackHandler is used to register handlers for incoming acks
type ackHandler struct {
	handler func([]byte, time.Time)
	timer   *time.Timer
}

// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
	node string
}

func (f NoPingResponseError) Error() string {
	return fmt.Sprintf("No response from node %s", f.node)
}

// schedule is used to ensure the background maintenance routines (probe,
// push/pull, and gossip) are run periodically. This function is safe to call
// multiple times; if the memberlist is already scheduled, it does nothing.
func (m *Memberlist) schedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we already have tickers, then don't do anything, since we're
	// scheduled
	if len(m.tickers) > 0 {
		return
	}

	// Create the stop tick channel, a blocking channel. We close this
	// when we should stop the tickers.
	stopCh := make(chan struct{})

	// Create a new probe ticker
	if m.config.ProbeInterval > 0 {
		t := time.NewTicker(m.config.ProbeInterval)
		go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
		m.tickers = append(m.tickers, t)
	}

	// Create a push pull ticker if needed
	if m.config.PushPullInterval > 0 {
		go m.pushPullTrigger(stopCh)
	}

	// Create a gossip ticker if needed
	if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
		t := time.NewTicker(m.config.GossipInterval)
		go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
		m.tickers = append(m.tickers, t)
	}

	// If we made any tickers, then record the stopTick channel for
	// later.
	if len(m.tickers) > 0 {
		m.stopTick = stopCh
	}
}

// triggerFunc is used to trigger a function call on each tick received from
// the given channel until the stop channel is closed.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}
	for {
		select {
		case <-C:
			f()
		case <-stop:
			return
		}
	}
}

// pushPullTrigger is used to periodically trigger a push/pull until
// a stop tick arrives. We don't use triggerFunc since the push/pull
// timer is dynamically scaled based on cluster size to avoid network
// saturation.
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
	interval := m.config.PushPullInterval

	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}

	// Tick using a dynamic timer
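	// pushPullScale leaves the interval untouched for small clusters and grows
	// it with the estimated node count, so full state syncs don't saturate the
	// network as the cluster gets large.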
	for {
		tickTime := pushPullScale(interval, m.estNumNodes())
		select {
		case <-time.After(tickTime):
			m.pushPull()
		case <-stop:
			return
		}
	}
}

// deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we have no tickers, then we aren't scheduled.
	if len(m.tickers) == 0 {
		return
	}

	// Close the stop channel so all the ticker listeners stop.
	close(m.stopTick)

	// Explicitly stop all the tickers themselves so they don't take
	// up any more resources, and get rid of the list.
	for _, t := range m.tickers {
		t.Stop()
	}
	m.tickers = nil
}

// probe is used to perform a single round of failure detection against the
// next node in the probe rotation.
func (m *Memberlist) probe() {
	// Track the number of indexes we've considered probing
	numCheck := 0
START:
	m.nodeLock.RLock()

	// Make sure we don't wrap around infinitely
	if numCheck >= len(m.nodes) {
		m.nodeLock.RUnlock()
		return
	}

	// Handle the wrap around case
	if m.probeIndex >= len(m.nodes) {
		m.nodeLock.RUnlock()
		m.resetNodes()
		m.probeIndex = 0
		numCheck++
		goto START
	}

	// Determine if we should probe this node
	skip := false
	var node nodeState

	node = *m.nodes[m.probeIndex]
	if node.Name == m.config.Name {
		skip = true
	} else if node.State == stateDead {
		skip = true
	}

	// Potentially skip
	m.nodeLock.RUnlock()
	m.probeIndex++
	if skip {
		numCheck++
		goto START
	}

	// Probe the specific node
	m.probeNode(&node)
}

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
	defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

	// Prepare a ping message and set up an ack handler.
	ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
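	// The ack channel is buffered for the direct ack plus up to IndirectChecks
	// indirect acks, so delivering an ack never has to block.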
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)

	// Send a ping to the node.
	deadline := time.Now().Add(m.config.ProbeInterval)
	destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
	if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
		return
	}

	// Mark the sent time here, which should be after any pre-processing and
	// system calls to do the actual send. This probably under-reports a bit,
	// but it's the best we can do.
	sent := time.Now()

	// Wait for a response or the probe timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			if m.config.Ping != nil {
				rtt := v.Timestamp.Sub(sent)
				m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
			}
			return
		}

		// As an edge case, if we get a timeout, we need to re-enqueue it
		// here to break out of the select below.
		if !v.Complete {
			ackCh <- v
		}
	case <-time.After(m.config.ProbeTimeout):
		m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name)
	}

	// Get some random live nodes.
	m.nodeLock.RLock()
	excludes := []string{m.config.Name, node.Name}
	kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
	m.nodeLock.RUnlock()

	// Attempt an indirect ping.
	ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
	for _, peer := range kNodes {
		destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)}
		if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
		}
	}

	// Also make an attempt to contact the node directly over TCP. This helps
	// prevent confused clients that are isolated from UDP traffic but can
	// still speak TCP (and can therefore report misinformation to other nodes
	// via anti-entropy) from causing flapping in the cluster.
	//
	// This is a little unusual because we will attempt a TCP ping to any
	// member who understands version 3 of the protocol, regardless of
	// which protocol version we are speaking. That's why we've included a
	// config option to turn this off if desired.
	fallbackCh := make(chan bool, 1)
	if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
		destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)}
		go func() {
			defer close(fallbackCh)
			didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err)
			} else {
				fallbackCh <- didContact
			}
		}()
	} else {
		close(fallbackCh)
	}

	// Wait for the acks or timeout. Note that we don't check the fallback
	// channel here because we want to issue a warning below if that's the
	// *only* way we hear back from the peer, so we have to let this time
	// out first to allow the normal UDP-based acks to come in.
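	// The reaper installed by setAckChannel sends a timed-out ackMessage after
	// ProbeInterval (or the first select above re-enqueued it), so this
	// receive is guaranteed to return.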
	select {
	case v := <-ackCh:
		if v.Complete {
			return
		}
	}

	// Finally, poll the fallback channel. The timeouts are set such that
	// the channel will have something or be closed without having to wait
	// any additional time here.
	for didContact := range fallbackCh {
		if didContact {
			m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name)
			return
		}
	}

	// No acks received from target, suspect
	m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
	s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
	m.suspectNode(&s)
}

// Ping initiates a ping to the node with the specified name.
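// The given addr is where the ping is sent (typically the target's UDP
// address, e.g. a *net.UDPAddr). On success the measured round-trip time is
// returned.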
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
	// Prepare a ping message and set up an ack handler.
	ping := ping{SeqNo: m.nextSeqNo(), Node: node}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)

	// Send a ping to the node.
	if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
		return 0, err
	}

	// Mark the sent time here, which should be after any pre-processing and
	// system calls to do the actual send. This probably under-reports a bit,
	// but it's the best we can do.
	sent := time.Now()

	// Wait for response or timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			return v.Timestamp.Sub(sent), nil
		}
	case <-time.After(m.config.ProbeTimeout):
		// Timeout, return an error below.
	}

	m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
	return 0, NoPingResponseError{ping.Node}
}

// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	// Move the dead nodes
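	// to the end of the slice; moveDeadNodes returns the index of the first
	// dead entry so everything from deadIdx onward can be reaped.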
	deadIdx := moveDeadNodes(m.nodes)

	// Deregister the dead nodes
	for i := deadIdx; i < len(m.nodes); i++ {
		delete(m.nodeMap, m.nodes[i].Name)
		m.nodes[i] = nil
	}

	// Trim the nodes to exclude the dead nodes
	m.nodes = m.nodes[0:deadIdx]

	// Update numNodes after we've trimmed the dead nodes
	atomic.StoreUint32(&m.numNodes, uint32(deadIdx))

	// Shuffle live nodes
	shuffleNodes(m.nodes)
}

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
	defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

	// Get some random live nodes
	m.nodeLock.RLock()
	excludes := []string{m.config.Name}
	kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
	m.nodeLock.RUnlock()

	// Compute the bytes available
	bytesAvail := udpSendBuf - compoundHeaderOverhead
	if m.config.EncryptionEnabled() {
		bytesAvail -= encryptOverhead(m.encryptionVersion())
	}

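	// For each selected peer, pull as many queued broadcasts as fit within
	// bytesAvail and pack them into a single compound UDP message. An empty
	// broadcast queue means there is nothing left to gossip, so we stop early.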
	for _, node := range kNodes {
		// Get any pending broadcasts
		msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
		if len(msgs) == 0 {
			return
		}

		// Create a compound message
		compound := makeCompoundMessage(msgs)

		// Send the compound message
		destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)}
		if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err)
		}
	}
}

// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
	// Get a random live node
	m.nodeLock.RLock()
	excludes := []string{m.config.Name}
	nodes := kRandomNodes(1, excludes, m.nodes)
	m.nodeLock.RUnlock()

	// If no nodes, bail
	if len(nodes) == 0 {
		return
	}
	node := nodes[0]

	// Attempt a push pull
	if err := m.pushPullNode(node.Addr, node.Port, false); err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
	}
}

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error {
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

	// Attempt to send and receive with the node
	remote, userState, err := m.sendAndReceiveState(addr, port, join)
	if err != nil {
		return err
	}

	if err := m.mergeRemoteState(join, remote, userState); err != nil {
		return err
	}
	return nil
}

// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the highest minimum and lowest maximum
// understood protocol and delegate versions. In other words, it finds the
// common intersection of the protocol and delegate version ranges across the
// entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	// Maximum minimum understood and minimum maximum understood for both
	// the protocol and delegate versions. We use this to verify everyone
	// can be understood.
	var maxpmin, minpmax uint8
	var maxdmin, mindmax uint8
	minpmax = math.MaxUint8
	mindmax = math.MaxUint8

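	// A remote node's Vsn field packs its six version bytes in the order
	// [PMin, PMax, PCur, DMin, DMax, DCur], mirroring the fields on Node.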
	for _, rn := range remote {
		// If the node isn't alive, then skip it
		if rn.State != stateAlive {
			continue
		}

		// Skip nodes that don't have versions set; it just means
		// their version is zero.
		if len(rn.Vsn) == 0 {
			continue
		}

		if rn.Vsn[0] > maxpmin {
			maxpmin = rn.Vsn[0]
		}

		if rn.Vsn[1] < minpmax {
			minpmax = rn.Vsn[1]
		}

		if rn.Vsn[3] > maxdmin {
			maxdmin = rn.Vsn[3]
		}

		if rn.Vsn[4] < mindmax {
			mindmax = rn.Vsn[4]
		}
	}

	for _, n := range m.nodes {
		// Ignore non-alive nodes
		if n.State != stateAlive {
			continue
		}

		if n.PMin > maxpmin {
			maxpmin = n.PMin
		}

		if n.PMax < minpmax {
			minpmax = n.PMax
		}

		if n.DMin > maxdmin {
			maxdmin = n.DMin
		}

		if n.DMax < mindmax {
			mindmax = n.DMax
		}
	}

	// Now that we definitively know the minimum and maximum understood
	// versions that satisfy the whole cluster, we verify that every
	// node in the cluster satisfies this range.
	for _, n := range remote {
		var nPCur, nDCur uint8
		if len(n.Vsn) > 0 {
			nPCur = n.Vsn[2]
			nDCur = n.Vsn[5]
		}

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	for _, n := range m.nodes {
		nPCur := n.PCur
		nDCur := n.DCur

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	return nil
}

// nextSeqNo returns a usable sequence number in a thread safe way
func (m *Memberlist) nextSeqNo() uint32 {
	return atomic.AddUint32(&m.sequenceNum, 1)
}

// nextIncarnation returns the next incarnation number in a thread safe way
func (m *Memberlist) nextIncarnation() uint32 {
	return atomic.AddUint32(&m.incarnation, 1)
}

// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
	return int(atomic.LoadUint32(&m.numNodes))
}

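// ackMessage is delivered on an ack channel registered via setAckChannel.
// Complete is true for a real ack and false when the handler timed out.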
type ackMessage struct {
	Complete  bool
	Payload   []byte
	Timestamp time.Time
}

// setAckChannel is used to attach a channel to receive a message when an ack
// with a given sequence number is received. The `Complete` field of the
// message will be false on timeout.
func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) {
	// Create a handler function
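	// The send below is non-blocking (select with a default case), so a late
	// or duplicate ack can never stall the goroutine that delivers it.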
	handler := func(payload []byte, timestamp time.Time) {
		select {
		case ch <- ackMessage{true, payload, timestamp}:
		default:
		}
	}

	// Add the handler
	ah := &ackHandler{handler, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
		select {
		case ch <- ackMessage{false, nil, time.Now()}:
		default:
		}
	})
}

// setAckHandler is used to attach a handler to be invoked when an
// ack with a given sequence number is received. If a timeout is reached,
// the handler is deleted.
func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) {
	// Add the handler
	ah := &ackHandler{handler, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
	})
}

// invokeAckHandler invokes the ack handler associated with the given sequence
// number, if any, and reaps the handler immediately.
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[ack.SeqNo]
	delete(m.ackHandlers, ack.SeqNo)
	m.ackLock.Unlock()
	if !ok {
		return
	}
	ah.timer.Stop()
	ah.handler(ack.Payload, timestamp)
}

// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[a.Node]

	// It is possible that during a Leave(), there is already an aliveMsg
	// in the queue to be processed but blocked by the locks above. If we let
	// that aliveMsg process, it'll cause us to re-join the cluster. This
	// ensures that we don't.
	if m.leave && a.Node == m.config.Name {
		return
	}

	// Invoke the Alive delegate if any. This can be used to filter out
	// alive messages based on custom logic. For example, using a cluster name.
	// Using a merge delegate is not enough, as it is possible for passive
	// cluster merging to still occur.
	if m.config.Alive != nil {
		node := &Node{
			Name: a.Node,
			Addr: a.Addr,
			Port: a.Port,
			Meta: a.Meta,
			PMin: a.Vsn[0],
			PMax: a.Vsn[1],
			PCur: a.Vsn[2],
			DMin: a.Vsn[3],
			DMax: a.Vsn[4],
			DCur: a.Vsn[5],
		}
		if err := m.config.Alive.NotifyAlive(node); err != nil {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
				a.Node, err)
			return
		}
	}

	// If we've never seen this node before, store it in our node map.
	if !ok {
		state = &nodeState{
			Node: Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			},
			State: stateDead,
		}

		// Add to map
		m.nodeMap[a.Node] = state

		// Get a random offset. This is important to ensure
		// the failure detection bound is low on average. If all
		// nodes did an append, failure detection bound would be
		// very high.
		n := len(m.nodes)
		offset := randomOffset(n)

		// Add at the end and swap with the node at the offset
		m.nodes = append(m.nodes, state)
		m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]

		// Update numNodes after we've added a new node
		atomic.AddUint32(&m.numNodes, 1)
	}

	// Check if this address is different from the existing node's
	if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
		m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
			state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)

		// Inform the conflict delegate if provided
		if m.config.Conflict != nil {
			other := Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			}
			m.config.Conflict.NotifyConflict(&state.Node, &other)
		}
		return
	}

	// Bail if the incarnation number is older, and this is not about us
	isLocalNode := state.Name == m.config.Name
	if a.Incarnation <= state.Incarnation && !isLocalNode {
		return
	}

	// Bail if strictly less and this is about us
	if a.Incarnation < state.Incarnation && isLocalNode {
		return
	}

	// Store the old state and meta data
	oldState := state.State
	oldMeta := state.Meta

	// If this is us we need to refute, otherwise re-broadcast
	if !bootstrap && isLocalNode {
		// Compute the version vector
		versions := []uint8{
			state.PMin, state.PMax, state.PCur,
			state.DMin, state.DMax, state.DCur,
		}

		// If the Incarnation is the same, we need special handling, since it
		// is possible for the following situation to happen:
		// 1) Start with configuration C, join cluster
		// 2) Hard fail / Kill / Shutdown
		// 3) Restart with configuration C', join cluster
		//
		// In this case, other nodes and the local node see the same incarnation,
		// but the values may not be the same. For this reason, we always
		// need to do an equality check for this Incarnation. In most cases,
		// we just ignore, but we may need to refute.
		//
		if a.Incarnation == state.Incarnation &&
			bytes.Equal(a.Meta, state.Meta) &&
			bytes.Equal(a.Vsn, versions) {
			return
		}

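		// Bump our incarnation number past the one in the message so the
		// refuting alive broadcast below takes precedence over it.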
		inc := m.nextIncarnation()
		for a.Incarnation >= inc {
			inc = m.nextIncarnation()
		}
		state.Incarnation = inc

		a := alive{
			Incarnation: inc,
			Node:        state.Name,
			Addr:        state.Addr,
			Port:        state.Port,
			Meta:        state.Meta,
			Vsn:         versions,
		}
		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)
		m.logger.Printf("[WARN] memberlist: Refuting an alive message")
	} else {
		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

		// Update protocol versions if they arrived
		if len(a.Vsn) > 0 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Update the state and incarnation number
		state.Incarnation = a.Incarnation
		state.Meta = a.Meta
		if state.State != stateAlive {
			state.State = stateAlive
			state.StateChange = time.Now()
		}
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)

	// Notify the delegate of any relevant updates
	if m.config.Events != nil {
		if oldState == stateDead {
			// if Dead -> Alive, notify of join
			m.config.Events.NotifyJoin(&state.Node)

		} else if !bytes.Equal(oldMeta, state.Meta) {
			// if Meta changed, trigger an update notification
			m.config.Events.NotifyUpdate(&state.Node)
		}
	}
}

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[s.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if s.Incarnation < state.Incarnation {
		return
	}

	// Ignore non-alive nodes
	if state.State != stateAlive {
		return
	}

	// If this is us we need to refute, otherwise re-broadcast
	if state.Name == m.config.Name {
		inc := m.nextIncarnation()
		for s.Incarnation >= inc {
			inc = m.nextIncarnation()
		}
		state.Incarnation = inc

		a := alive{
			Incarnation: inc,
			Node:        state.Name,
			Addr:        state.Addr,
			Port:        state.Port,
			Meta:        state.Meta,
			Vsn: []uint8{
				state.PMin, state.PMax, state.PCur,
				state.DMin, state.DMax, state.DCur,
			},
		}
		m.encodeAndBroadcast(s.Node, aliveMsg, a)
		m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
		return // Do not mark ourself suspect
	} else {
		m.encodeAndBroadcast(s.Node, suspectMsg, s)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

	// Update the state
	state.Incarnation = s.Incarnation
	state.State = stateSuspect
	changeTime := time.Now()
	state.StateChange = changeTime

	// Set up a timeout for this
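	// suspicion period. The timeout scales with the cluster size and probe
	// interval; when it fires we only mark the node dead if it is still
	// suspect and its StateChange timestamp hasn't moved in the meantime.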
	timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval)
	time.AfterFunc(timeout, func() {
		m.nodeLock.Lock()
		state, ok := m.nodeMap[s.Node]
		timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
		m.nodeLock.Unlock()

		if timeout {
			m.suspectTimeout(state)
		}
	})
}

// suspectTimeout is invoked when a suspect timeout has occurred
func (m *Memberlist) suspectTimeout(n *nodeState) {
	// Construct a dead message
	m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name)
	d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name}
	m.deadNode(&d)
}

// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[d.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if d.Incarnation < state.Incarnation {
		return
	}

	// Ignore if node is already dead
	if state.State == stateDead {
		return
	}

	// Check if this is us
	if state.Name == m.config.Name {
		// If we are not leaving, we need to refute
		if !m.leave {
			inc := m.nextIncarnation()
			for d.Incarnation >= inc {
				inc = m.nextIncarnation()
			}
			state.Incarnation = inc

			a := alive{
				Incarnation: inc,
				Node:        state.Name,
				Addr:        state.Addr,
				Port:        state.Port,
				Meta:        state.Meta,
				Vsn: []uint8{
					state.PMin, state.PMax, state.PCur,
					state.DMin, state.DMax, state.DCur,
				},
			}
			m.encodeAndBroadcast(d.Node, aliveMsg, a)
			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
			return // Do not mark ourself dead
		}

		// If we are leaving, we broadcast and wait
		m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
	} else {
		m.encodeAndBroadcast(d.Node, deadMsg, d)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

	// Update the state
	state.Incarnation = d.Incarnation
	state.State = stateDead
	state.StateChange = time.Now()

	// Notify of death
	if m.config.Events != nil {
		m.config.Events.NotifyLeave(&state.Node)
	}
}

// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
	for _, r := range remote {
		switch r.State {
		case stateAlive:
			a := alive{
				Incarnation: r.Incarnation,
				Node:        r.Name,
				Addr:        r.Addr,
				Port:        r.Port,
				Meta:        r.Meta,
				Vsn:         r.Vsn,
			}
			m.aliveNode(&a, nil, false)

		case stateDead:
			// If the remote node believes a node is dead, we prefer to
			// suspect that node instead of declaring it dead instantly
			fallthrough
		case stateSuspect:
			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
			m.suspectNode(&s)
		}
	}
}