package memberlist

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
)

type nodeStateType int

const (
	stateAlive nodeStateType = iota
	stateSuspect
	stateDead
)

// Node represents a node in the cluster.
type Node struct {
	Name string
	Addr net.IP
	Port uint16
	Meta []byte // Metadata from the delegate for this node.
	PMin uint8  // Minimum protocol version this understands
	PMax uint8  // Maximum protocol version this understands
	PCur uint8  // Current version node is speaking
	DMin uint8  // Min protocol version for the delegate to understand
	DMax uint8  // Max protocol version for the delegate to understand
	DCur uint8  // Current version delegate is speaking
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *Node) Address() string {
	return joinHostPort(n.Addr.String(), n.Port)
}

// String returns the node name.
func (n *Node) String() string {
	return n.Name
}

// nodeState is used to manage our state view of another node.
type nodeState struct {
	Node
	Incarnation uint32        // Last known incarnation number
	State       nodeStateType // Current state
	StateChange time.Time     // Time last state change happened
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *nodeState) Address() string {
	return n.Node.Address()
}

// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
	ackFn  func([]byte, time.Time)
	nackFn func()
	timer  *time.Timer
}

// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received.
type NoPingResponseError struct {
	node string
}

func (f NoPingResponseError) Error() string {
	return fmt.Sprintf("No response from node %s", f.node)
}

// schedule is used to ensure the background maintenance ticks are performed
// periodically. This function is safe to call multiple times. If the
// memberlist is already scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we already have tickers, then don't do anything, since we're
	// scheduled
	if len(m.tickers) > 0 {
		return
	}

	// Create the stop tick channel, an unbuffered channel. We close this
	// when we should stop the tickers.
	stopCh := make(chan struct{})

	// Create a new probe ticker
	if m.config.ProbeInterval > 0 {
		t := time.NewTicker(m.config.ProbeInterval)
		go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
		m.tickers = append(m.tickers, t)
	}

	// Create a push pull ticker if needed
	if m.config.PushPullInterval > 0 {
		go m.pushPullTrigger(stopCh)
	}

	// Create a gossip ticker if needed
	if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
		t := time.NewTicker(m.config.GossipInterval)
		go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
		m.tickers = append(m.tickers, t)
	}

	// If we made any tickers, then record the stopTick channel for
	// later.
	if len(m.tickers) > 0 {
		m.stopTick = stopCh
	}
}

// triggerFunc is used to call f every time the ticker channel C fires,
// until the stop channel is closed.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
	// Use a random stagger to avoid synchronizing with other nodes
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}
	for {
		select {
		case <-C:
			f()
		case <-stop:
			return
		}
	}
}

// pushPullTrigger is used to periodically trigger a push/pull until
// a stop tick arrives. We don't use triggerFunc since the push/pull
// timer is dynamically scaled based on cluster size to avoid network
// saturation
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
	interval := m.config.PushPullInterval

	// Use a random stagger to avoid synchronizing with other nodes
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}

	// Tick using a dynamic timer
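	// The dynamic tick keeps push/pull traffic from saturating the network:
	// pushPullScale stretches the interval as the estimated cluster size
	// grows, so larger clusters sync proportionally less often per node.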
	for {
		tickTime := pushPullScale(interval, m.estNumNodes())
		select {
		case <-time.After(tickTime):
			m.pushPull()
		case <-stop:
			return
		}
	}
}

// deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we have no tickers, then we aren't scheduled.
	if len(m.tickers) == 0 {
		return
	}

	// Close the stop channel so all the ticker listeners stop.
	close(m.stopTick)

	// Explicitly stop all the tickers themselves so they don't take
	// up any more resources, and get rid of the list.
	for _, t := range m.tickers {
		t.Stop()
	}
	m.tickers = nil
}

// probe is used to perform a single round of failure detection and gossip.
func (m *Memberlist) probe() {
	// Track the number of indexes we've considered probing
	numCheck := 0
START:
	m.nodeLock.RLock()

	// Make sure we don't wrap around infinitely
	if numCheck >= len(m.nodes) {
		m.nodeLock.RUnlock()
		return
	}

	// Handle the wrap around case
	if m.probeIndex >= len(m.nodes) {
		m.nodeLock.RUnlock()
		m.resetNodes()
		m.probeIndex = 0
		numCheck++
		goto START
	}

	// Determine if we should probe this node
	skip := false
	node := *m.nodes[m.probeIndex]
	if node.Name == m.config.Name {
		skip = true
	} else if node.State == stateDead {
		skip = true
	}

	// Potentially skip
	m.nodeLock.RUnlock()
	m.probeIndex++
	if skip {
		numCheck++
		goto START
	}

	// Probe the specific node
	m.probeNode(&node)
}

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
	defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

	// We use our health awareness to scale the overall probe interval, so we
	// slow down if we detect problems. The ticker that calls us can handle
	// us running over the base interval, and will skip missed ticks.
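	// As a rough illustration, assuming ScaleTimeout multiplies the base
	// interval by one plus the current health score: with a score of 2 and
	// a 1s ProbeInterval, a degraded node would probe only every 3s.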
	probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
	if probeInterval > m.config.ProbeInterval {
		metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
	}

	// Prepare a ping message and set up an ack handler.
	ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	nackCh := make(chan struct{}, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

	// Mark the sent time here, which should be after any pre-processing but
	// before system calls to do the actual send. This probably over-reports
	// a bit, but it's the best we can do. We had originally put this right
	// after the I/O, but that would sometimes give negative RTT measurements
	// which was not desirable.
	sent := time.Now()

	// Send a ping to the node. If this node looks like it's suspect or dead,
	// also tack on a suspect message so that it has a chance to refute as
	// soon as possible.
	deadline := sent.Add(probeInterval)
	addr := node.Address()
	if node.State == stateAlive {
		if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
			return
		}
	} else {
		var msgs [][]byte
		if buf, err := encode(pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}
		s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
		if buf, err := encode(suspectMsg, &s); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}

		compound := makeCompoundMessage(msgs)
		if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
			return
		}
	}

	// Arrange for our self-awareness to get updated. At this point we've
	// sent the ping, so any return statement means the probe succeeded
	// which will improve our health until we get to the failure scenarios
	// at the end of this function, which will alter this delta variable
	// accordingly.
	awarenessDelta := -1
	defer func() {
		m.awareness.ApplyDelta(awarenessDelta)
	}()

	// Wait for a response or the probe timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			if m.config.Ping != nil {
				rtt := v.Timestamp.Sub(sent)
				m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
			}
			return
		}

		// As an edge case, if we get a timeout, we need to re-enqueue it
		// here to break out of the select below.
		if !v.Complete {
			ackCh <- v
		}
	case <-time.After(m.config.ProbeTimeout):
		// Note that we don't scale this timeout based on awareness and
		// the health score. That's because we don't really expect waiting
		// longer to help get UDP through. Since health does extend the
		// probe interval it will give the TCP fallback more time, which
		// is more active in dealing with lost packets, and it gives more
		// time to wait for indirect acks/nacks.
		m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name)
	}

	// Get some random live nodes.
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.Name == node.Name ||
			n.State != stateAlive
	})
	m.nodeLock.RUnlock()

	// Attempt an indirect ping.
	expectedNacks := 0
	ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name}
	for _, peer := range kNodes {
		// We only expect nack to be sent from peers who understand
		// version 4 of the protocol.
		if ind.Nack = peer.PMax >= 4; ind.Nack {
			expectedNacks++
		}

		if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
		}
	}

	// Also make an attempt to contact the node directly over TCP. This
	// helps prevent confused clients who get isolated from UDP traffic
	// but can still speak TCP (which also means they can possibly report
	// misinformation to other nodes via anti-entropy), avoiding flapping in
	// the cluster.
	//
	// This is a little unusual because we will attempt a TCP ping to any
	// member who understands version 3 of the protocol, regardless of
	// which protocol version we are speaking. That's why we've included a
	// config option to turn this off if desired.
	fallbackCh := make(chan bool, 1)
	if (!m.config.DisableTcpPings) && (node.PMax >= 3) {
		go func() {
			defer close(fallbackCh)
			didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
			} else {
				fallbackCh <- didContact
			}
		}()
	} else {
		close(fallbackCh)
	}

	// Wait for the acks or timeout. Note that we don't check the fallback
	// channel here because we want to issue a warning below if that's the
	// *only* way we hear back from the peer, so we have to let this time
	// out first to allow the normal UDP-based acks to come in.
	select {
	case v := <-ackCh:
		if v.Complete {
			return
		}
	}

	// Finally, poll the fallback channel. The timeouts are set such that
	// the channel will have something or be closed without having to wait
	// any additional time here.
	for didContact := range fallbackCh {
		if didContact {
			m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
			return
		}
	}

	// Update our self-awareness based on the results of this failed probe.
	// If we don't have peers who will send nacks then we penalize for any
	// failed probe as a simple health metric. If we do have peers to nack
	// verify, then we can use that as a more sophisticated measure of self-
	// health because we assume them to be working, and they can help us
	// decide if the probed node was really dead or if it was something wrong
	// with ourselves.
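	// For example, if we asked 3 nack-capable peers but only 1 nack came
	// back, we apply a delta of +2; with no nack-capable peers available,
	// any failed probe costs a flat +1.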
	awarenessDelta = 0
	if expectedNacks > 0 {
		if nackCount := len(nackCh); nackCount < expectedNacks {
			awarenessDelta += (expectedNacks - nackCount)
		}
	} else {
		awarenessDelta++
	}

	// No acks received from target, suspect it as failed.
	m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
	s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
	m.suspectNode(&s)
}

// Ping initiates a ping to the node with the specified name.
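// A usage sketch (the peer name and address here are hypothetical):
//
//	rtt, err := m.Ping("node-b", &net.UDPAddr{IP: net.ParseIP("10.0.0.2"), Port: 7946})
//	if err != nil {
//		// e.g. NoPingResponseError if no ack arrived within ProbeTimeout
//	}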
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
	// Prepare a ping message and set up an ack handler.
	ping := ping{SeqNo: m.nextSeqNo(), Node: node}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)

	// Send a ping to the node.
	if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil {
		return 0, err
	}

	// Mark the sent time here, which should be after any pre-processing and
	// system calls to do the actual send. This probably under-reports a bit,
	// but it's the best we can do.
	sent := time.Now()

	// Wait for response or timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			return v.Timestamp.Sub(sent), nil
		}
	case <-time.After(m.config.ProbeTimeout):
		// Timeout, return an error below.
	}

	m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
	return 0, NoPingResponseError{ping.Node}
}

// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	// Move dead nodes, but respect the gossip-to-the-dead interval
	deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime)

	// Deregister the dead nodes
	for i := deadIdx; i < len(m.nodes); i++ {
		delete(m.nodeMap, m.nodes[i].Name)
		m.nodes[i] = nil
	}

	// Trim the nodes to exclude the dead nodes
	m.nodes = m.nodes[0:deadIdx]

	// Update numNodes after we've trimmed the dead nodes
	atomic.StoreUint32(&m.numNodes, uint32(deadIdx))

	// Shuffle live nodes
	shuffleNodes(m.nodes)
}

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
	defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

	// Get some random live, suspect, or recently dead nodes
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
		if n.Name == m.config.Name {
			return true
		}

		switch n.State {
		case stateAlive, stateSuspect:
			return false

		case stateDead:
			return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

		default:
			return true
		}
	})
	m.nodeLock.RUnlock()

	// Compute the bytes available
	bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
	if m.config.EncryptionEnabled() {
		bytesAvail -= encryptOverhead(m.encryptionVersion())
	}

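	// This budget matters because each gossip payload sent below must fit
	// in a single UDP packet; getBroadcasts is given the remaining byte
	// count so it returns at most one packet's worth of messages.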
	for _, node := range kNodes {
		// Get any pending broadcasts
		msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
		if len(msgs) == 0 {
			return
		}

		addr := node.Address()
		if len(msgs) == 1 {
			// Send single message as-is
			if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		} else {
			// Otherwise create and send a compound message
			compound := makeCompoundMessage(msgs)
			if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		}
	}
}

// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
	// Get a random live node
	m.nodeLock.RLock()
	nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.State != stateAlive
	})
	m.nodeLock.RUnlock()

	// If no nodes, bail
	if len(nodes) == 0 {
		return
	}
	node := nodes[0]

	// Attempt a push pull
	if err := m.pushPullNode(node.Address(), false); err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
	}
}

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(addr string, join bool) error {
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

	// Attempt to send and receive with the node
	remote, userState, err := m.sendAndReceiveState(addr, join)
	if err != nil {
		return err
	}

	if err := m.mergeRemoteState(join, remote, userState); err != nil {
		return err
	}
	return nil
}

// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the maximum of the minimum understood
// protocol and delegate versions, and the minimum of the maximums. In other
// words, it finds the common denominator of the protocol and delegate
// version ranges for the entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
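// For example, if one node understands protocol versions [1, 3] and another
// understands [2, 4], the common range is [2, 3], so every node's current
// protocol version must fall within [2, 3].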
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	// Maximum minimum understood and minimum maximum understood for both
	// the protocol and delegate versions. We use this to verify everyone
	// can be understood.
	var maxpmin, minpmax uint8
	var maxdmin, mindmax uint8
	minpmax = math.MaxUint8
	mindmax = math.MaxUint8

	for _, rn := range remote {
		// If the node isn't alive, then skip it
		if rn.State != stateAlive {
			continue
		}

		// Skip nodes that don't have versions set; it just means
		// their version is zero.
		if len(rn.Vsn) == 0 {
			continue
		}

		if rn.Vsn[0] > maxpmin {
			maxpmin = rn.Vsn[0]
		}

		if rn.Vsn[1] < minpmax {
			minpmax = rn.Vsn[1]
		}

		if rn.Vsn[3] > maxdmin {
			maxdmin = rn.Vsn[3]
		}

		if rn.Vsn[4] < mindmax {
			mindmax = rn.Vsn[4]
		}
	}

	for _, n := range m.nodes {
		// Ignore non-alive nodes
		if n.State != stateAlive {
			continue
		}

		if n.PMin > maxpmin {
			maxpmin = n.PMin
		}

		if n.PMax < minpmax {
			minpmax = n.PMax
		}

		if n.DMin > maxdmin {
			maxdmin = n.DMin
		}

		if n.DMax < mindmax {
			mindmax = n.DMax
		}
	}

	// Now that we definitively know the minimum and maximum understood
	// version that satisfies the whole cluster, we verify that every
	// node in the cluster satisfies this.
	for _, n := range remote {
		var nPCur, nDCur uint8
		if len(n.Vsn) > 0 {
			nPCur = n.Vsn[2]
			nDCur = n.Vsn[5]
		}

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	for _, n := range m.nodes {
		nPCur := n.PCur
		nDCur := n.DCur

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	return nil
}

// nextSeqNo returns a usable sequence number in a thread-safe way.
func (m *Memberlist) nextSeqNo() uint32 {
	return atomic.AddUint32(&m.sequenceNum, 1)
}

// nextIncarnation returns the next incarnation number in a thread-safe way.
func (m *Memberlist) nextIncarnation() uint32 {
	return atomic.AddUint32(&m.incarnation, 1)
}

// skipIncarnation adds the positive offset to the incarnation number.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
	return atomic.AddUint32(&m.incarnation, offset)
}

// estNumNodes is used to get the current estimate of the number of nodes.
func (m *Memberlist) estNumNodes() int {
	return int(atomic.LoadUint32(&m.numNodes))
}

type ackMessage struct {
	Complete  bool
	Payload   []byte
	Timestamp time.Time
}

// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The Complete field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
	// Create handler functions for acks and nacks
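	// Both handlers use non-blocking sends: if the prober has already moved
	// on and the buffered channel is full, the ack or nack is dropped rather
	// than blocking the caller that delivered it.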
	ackFn := func(payload []byte, timestamp time.Time) {
		select {
		case ackCh <- ackMessage{true, payload, timestamp}:
		default:
		}
	}
	nackFn := func() {
		select {
		case nackCh <- struct{}{}:
		default:
		}
	}

	// Add the handlers
	ah := &ackHandler{ackFn, nackFn, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
		select {
		case ackCh <- ackMessage{false, nil, time.Now()}:
		default:
		}
	})
}

// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings, so it does not configure a
// function for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
	// Add the handler
	ah := &ackHandler{ackFn, nil, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
	})
}

// invokeAckHandler invokes an ack handler if one is associated, and reaps
// the handler immediately.
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[ack.SeqNo]
	delete(m.ackHandlers, ack.SeqNo)
	m.ackLock.Unlock()
	if !ok {
		return
	}
	ah.timer.Stop()
	ah.ackFn(ack.Payload, timestamp)
}

// invokeNackHandler invokes a nack handler if one is associated.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[nack.SeqNo]
	m.ackLock.Unlock()
	if !ok || ah.nackFn == nil {
		return
	}
	ah.nackFn()
}

// refute gossips an alive message in response to incoming information that we
// are suspect or dead. It will make sure the incarnation number beats the given
// accusedInc value, or you can supply 0 to just get the next incarnation number.
// This alters the node state that's passed in so this MUST be called while the
// nodeLock is held.
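// For example, if accusedInc is 10 and our next incarnation number would be
// 7, we skip ahead so the refutation is broadcast with incarnation 11.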
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
	// Make sure the incarnation number beats the accusation.
	inc := m.nextIncarnation()
	if accusedInc >= inc {
		inc = m.skipIncarnation(accusedInc - inc + 1)
	}
	me.Incarnation = inc

	// Decrease our health because we are being asked to refute a problem.
	m.awareness.ApplyDelta(1)

	// Format and broadcast an alive message.
	a := alive{
		Incarnation: inc,
		Node:        me.Name,
		Addr:        me.Addr,
		Port:        me.Port,
		Meta:        me.Meta,
		Vsn: []uint8{
			me.PMin, me.PMax, me.PCur,
			me.DMin, me.DMax, me.DCur,
		},
	}
	m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
}

// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[a.Node]

	// It is possible that during a Leave(), there is already an aliveMsg
	// in-queue to be processed but blocked by the locks above. If we let
	// that aliveMsg process, it'll cause us to re-join the cluster. This
	// ensures that we don't.
	if m.hasLeft() && a.Node == m.config.Name {
		return
	}

	// Invoke the Alive delegate if any. This can be used to filter out
	// alive messages based on custom logic. For example, using a cluster name.
	// Using a merge delegate is not enough, as it is possible for passive
	// cluster merging to still occur.
	if m.config.Alive != nil {
		node := &Node{
			Name: a.Node,
			Addr: a.Addr,
			Port: a.Port,
			Meta: a.Meta,
			PMin: a.Vsn[0],
			PMax: a.Vsn[1],
			PCur: a.Vsn[2],
			DMin: a.Vsn[3],
			DMax: a.Vsn[4],
			DCur: a.Vsn[5],
		}
		if err := m.config.Alive.NotifyAlive(node); err != nil {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
				a.Node, err)
			return
		}
	}

	// Check if we've never seen this node before, and if not, then
	// store this node in our node map.
	if !ok {
		state = &nodeState{
			Node: Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			},
			State: stateDead,
		}

		// Add to map
		m.nodeMap[a.Node] = state

		// Get a random offset. This is important to ensure
		// the failure detection bound is low on average. If all
		// nodes did an append, failure detection bound would be
		// very high.
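		// For example, with 10 nodes already in the list, the new node
		// lands at a uniformly random index instead of always at the tail.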
		n := len(m.nodes)
		offset := randomOffset(n)

		// Add at the end and swap with the node at the offset
		m.nodes = append(m.nodes, state)
		m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]

		// Update numNodes after we've added a new node
		atomic.AddUint32(&m.numNodes, 1)
	}

	// Check if this address is different from the existing node
	if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
		m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d",
			state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)

		// Inform the conflict delegate if provided
		if m.config.Conflict != nil {
			other := Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			}
			m.config.Conflict.NotifyConflict(&state.Node, &other)
		}
		return
	}

	// Bail if the incarnation number is older, and this is not about us
	isLocalNode := state.Name == m.config.Name
	if a.Incarnation <= state.Incarnation && !isLocalNode {
		return
	}

	// Bail if strictly less and this is about us
	if a.Incarnation < state.Incarnation && isLocalNode {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, a.Node)

	// Store the old state and meta data
	oldState := state.State
	oldMeta := state.Meta

	// If this is us we need to refute, otherwise re-broadcast
	if !bootstrap && isLocalNode {
		// Compute the version vector
		versions := []uint8{
			state.PMin, state.PMax, state.PCur,
			state.DMin, state.DMax, state.DCur,
		}

		// If the Incarnation is the same, we need special handling, since it's
		// possible for the following situation to happen:
		// 1) Start with configuration C, join cluster
		// 2) Hard fail / Kill / Shutdown
		// 3) Restart with configuration C', join cluster
		//
		// In this case, other nodes and the local node see the same incarnation,
		// but the values may not be the same. For this reason, we always
		// need to do an equality check for this Incarnation. In most cases,
		// we just ignore, but we may need to refute.
		if a.Incarnation == state.Incarnation &&
			bytes.Equal(a.Meta, state.Meta) &&
			bytes.Equal(a.Vsn, versions) {
			return
		}

		m.refute(state, a.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting an alive message")
	} else {
		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

		// Update protocol versions if it arrived
		if len(a.Vsn) > 0 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Update the state and incarnation number
		state.Incarnation = a.Incarnation
		state.Meta = a.Meta
		if state.State != stateAlive {
			state.State = stateAlive
			state.StateChange = time.Now()
		}
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)

	// Notify the delegate of any relevant updates
	if m.config.Events != nil {
		if oldState == stateDead {
			// if Dead -> Alive, notify of join
			m.config.Events.NotifyJoin(&state.Node)

		} else if !bytes.Equal(oldMeta, state.Meta) {
			// if Meta changed, trigger an update notification
			m.config.Events.NotifyUpdate(&state.Node)
		}
	}
}

// suspectNode is invoked by the network layer when we get a message
// about a suspect node.
func (m *Memberlist) suspectNode(s *suspect) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[s.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if s.Incarnation < state.Incarnation {
		return
	}

	// See if there's a suspicion timer we can confirm. If the info is new
	// to us we will go ahead and re-gossip it. This allows for multiple
	// independent confirmations to flow even when a node probes a node
	// that's already suspect.
	if timer, ok := m.nodeTimers[s.Node]; ok {
		if timer.Confirm(s.From) {
			m.encodeAndBroadcast(s.Node, suspectMsg, s)
		}
		return
	}

	// Ignore non-alive nodes
	if state.State != stateAlive {
		return
	}

	// If this is us we need to refute, otherwise re-broadcast
	if state.Name == m.config.Name {
		m.refute(state, s.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
		return // Do not mark ourselves suspect
	} else {
		m.encodeAndBroadcast(s.Node, suspectMsg, s)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

	// Update the state
	state.Incarnation = s.Incarnation
	state.State = stateSuspect
	changeTime := time.Now()
	state.StateChange = changeTime

	// Set up a suspicion timer. Given that we don't have any known phase
	// relationship with our peers, we set up k such that we hit the nominal
	// timeout two probe intervals short of what we expect given the suspicion
	// multiplier.
	k := m.config.SuspicionMult - 2

	// If there aren't enough nodes to give the expected confirmations, just
	// set k to 0 to say that we don't expect any. Note we subtract 2 from n
	// here to take out ourselves and the node being probed.
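	// For example, with a SuspicionMult of 4 we want k = 2 independent
	// confirmations, but in a 3-node cluster n-2 = 1 < k, so we expect none.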
	n := m.estNumNodes()
	if n-2 < k {
		k = 0
	}

	// Compute the timeouts based on the size of the cluster.
	min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
	max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
	fn := func(numConfirmations int) {
		m.nodeLock.Lock()
		state, ok := m.nodeMap[s.Node]
		timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
		m.nodeLock.Unlock()

		if timeout {
			if k > 0 && numConfirmations < k {
				metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
			}

			m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
				state.Name, numConfirmations)
			d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
			m.deadNode(&d)
		}
	}
	m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}

// deadNode is invoked by the network layer when we get a message
// about a dead node.
func (m *Memberlist) deadNode(d *dead) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[d.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if d.Incarnation < state.Incarnation {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, d.Node)

	// Ignore if node is already dead
	if state.State == stateDead {
		return
	}

	// Check if this is us
	if state.Name == m.config.Name {
		// If we are not leaving we need to refute
		if !m.hasLeft() {
			m.refute(state, d.Incarnation)
			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
			return // Do not mark ourselves dead
		}

		// If we are leaving, we broadcast and wait
		m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
	} else {
		m.encodeAndBroadcast(d.Node, deadMsg, d)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

	// Update the state
	state.Incarnation = d.Incarnation
	state.State = stateDead
	state.StateChange = time.Now()

	// Notify of death
	if m.config.Events != nil {
		m.config.Events.NotifyLeave(&state.Node)
	}
}

// mergeState is invoked by the network layer when we get a push/pull
// state transfer.
func (m *Memberlist) mergeState(remote []pushNodeState) {
	for _, r := range remote {
		switch r.State {
		case stateAlive:
			a := alive{
				Incarnation: r.Incarnation,
				Node:        r.Name,
				Addr:        r.Addr,
				Port:        r.Port,
				Meta:        r.Meta,
				Vsn:         r.Vsn,
			}
			m.aliveNode(&a, nil, false)

		case stateDead:
			// If the remote node believes a node is dead, we prefer to
			// suspect that node instead of declaring it dead instantly
			fallthrough
		case stateSuspect:
			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
			m.suspectNode(&s)
		}
	}
}