package memberlist

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"net"
	"strings"
	"sync/atomic"
	"time"

	metrics "github.com/armon/go-metrics"
)

type NodeStateType int

const (
	StateAlive NodeStateType = iota
	StateSuspect
	StateDead
	StateLeft
)

// Node represents a node in the cluster.
type Node struct {
	Name  string
	Addr  net.IP
	Port  uint16
	Meta  []byte        // Metadata from the delegate for this node.
	State NodeStateType // State of the node.
	PMin  uint8         // Minimum protocol version this understands
	PMax  uint8         // Maximum protocol version this understands
	PCur  uint8         // Current version node is speaking
	DMin  uint8         // Min protocol version for the delegate to understand
	DMax  uint8         // Max protocol version for the delegate to understand
	DCur  uint8         // Current version delegate is speaking
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *Node) Address() string {
	return joinHostPort(n.Addr.String(), n.Port)
}

// FullAddress returns the node name and host:port form of a node's address,
// suitable for use with a transport.
func (n *Node) FullAddress() Address {
	return Address{
		Addr: joinHostPort(n.Addr.String(), n.Port),
		Name: n.Name,
	}
}

// String returns the node name
func (n *Node) String() string {
	return n.Name
}
// nodeState is used to manage our state view of another node
type nodeState struct {
	Node
	Incarnation uint32        // Last known incarnation number
	State       NodeStateType // Current state
	StateChange time.Time     // Time last state change happened
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *nodeState) Address() string {
	return n.Node.Address()
}

// FullAddress returns the node name and host:port form of a node's address,
// suitable for use with a transport.
func (n *nodeState) FullAddress() Address {
	return n.Node.FullAddress()
}

func (n *nodeState) DeadOrLeft() bool {
	return n.State == StateDead || n.State == StateLeft
}

// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
	ackFn  func([]byte, time.Time)
	nackFn func()
	timer  *time.Timer
}

// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
	node string
}

func (f NoPingResponseError) Error() string {
	return fmt.Sprintf("No response from node %s", f.node)
}
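
// Because NoPingResponseError is returned by value from Ping, callers can
// detect it with a type assertion or errors.As (a sketch, assuming Go 1.13+
// error inspection):
//
//	if _, err := m.Ping(name, addr); err != nil {
//		var npr NoPingResponseError
//		if errors.As(err, &npr) {
//			// the ping was sent but no ack arrived before the timeout
//		}
//	}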

// schedule is used to ensure the periodic probe, push/pull, and gossip
// ticks are performed. This function is safe to call multiple times. If
// the memberlist is already scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we already have tickers, then don't do anything, since we're
	// scheduled
	if len(m.tickers) > 0 {
		return
	}

	// Create the stop tick channel, a blocking channel. We close this
	// when we should stop the tickers.
	stopCh := make(chan struct{})

	// Create a new probeTicker
	if m.config.ProbeInterval > 0 {
		t := time.NewTicker(m.config.ProbeInterval)
		go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
		m.tickers = append(m.tickers, t)
	}

	// Create a push pull ticker if needed
	if m.config.PushPullInterval > 0 {
		go m.pushPullTrigger(stopCh)
	}

	// Create a gossip ticker if needed
	if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
		t := time.NewTicker(m.config.GossipInterval)
		go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
		m.tickers = append(m.tickers, t)
	}

	// If we made any tickers, then record the stopTick channel for
	// later.
	if len(m.tickers) > 0 {
		m.stopTick = stopCh
	}
}
// triggerFunc is used to trigger a function call on each tick received
// until a stop signal arrives.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}
	for {
		select {
		case <-C:
			f()
		case <-stop:
			return
		}
	}
}

// pushPullTrigger is used to periodically trigger a push/pull until
// a stop tick arrives. We don't use triggerFunc since the push/pull
// timer is dynamically scaled based on cluster size to avoid network
// saturation
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
	interval := m.config.PushPullInterval

	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}

	// Tick using a dynamic timer
	for {
		tickTime := pushPullScale(interval, m.estNumNodes())
		select {
		case <-time.After(tickTime):
			m.pushPull()
		case <-stop:
			return
		}
	}
}
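
// As a worked example of the scaling above, assuming the pushPullScale
// implementation in util.go at the time of writing (constant up to 32
// nodes, then growing logarithmically): with a 30s PushPullInterval, a
// 20-node cluster syncs every 30s, while a 128-node cluster syncs every
// 90s, since ceil(log2(128)-log2(32))+1 = 3.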

// deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we have no tickers, then we aren't scheduled.
	if len(m.tickers) == 0 {
		return
	}

	// Close the stop channel so all the ticker listeners stop.
	close(m.stopTick)

	// Explicitly stop all the tickers themselves so they don't take
	// up any more resources, and get rid of the list.
	for _, t := range m.tickers {
		t.Stop()
	}
	m.tickers = nil
}

// probe is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
	// Track the number of indexes we've considered probing
	numCheck := 0
START:
	m.nodeLock.RLock()

	// Make sure we don't wrap around infinitely
	if numCheck >= len(m.nodes) {
		m.nodeLock.RUnlock()
		return
	}

	// Handle the wrap around case
	if m.probeIndex >= len(m.nodes) {
		m.nodeLock.RUnlock()
		m.resetNodes()
		m.probeIndex = 0
		numCheck++
		goto START
	}

	// Determine if we should probe this node
	skip := false
	var node nodeState

	node = *m.nodes[m.probeIndex]
	if node.Name == m.config.Name {
		skip = true
	} else if node.DeadOrLeft() {
		skip = true
	}

	// Potentially skip
	m.nodeLock.RUnlock()
	m.probeIndex++
	if skip {
		numCheck++
		goto START
	}

	// Probe the specific node
	m.probeNode(&node)
}

// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests)
func (m *Memberlist) probeNodeByAddr(addr string) {
	m.nodeLock.RLock()
	n := m.nodeMap[addr]
	m.nodeLock.RUnlock()

	m.probeNode(n)
}

// failedRemote checks the error and decides if it indicates a failure on the
// other end.
func failedRemote(err error) bool {
	switch t := err.(type) {
	case *net.OpError:
		if strings.HasPrefix(t.Net, "tcp") {
			switch t.Op {
			case "dial", "read", "write":
				return true
			}
		}
	}
	return false
}

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
	defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

	// We use our health awareness to scale the overall probe interval, so we
	// slow down if we detect problems. The ticker that calls us can handle
	// us running over the base interval, and will skip missed ticks.
	probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
	if probeInterval > m.config.ProbeInterval {
		metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
	}
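	// As a concrete example, assuming awareness.ScaleTimeout multiplies the
	// timeout by (health score + 1): with a health score of 2 and a 500ms
	// ProbeInterval, probeInterval here becomes 1.5s.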

	// Prepare a ping message and setup an ack handler.
	selfAddr, selfPort := m.getAdvertise()
	ping := ping{
		SeqNo:      m.nextSeqNo(),
		Node:       node.Name,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	nackCh := make(chan struct{}, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

	// Mark the sent time here, which should be after any pre-processing but
	// before system calls to do the actual send. This probably over-reports
	// a bit, but it's the best we can do. We had originally put this right
	// after the I/O, but that would sometimes give negative RTT measurements
	// which was not desirable.
	sent := time.Now()

	// Send a ping to the node. If this node looks like it's suspect or dead,
	// also tack on a suspect message so that it has a chance to refute as
	// soon as possible.
	deadline := sent.Add(probeInterval)
	addr := node.Address()

	// Arrange for our self-awareness to get updated.
	var awarenessDelta int
	defer func() {
		m.awareness.ApplyDelta(awarenessDelta)
	}()
	if node.State == StateAlive {
		if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
			if failedRemote(err) {
				goto HANDLE_REMOTE_FAILURE
			} else {
				return
			}
		}
	} else {
		var msgs [][]byte
		if buf, err := encode(pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}
		s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
		if buf, err := encode(suspectMsg, &s); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}

		compound := makeCompoundMessage(msgs)
		if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
			if failedRemote(err) {
				goto HANDLE_REMOTE_FAILURE
			} else {
				return
			}
		}
	}

	// Arrange for our self-awareness to get updated. At this point we've
	// sent the ping, so any return statement means the probe succeeded
	// which will improve our health until we get to the failure scenarios
	// at the end of this function, which will alter this delta variable
	// accordingly.
	awarenessDelta = -1

	// Wait for response or round-trip-time.
	select {
	case v := <-ackCh:
		if v.Complete {
			if m.config.Ping != nil {
				rtt := v.Timestamp.Sub(sent)
				m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
			}
			return
		}

		// As an edge case, if we get a timeout, we need to re-enqueue it
		// here to break out of the select below.
		if !v.Complete {
			ackCh <- v
		}
	case <-time.After(m.config.ProbeTimeout):
		// Note that we don't scale this timeout based on awareness and
		// the health score. That's because we don't really expect waiting
		// longer to help get UDP through. Since health does extend the
		// probe interval it will give the TCP fallback more time, which
		// is more active in dealing with lost packets, and it gives more
		// time to wait for indirect acks/nacks.
		m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
	}

HANDLE_REMOTE_FAILURE:
	// Get some random live nodes.
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.Name == node.Name ||
			n.State != StateAlive
	})
	m.nodeLock.RUnlock()

	// Attempt an indirect ping.
	expectedNacks := 0
	selfAddr, selfPort = m.getAdvertise()
	ind := indirectPingReq{
		SeqNo:      ping.SeqNo,
		Target:     node.Addr,
		Port:       node.Port,
		Node:       node.Name,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	for _, peer := range kNodes {
		// We only expect nack to be sent from peers who understand
		// version 4 of the protocol.
		if ind.Nack = peer.PMax >= 4; ind.Nack {
			expectedNacks++
		}

		if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
		}
	}

	// Also make an attempt to contact the node directly over TCP. This
	// helps prevent confused clients who get isolated from UDP traffic
	// but can still speak TCP (which also means they can possibly report
	// misinformation to other nodes via anti-entropy), avoiding flapping in
	// the cluster.
	//
	// This is a little unusual because we will attempt a TCP ping to any
	// member who understands version 3 of the protocol, regardless of
	// which protocol version we are speaking. That's why we've included a
	// config option to turn this off if desired.
	fallbackCh := make(chan bool, 1)

	disableTcpPings := m.config.DisableTcpPings ||
		(m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
	if (!disableTcpPings) && (node.PMax >= 3) {
		go func() {
			defer close(fallbackCh)
			didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
			} else {
				fallbackCh <- didContact
			}
		}()
	} else {
		close(fallbackCh)
	}

	// Wait for the acks or timeout. Note that we don't check the fallback
	// channel here because we want to issue a warning below if that's the
	// *only* way we hear back from the peer, so we have to let this time
	// out first to allow the normal UDP-based acks to come in.
	select {
	case v := <-ackCh:
		if v.Complete {
			return
		}
	}

	// Finally, poll the fallback channel. The timeouts are set such that
	// the channel will have something or be closed without having to wait
	// any additional time here.
	for didContact := range fallbackCh {
		if didContact {
			m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
			return
		}
	}

	// Update our self-awareness based on the results of this failed probe.
	// If we don't have peers who will send nacks then we penalize for any
	// failed probe as a simple health metric. If we do have peers to nack
	// verify, then we can use that as a more sophisticated measure of self-
	// health because we assume them to be working, and they can help us
	// decide if the probed node was really dead or if it was something wrong
	// with ourselves.
	awarenessDelta = 0
	if expectedNacks > 0 {
		if nackCount := len(nackCh); nackCount < expectedNacks {
			awarenessDelta += (expectedNacks - nackCount)
		}
	} else {
		awarenessDelta += 1
	}

	// No acks received from target, suspect it as failed.
	m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
	s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
	m.suspectNode(&s)
}

// Ping initiates a ping to the node with the specified name.
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
	// Prepare a ping message and setup an ack handler.
	selfAddr, selfPort := m.getAdvertise()
	ping := ping{
		SeqNo:      m.nextSeqNo(),
		Node:       node,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)

	a := Address{Addr: addr.String(), Name: node}

	// Send a ping to the node.
	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
		return 0, err
	}

	// Mark the sent time here, which should be after any pre-processing and
	// system calls to do the actual send. This probably under-reports a bit,
	// but it's the best we can do.
	sent := time.Now()

	// Wait for response or timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			return v.Timestamp.Sub(sent), nil
		}
	case <-time.After(m.config.ProbeTimeout):
		// Timeout, return an error below.
	}

	m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
	return 0, NoPingResponseError{ping.Node}
}
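
// A minimal usage sketch (hypothetical node name and address):
//
//	rtt, err := m.Ping("node-a", &net.UDPAddr{IP: net.ParseIP("10.0.0.5"), Port: 7946})
//	if err == nil {
//		log.Printf("round-trip time to node-a: %s", rtt)
//	}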

// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	// Move dead nodes, but respect gossip to the dead interval
	deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime)

	// Deregister the dead nodes
	for i := deadIdx; i < len(m.nodes); i++ {
		delete(m.nodeMap, m.nodes[i].Name)
		m.nodes[i] = nil
	}

	// Trim the nodes to exclude the dead nodes
	m.nodes = m.nodes[0:deadIdx]

	// Update numNodes after we've trimmed the dead nodes
	atomic.StoreUint32(&m.numNodes, uint32(deadIdx))

	// Shuffle live nodes
	shuffleNodes(m.nodes)
}

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
	defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

	// Get some random live, suspect, or recently dead nodes
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
		if n.Name == m.config.Name {
			return true
		}

		switch n.State {
		case StateAlive, StateSuspect:
			return false

		case StateDead:
			return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

		default:
			return true
		}
	})
	m.nodeLock.RUnlock()

	// Compute the bytes available
	bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
	if m.config.EncryptionEnabled() {
		bytesAvail -= encryptOverhead(m.encryptionVersion())
	}
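	// For example, with the default 1400-byte UDPBufferSize, encryption
	// disabled, and the 2-byte compound header overhead defined in net.go at
	// the time of writing, roughly 1398 bytes remain per gossip packet.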

	for _, node := range kNodes {
		// Get any pending broadcasts
		msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
		if len(msgs) == 0 {
			return
		}

		addr := node.Address()
		if len(msgs) == 1 {
			// Send single message as is
			if err := m.rawSendMsgPacket(node.FullAddress(), &node, msgs[0]); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		} else {
			// Otherwise create and send a compound message
			compound := makeCompoundMessage(msgs)
			if err := m.rawSendMsgPacket(node.FullAddress(), &node, compound.Bytes()); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		}
	}
}

// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
	// Get a random live node
	m.nodeLock.RLock()
	nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.State != StateAlive
	})
	m.nodeLock.RUnlock()

	// If no nodes, bail
	if len(nodes) == 0 {
		return
	}
	node := nodes[0]

	// Attempt a push pull
	if err := m.pushPullNode(node.FullAddress(), false); err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
	}
}

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

	// Attempt to send and receive with the node
	remote, userState, err := m.sendAndReceiveState(a, join)
	if err != nil {
		return err
	}

	if err := m.mergeRemoteState(join, remote, userState); err != nil {
		return err
	}
	return nil
}

// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the maximum minimum and
// minimum maximum understood protocol and delegate versions. In other words,
// it finds the common denominator of protocol and delegate version ranges
// for the entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	// Maximum minimum understood and minimum maximum understood for both
	// the protocol and delegate versions. We use this to verify everyone
	// can be understood.
	var maxpmin, minpmax uint8
	var maxdmin, mindmax uint8
	minpmax = math.MaxUint8
	mindmax = math.MaxUint8

	for _, rn := range remote {
		// If the node isn't alive, then skip it
		if rn.State != StateAlive {
			continue
		}

		// Skip nodes that don't have versions set, it just means
		// their version is zero.
		if len(rn.Vsn) == 0 {
			continue
		}

		if rn.Vsn[0] > maxpmin {
			maxpmin = rn.Vsn[0]
		}

		if rn.Vsn[1] < minpmax {
			minpmax = rn.Vsn[1]
		}

		if rn.Vsn[3] > maxdmin {
			maxdmin = rn.Vsn[3]
		}

		if rn.Vsn[4] < mindmax {
			mindmax = rn.Vsn[4]
		}
	}

	for _, n := range m.nodes {
		// Ignore non-alive nodes
		if n.State != StateAlive {
			continue
		}

		if n.PMin > maxpmin {
			maxpmin = n.PMin
		}

		if n.PMax < minpmax {
			minpmax = n.PMax
		}

		if n.DMin > maxdmin {
			maxdmin = n.DMin
		}

		if n.DMax < mindmax {
			mindmax = n.DMax
		}
	}
	// Now that we definitively know the minimum and maximum understood
	// version that satisfies the whole cluster, we verify that every
	// node in the cluster satisfies this.
	for _, n := range remote {
		var nPCur, nDCur uint8
		if len(n.Vsn) > 0 {
			nPCur = n.Vsn[2]
			nDCur = n.Vsn[5]
		}

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	for _, n := range m.nodes {
		nPCur := n.PCur
		nDCur := n.DCur

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	return nil
}
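
// As a worked example of the range logic above: if one node advertises
// protocol range [1, 3] and another advertises [2, 4], then maxpmin=2 and
// minpmax=3, so every node's current protocol version must fall within
// [2, 3] for verification to pass.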

// nextSeqNo returns a usable sequence number in a thread safe way
func (m *Memberlist) nextSeqNo() uint32 {
	return atomic.AddUint32(&m.sequenceNum, 1)
}

// nextIncarnation returns the next incarnation number in a thread safe way
func (m *Memberlist) nextIncarnation() uint32 {
	return atomic.AddUint32(&m.incarnation, 1)
}

// skipIncarnation adds the positive offset to the incarnation number.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
	return atomic.AddUint32(&m.incarnation, offset)
}

// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
	return int(atomic.LoadUint32(&m.numNodes))
}

type ackMessage struct {
	Complete  bool
	Payload   []byte
	Timestamp time.Time
}

// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The `complete` field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
	// Create handler functions for acks and nacks
	ackFn := func(payload []byte, timestamp time.Time) {
		select {
		case ackCh <- ackMessage{true, payload, timestamp}:
		default:
		}
	}
	nackFn := func() {
		select {
		case nackCh <- struct{}{}:
		default:
		}
	}

	// Add the handlers
	ah := &ackHandler{ackFn, nackFn, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
		select {
		case ackCh <- ackMessage{false, nil, time.Now()}:
		default:
		}
	})
}

// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings so does not configure a function
// for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
	// Add the handler
	ah := &ackHandler{ackFn, nil, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
	})
}

// Invokes an ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[ack.SeqNo]
	delete(m.ackHandlers, ack.SeqNo)
	m.ackLock.Unlock()
	if !ok {
		return
	}
	ah.timer.Stop()
	ah.ackFn(ack.Payload, timestamp)
}

// Invokes nack handler if any is associated.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[nack.SeqNo]
	m.ackLock.Unlock()
	if !ok || ah.nackFn == nil {
		return
	}
	ah.nackFn()
}

// refute gossips an alive message in response to incoming information that we
// are suspect or dead. It will make sure the incarnation number beats the given
// accusedInc value, or you can supply 0 to just get the next incarnation number.
// This alters the node state that's passed in so this MUST be called while the
// nodeLock is held.
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
	// Make sure the incarnation number beats the accusation.
	inc := m.nextIncarnation()
	if accusedInc >= inc {
		inc = m.skipIncarnation(accusedInc - inc + 1)
	}
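	// For example (hypothetical numbers): if our previous incarnation was 5
	// and the accusation carries 10, nextIncarnation returns 6 and
	// skipIncarnation(10-6+1) lands on 11, which beats the accusation.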
	me.Incarnation = inc

	// Decrease our health because we are being asked to refute a problem.
	m.awareness.ApplyDelta(1)

	// Format and broadcast an alive message.
	a := alive{
		Incarnation: inc,
		Node:        me.Name,
		Addr:        me.Addr,
		Port:        me.Port,
		Meta:        me.Meta,
		Vsn: []uint8{
			me.PMin, me.PMax, me.PCur,
			me.DMin, me.DMax, me.DCur,
		},
	}
	m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
}

// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[a.Node]

	// It is possible that during a Leave(), there is already an aliveMsg
	// in-queue to be processed but blocked by the locks above. If we let
	// that aliveMsg process, it'll cause us to re-join the cluster. This
	// ensures that we don't.
	if m.hasLeft() && a.Node == m.config.Name {
		return
	}

	if len(a.Vsn) >= 3 {
		pMin := a.Vsn[0]
		pMax := a.Vsn[1]
		pCur := a.Vsn[2]
		if pMin == 0 || pMax == 0 || pMin > pMax {
			m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
			return
		}
	}

	// Invoke the Alive delegate if any. This can be used to filter out
	// alive messages based on custom logic. For example, using a cluster name.
	// Using a merge delegate is not enough, as it is possible for passive
	// cluster merging to still occur.
	if m.config.Alive != nil {
		if len(a.Vsn) < 6 {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
				a.Node, net.IP(a.Addr), a.Port)
			return
		}
		node := &Node{
			Name: a.Node,
			Addr: a.Addr,
			Port: a.Port,
			Meta: a.Meta,
			PMin: a.Vsn[0],
			PMax: a.Vsn[1],
			PCur: a.Vsn[2],
			DMin: a.Vsn[3],
			DMax: a.Vsn[4],
			DCur: a.Vsn[5],
		}
		if err := m.config.Alive.NotifyAlive(node); err != nil {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
				a.Node, err)
			return
		}
	}

	// Check if we've never seen this node before, and if not, then
	// store this node in our node map.
	var updatesNode bool
	if !ok {
		errCon := m.config.IPAllowed(a.Addr)
		if errCon != nil {
			m.logger.Printf("[WARN] memberlist: Rejected node %s (%v): %s", a.Node, net.IP(a.Addr), errCon)
			return
		}
		state = &nodeState{
			Node: Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			},
			State: StateDead,
		}
		if len(a.Vsn) > 5 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Add to map
		m.nodeMap[a.Node] = state

		// Get a random offset. This is important to ensure
		// the failure detection bound is low on average. If all
		// nodes did an append, failure detection bound would be
		// very high.
		n := len(m.nodes)
		offset := randomOffset(n)

		// Add at the end and swap with the node at the offset
		m.nodes = append(m.nodes, state)
		m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
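		// For example, with 4 existing nodes and a random offset of 1, the
		// node just appended at index 4 is swapped into index 1.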

		// Update numNodes after we've added a new node
		atomic.AddUint32(&m.numNodes, 1)
	} else {
		// Check if this address is different from the existing node unless the old node is dead.
		if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
			errCon := m.config.IPAllowed(a.Addr)
			if errCon != nil {
				m.logger.Printf("[WARN] memberlist: Rejected IP update from %v to %v for node %s: %s", state.Addr, net.IP(a.Addr), a.Node, errCon)
				return
			}
			// If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died.
			canReclaim := (m.config.DeadNodeReclaimTime > 0 &&
				time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)

			// Allow the address to be updated if a dead node is being replaced.
			if state.State == StateLeft || (state.State == StateDead && canReclaim) {
				m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
				updatesNode = true
			} else {
				m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v",
					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State)

				// Inform the conflict delegate if provided
				if m.config.Conflict != nil {
					other := Node{
						Name: a.Node,
						Addr: a.Addr,
						Port: a.Port,
						Meta: a.Meta,
					}
					m.config.Conflict.NotifyConflict(&state.Node, &other)
				}
				return
			}
		}
	}

	// Bail if the incarnation number is older, and this is not about us
	isLocalNode := state.Name == m.config.Name
	if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode {
		return
	}

	// Bail if strictly less and this is about us
	if a.Incarnation < state.Incarnation && isLocalNode {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, a.Node)

	// Store the old state and meta data
	oldState := state.State
	oldMeta := state.Meta

	// If this is us we need to refute, otherwise re-broadcast
	if !bootstrap && isLocalNode {
		// Compute the version vector
		versions := []uint8{
			state.PMin, state.PMax, state.PCur,
			state.DMin, state.DMax, state.DCur,
		}

		// If the Incarnation is the same, we need special handling, since it
		// is possible for the following situation to happen:
		// 1) Start with configuration C, join cluster
		// 2) Hard fail / Kill / Shutdown
		// 3) Restart with configuration C', join cluster
		//
		// In this case, other nodes and the local node see the same incarnation,
		// but the values may not be the same. For this reason, we always
		// need to do an equality check for this Incarnation. In most cases,
		// we just ignore, but we may need to refute.
		//
		if a.Incarnation == state.Incarnation &&
			bytes.Equal(a.Meta, state.Meta) &&
			bytes.Equal(a.Vsn, versions) {
			return
		}
		m.refute(state, a.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
	} else {
		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

		// Update protocol versions if they arrived
		if len(a.Vsn) > 0 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Update the state and incarnation number
		state.Incarnation = a.Incarnation
		state.Meta = a.Meta
		state.Addr = a.Addr
		state.Port = a.Port
		if state.State != StateAlive {
			state.State = StateAlive
			state.StateChange = time.Now()
		}
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)

	// Notify the delegate of any relevant updates
	if m.config.Events != nil {
		if oldState == StateDead || oldState == StateLeft {
			// if Dead/Left -> Alive, notify of join
			m.config.Events.NotifyJoin(&state.Node)

		} else if !bytes.Equal(oldMeta, state.Meta) {
			// if Meta changed, trigger an update notification
			m.config.Events.NotifyUpdate(&state.Node)
		}
	}
}

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[s.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if s.Incarnation < state.Incarnation {
		return
	}

	// See if there's a suspicion timer we can confirm. If the info is new
	// to us we will go ahead and re-gossip it. This allows for multiple
	// independent confirmations to flow even when a node probes a node
	// that's already suspect.
	if timer, ok := m.nodeTimers[s.Node]; ok {
		if timer.Confirm(s.From) {
			m.encodeAndBroadcast(s.Node, suspectMsg, s)
		}
		return
	}

	// Ignore non-alive nodes
	if state.State != StateAlive {
		return
	}

	// If this is us we need to refute, otherwise re-broadcast
	if state.Name == m.config.Name {
		m.refute(state, s.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
		return // Do not mark ourself suspect
	} else {
		m.encodeAndBroadcast(s.Node, suspectMsg, s)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

	// Update the state
	state.Incarnation = s.Incarnation
	state.State = StateSuspect
	changeTime := time.Now()
	state.StateChange = changeTime

	// Set up a suspicion timer. Given that we don't have any known phase
	// relationship with our peers, we set up k such that we hit the nominal
	// timeout two probe intervals short of what we expect given the suspicion
	// multiplier.
	k := m.config.SuspicionMult - 2

	// If there aren't enough nodes to give the expected confirmations, just
	// set k to 0 to say that we don't expect any. Note we subtract 2 from n
	// here to take out ourselves and the node being probed.
	n := m.estNumNodes()
	if n-2 < k {
		k = 0
	}

	// Compute the timeouts based on the size of the cluster.
	min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
	max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
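	// As a concrete example, assuming the suspicionTimeout implementation in
	// util.go (which scales with log10 of the cluster size): with
	// SuspicionMult=4, a 1s ProbeInterval, and 100 nodes, min is
	// 4 * log10(100) * 1s = 8s; with a SuspicionMaxTimeoutMult of 6, max is 48s.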
	fn := func(numConfirmations int) {
		var d *dead

		m.nodeLock.Lock()
		state, ok := m.nodeMap[s.Node]
		timeout := ok && state.State == StateSuspect && state.StateChange == changeTime
		if timeout {
			d = &dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
		}
		m.nodeLock.Unlock()

		if timeout {
			if k > 0 && numConfirmations < k {
				metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
			}

			m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
				state.Name, numConfirmations)

			m.deadNode(d)
		}
	}
	m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}

// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[d.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if d.Incarnation < state.Incarnation {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, d.Node)

	// Ignore if node is already dead
	if state.DeadOrLeft() {
		return
	}

	// Check if this is us
	if state.Name == m.config.Name {
		// If we are not leaving we need to refute
		if !m.hasLeft() {
			m.refute(state, d.Incarnation)
			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
			return // Do not mark ourself dead
		}

		// If we are leaving, we broadcast and wait
		m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
	} else {
		m.encodeAndBroadcast(d.Node, deadMsg, d)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

	// Update the state
	state.Incarnation = d.Incarnation

	// If the dead message was sent by the node itself, mark it as left
	// instead of dead.
	if d.Node == d.From {
		state.State = StateLeft
	} else {
		state.State = StateDead
	}
	state.StateChange = time.Now()

	// Notify of death
	if m.config.Events != nil {
		m.config.Events.NotifyLeave(&state.Node)
	}
}

// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
	for _, r := range remote {
		switch r.State {
		case StateAlive:
			a := alive{
				Incarnation: r.Incarnation,
				Node:        r.Name,
				Addr:        r.Addr,
				Port:        r.Port,
				Meta:        r.Meta,
				Vsn:         r.Vsn,
			}
			m.aliveNode(&a, nil, false)

		case StateLeft:
			d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
			m.deadNode(&d)
		case StateDead:
			// If the remote node believes a node is dead, we prefer to
			// suspect that node instead of declaring it dead instantly
			fallthrough
		case StateSuspect:
			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
			m.suspectNode(&s)
		}
	}
}