1/*
2memberlist is a library that manages cluster
3membership and member failure detection using a gossip based protocol.
4
5The use cases for such a library are far-reaching: all distributed systems
6require membership, and memberlist is a re-usable solution to managing
7cluster membership and node failure detection.
8
9memberlist is eventually consistent but converges quickly on average.
10The speed at which it converges can be heavily tuned via various knobs
on the protocol. Node failures are detected and network partitions are partially
tolerated by attempting to communicate with potentially dead nodes through
multiple routes.
14*/
15package memberlist
16
17import (
18	"container/list"
19	"errors"
20	"fmt"
21	"log"
22	"net"
23	"os"
24	"strconv"
25	"strings"
26	"sync"
27	"sync/atomic"
28	"time"
29
30	multierror "github.com/hashicorp/go-multierror"
31	sockaddr "github.com/hashicorp/go-sockaddr"
32	"github.com/miekg/dns"
33)
34
// errNodeNamesAreRequired reports that the configuration requires node names
// but one was not provided.
var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided")
36
// Memberlist is the handle for the local node's participation in a cluster.
// It is built by Create and torn down by Shutdown; it carries the
// configuration, the transport, the table of known nodes and the
// bookkeeping shared by the rest of the package.
type Memberlist struct {
	sequenceNum uint32 // Local sequence number
	incarnation uint32 // Local incarnation number
	numNodes    uint32 // Number of known nodes (estimate)
	pushPullReq uint32 // Number of push/pull requests

	// advertiseLock guards advertiseAddr and advertisePort; see
	// getAdvertise and setAdvertise.
	advertiseLock sync.RWMutex
	advertiseAddr net.IP
	advertisePort uint16

	config         *Config
	shutdown       int32 // Used as an atomic boolean value
	shutdownCh     chan struct{}
	leave          int32 // Used as an atomic boolean value
	leaveBroadcast chan struct{}

	shutdownLock sync.Mutex // Serializes calls to Shutdown
	leaveLock    sync.Mutex // Serializes calls to Leave

	// transport is the configured transport after newMemberlist has wrapped
	// it so that it always satisfies NodeAwareTransport.
	transport NodeAwareTransport

	handoffCh            chan struct{}
	highPriorityMsgQueue *list.List
	lowPriorityMsgQueue  *list.List
	msgQueueLock         sync.Mutex

	// nodeLock guards nodes and nodeMap (and presumably nodeTimers; that
	// usage is not visible in this file - TODO confirm).
	nodeLock   sync.RWMutex
	nodes      []*nodeState          // Known nodes
	nodeMap    map[string]*nodeState // Maps Node.Name -> NodeState
	nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer
	awareness  *awareness

	tickerLock sync.Mutex
	tickers    []*time.Ticker
	stopTick   chan struct{}
	probeIndex int

	ackLock     sync.Mutex
	ackHandlers map[uint32]*ackHandler

	broadcasts *TransmitLimitedQueue

	// logger is conf.Logger when provided, otherwise a logger writing to
	// conf.LogOutput (or os.Stderr when that is nil).
	logger *log.Logger
}
81
82// BuildVsnArray creates the array of Vsn
83func (conf *Config) BuildVsnArray() []uint8 {
84	return []uint8{
85		ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
86		conf.DelegateProtocolMin, conf.DelegateProtocolMax,
87		conf.DelegateProtocolVersion,
88	}
89}
90
91// newMemberlist creates the network listeners.
92// Does not schedule execution of background maintenance.
93func newMemberlist(conf *Config) (*Memberlist, error) {
94	if conf.ProtocolVersion < ProtocolVersionMin {
95		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
96			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
97	} else if conf.ProtocolVersion > ProtocolVersionMax {
98		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
99			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
100	}
101
102	if len(conf.SecretKey) > 0 {
103		if conf.Keyring == nil {
104			keyring, err := NewKeyring(nil, conf.SecretKey)
105			if err != nil {
106				return nil, err
107			}
108			conf.Keyring = keyring
109		} else {
110			if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
111				return nil, err
112			}
113			if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
114				return nil, err
115			}
116		}
117	}
118
119	if conf.LogOutput != nil && conf.Logger != nil {
120		return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
121	}
122
123	logDest := conf.LogOutput
124	if logDest == nil {
125		logDest = os.Stderr
126	}
127
128	logger := conf.Logger
129	if logger == nil {
130		logger = log.New(logDest, "", log.LstdFlags)
131	}
132
133	// Set up a network transport by default if a custom one wasn't given
134	// by the config.
135	transport := conf.Transport
136	if transport == nil {
137		nc := &NetTransportConfig{
138			BindAddrs: []string{conf.BindAddr},
139			BindPort:  conf.BindPort,
140			Logger:    logger,
141		}
142
143		// See comment below for details about the retry in here.
144		makeNetRetry := func(limit int) (*NetTransport, error) {
145			var err error
146			for try := 0; try < limit; try++ {
147				var nt *NetTransport
148				if nt, err = NewNetTransport(nc); err == nil {
149					return nt, nil
150				}
151				if strings.Contains(err.Error(), "address already in use") {
152					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
153					continue
154				}
155			}
156
157			return nil, fmt.Errorf("failed to obtain an address: %v", err)
158		}
159
160		// The dynamic bind port operation is inherently racy because
161		// even though we are using the kernel to find a port for us, we
162		// are attempting to bind multiple protocols (and potentially
163		// multiple addresses) with the same port number. We build in a
164		// few retries here since this often gets transient errors in
165		// busy unit tests.
166		limit := 1
167		if conf.BindPort == 0 {
168			limit = 10
169		}
170
171		nt, err := makeNetRetry(limit)
172		if err != nil {
173			return nil, fmt.Errorf("Could not set up network transport: %v", err)
174		}
175		if conf.BindPort == 0 {
176			port := nt.GetAutoBindPort()
177			conf.BindPort = port
178			conf.AdvertisePort = port
179			logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
180		}
181		transport = nt
182	}
183
184	nodeAwareTransport, ok := transport.(NodeAwareTransport)
185	if !ok {
186		logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired")
187		nodeAwareTransport = &shimNodeAwareTransport{transport}
188	}
189
190	if len(conf.Label) > LabelMaxSize {
191		return nil, fmt.Errorf("could not use %q as a label: too long", conf.Label)
192	}
193
194	if conf.Label != "" {
195		nodeAwareTransport = &labelWrappedTransport{
196			label:              conf.Label,
197			NodeAwareTransport: nodeAwareTransport,
198		}
199	}
200
201	m := &Memberlist{
202		config:               conf,
203		shutdownCh:           make(chan struct{}),
204		leaveBroadcast:       make(chan struct{}, 1),
205		transport:            nodeAwareTransport,
206		handoffCh:            make(chan struct{}, 1),
207		highPriorityMsgQueue: list.New(),
208		lowPriorityMsgQueue:  list.New(),
209		nodeMap:              make(map[string]*nodeState),
210		nodeTimers:           make(map[string]*suspicion),
211		awareness:            newAwareness(conf.AwarenessMaxMultiplier),
212		ackHandlers:          make(map[uint32]*ackHandler),
213		broadcasts:           &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
214		logger:               logger,
215	}
216	m.broadcasts.NumNodes = func() int {
217		return m.estNumNodes()
218	}
219
220	// Get the final advertise address from the transport, which may need
221	// to see which address we bound to. We'll refresh this each time we
222	// send out an alive message.
223	if _, _, err := m.refreshAdvertise(); err != nil {
224		return nil, err
225	}
226
227	go m.streamListen()
228	go m.packetListen()
229	go m.packetHandler()
230	return m, nil
231}
232
233// Create will create a new Memberlist using the given configuration.
234// This will not connect to any other node (see Join) yet, but will start
235// all the listeners to allow other nodes to join this memberlist.
236// After creating a Memberlist, the configuration given should not be
237// modified by the user anymore.
238func Create(conf *Config) (*Memberlist, error) {
239	m, err := newMemberlist(conf)
240	if err != nil {
241		return nil, err
242	}
243	if err := m.setAlive(); err != nil {
244		m.Shutdown()
245		return nil, err
246	}
247	m.schedule()
248	return m, nil
249}
250
251// Join is used to take an existing Memberlist and attempt to join a cluster
252// by contacting all the given hosts and performing a state sync. Initially,
253// the Memberlist only contains our own state, so doing this will cause
254// remote nodes to become aware of the existence of this node, effectively
255// joining the cluster.
256//
257// This returns the number of hosts successfully contacted and an error if
258// none could be reached. If an error is returned, the node did not successfully
259// join the cluster.
260func (m *Memberlist) Join(existing []string) (int, error) {
261	numSuccess := 0
262	var errs error
263	for _, exist := range existing {
264		addrs, err := m.resolveAddr(exist)
265		if err != nil {
266			err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
267			errs = multierror.Append(errs, err)
268			m.logger.Printf("[WARN] memberlist: %v", err)
269			continue
270		}
271
272		for _, addr := range addrs {
273			hp := joinHostPort(addr.ip.String(), addr.port)
274			a := Address{Addr: hp, Name: addr.nodeName}
275			if err := m.pushPullNode(a, true); err != nil {
276				err = fmt.Errorf("Failed to join %s: %v", a.Addr, err)
277				errs = multierror.Append(errs, err)
278				m.logger.Printf("[DEBUG] memberlist: %v", err)
279				continue
280			}
281			numSuccess++
282		}
283
284	}
285	if numSuccess > 0 {
286		errs = nil
287	}
288	return numSuccess, errs
289}
290
// ipPort holds information about a node we want to try to join: the
// resolved IP address, the port to contact, and the node name that was given
// with the join address, if any.
type ipPort struct {
	ip       net.IP
	port     uint16
	nodeName string // optional
}
297
298// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
299// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
300// the response has the truncate bit set, which isn't common on DNS servers like
301// Consul's. By doing the TCP lookup directly, we get the best chance for the
302// largest list of hosts to join. Since joins are relatively rare events, it's ok
303// to do this rather expensive operation.
304func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) {
305	// Don't attempt any TCP lookups against non-fully qualified domain
306	// names, since those will likely come from the resolv.conf file.
307	if !strings.Contains(host, ".") {
308		return nil, nil
309	}
310
311	// Make sure the domain name is terminated with a dot (we know there's
312	// at least one character at this point).
313	dn := host
314	if dn[len(dn)-1] != '.' {
315		dn = dn + "."
316	}
317
318	// See if we can find a server to try.
319	cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
320	if err != nil {
321		return nil, err
322	}
323	if len(cc.Servers) > 0 {
324		// We support host:port in the DNS config, but need to add the
325		// default port if one is not supplied.
326		server := cc.Servers[0]
327		if !hasPort(server) {
328			server = net.JoinHostPort(server, cc.Port)
329		}
330
331		// Do the lookup.
332		c := new(dns.Client)
333		c.Net = "tcp"
334		msg := new(dns.Msg)
335		msg.SetQuestion(dn, dns.TypeANY)
336		in, _, err := c.Exchange(msg, server)
337		if err != nil {
338			return nil, err
339		}
340
341		// Handle any IPs we get back that we can attempt to join.
342		var ips []ipPort
343		for _, r := range in.Answer {
344			switch rr := r.(type) {
345			case (*dns.A):
346				ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName})
347			case (*dns.AAAA):
348				ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName})
349			case (*dns.CNAME):
350				m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
351			}
352		}
353		return ips, nil
354	}
355
356	return nil, nil
357}
358
359// resolveAddr is used to resolve the address into an address,
360// port, and error. If no port is given, use the default
361func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
362	// First peel off any leading node name. This is optional.
363	nodeName := ""
364	if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 {
365		if slashIdx == 0 {
366			return nil, fmt.Errorf("empty node name provided")
367		}
368		nodeName = hostStr[0:slashIdx]
369		hostStr = hostStr[slashIdx+1:]
370	}
371
372	// This captures the supplied port, or the default one.
373	hostStr = ensurePort(hostStr, m.config.BindPort)
374	host, sport, err := net.SplitHostPort(hostStr)
375	if err != nil {
376		return nil, err
377	}
378	lport, err := strconv.ParseUint(sport, 10, 16)
379	if err != nil {
380		return nil, err
381	}
382	port := uint16(lport)
383
384	// If it looks like an IP address we are done. The SplitHostPort() above
385	// will make sure the host part is in good shape for parsing, even for
386	// IPv6 addresses.
387	if ip := net.ParseIP(host); ip != nil {
388		return []ipPort{
389			ipPort{ip: ip, port: port, nodeName: nodeName},
390		}, nil
391	}
392
393	// First try TCP so we have the best chance for the largest list of
394	// hosts to join. If this fails it's not fatal since this isn't a standard
395	// way to query DNS, and we have a fallback below.
396	ips, err := m.tcpLookupIP(host, port, nodeName)
397	if err != nil {
398		m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
399	}
400	if len(ips) > 0 {
401		return ips, nil
402	}
403
404	// If TCP didn't yield anything then use the normal Go resolver which
405	// will try UDP, then might possibly try TCP again if the UDP response
406	// indicates it was truncated.
407	ans, err := net.LookupIP(host)
408	if err != nil {
409		return nil, err
410	}
411	ips = make([]ipPort, 0, len(ans))
412	for _, ip := range ans {
413		ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName})
414	}
415	return ips, nil
416}
417
418// setAlive is used to mark this node as being alive. This is the same
419// as if we received an alive notification our own network channel for
420// ourself.
421func (m *Memberlist) setAlive() error {
422	// Get the final advertise address from the transport, which may need
423	// to see which address we bound to.
424	addr, port, err := m.refreshAdvertise()
425	if err != nil {
426		return err
427	}
428
429	// Check if this is a public address without encryption
430	ipAddr, err := sockaddr.NewIPAddr(addr.String())
431	if err != nil {
432		return fmt.Errorf("Failed to parse interface addresses: %v", err)
433	}
434	ifAddrs := []sockaddr.IfAddr{
435		sockaddr.IfAddr{
436			SockAddr: ipAddr,
437		},
438	}
439	_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
440	if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
441		m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
442	}
443
444	// Set any metadata from the delegate.
445	var meta []byte
446	if m.config.Delegate != nil {
447		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
448		if len(meta) > MetaMaxSize {
449			panic("Node meta data provided is longer than the limit")
450		}
451	}
452
453	a := alive{
454		Incarnation: m.nextIncarnation(),
455		Node:        m.config.Name,
456		Addr:        addr,
457		Port:        uint16(port),
458		Meta:        meta,
459		Vsn:         m.config.BuildVsnArray(),
460	}
461	m.aliveNode(&a, nil, true)
462
463	return nil
464}
465
466func (m *Memberlist) getAdvertise() (net.IP, uint16) {
467	m.advertiseLock.RLock()
468	defer m.advertiseLock.RUnlock()
469	return m.advertiseAddr, m.advertisePort
470}
471
472func (m *Memberlist) setAdvertise(addr net.IP, port int) {
473	m.advertiseLock.Lock()
474	defer m.advertiseLock.Unlock()
475	m.advertiseAddr = addr
476	m.advertisePort = uint16(port)
477}
478
479func (m *Memberlist) refreshAdvertise() (net.IP, int, error) {
480	addr, port, err := m.transport.FinalAdvertiseAddr(
481		m.config.AdvertiseAddr, m.config.AdvertisePort)
482	if err != nil {
483		return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err)
484	}
485	m.setAdvertise(addr, port)
486	return addr, port, nil
487}
488
489// LocalNode is used to return the local Node
490func (m *Memberlist) LocalNode() *Node {
491	m.nodeLock.RLock()
492	defer m.nodeLock.RUnlock()
493	state := m.nodeMap[m.config.Name]
494	return &state.Node
495}
496
497// UpdateNode is used to trigger re-advertising the local node. This is
498// primarily used with a Delegate to support dynamic updates to the local
499// meta data.  This will block until the update message is successfully
500// broadcasted to a member of the cluster, if any exist or until a specified
501// timeout is reached.
502func (m *Memberlist) UpdateNode(timeout time.Duration) error {
503	// Get the node meta data
504	var meta []byte
505	if m.config.Delegate != nil {
506		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
507		if len(meta) > MetaMaxSize {
508			panic("Node meta data provided is longer than the limit")
509		}
510	}
511
512	// Get the existing node
513	m.nodeLock.RLock()
514	state := m.nodeMap[m.config.Name]
515	m.nodeLock.RUnlock()
516
517	// Format a new alive message
518	a := alive{
519		Incarnation: m.nextIncarnation(),
520		Node:        m.config.Name,
521		Addr:        state.Addr,
522		Port:        state.Port,
523		Meta:        meta,
524		Vsn:         m.config.BuildVsnArray(),
525	}
526	notifyCh := make(chan struct{})
527	m.aliveNode(&a, notifyCh, true)
528
529	// Wait for the broadcast or a timeout
530	if m.anyAlive() {
531		var timeoutCh <-chan time.Time
532		if timeout > 0 {
533			timeoutCh = time.After(timeout)
534		}
535		select {
536		case <-notifyCh:
537		case <-timeoutCh:
538			return fmt.Errorf("timeout waiting for update broadcast")
539		}
540	}
541	return nil
542}
543
// SendTo sends msg as a user message to the given network address. It builds
// an Address with an empty node name from to.String() and delegates to
// SendToAddress.
//
// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires a node to
// target. If you don't have a node then use SendToAddress.
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
	a := Address{Addr: to.String(), Name: ""}
	return m.SendToAddress(a, msg)
}
550
551func (m *Memberlist) SendToAddress(a Address, msg []byte) error {
552	// Encode as a user message
553	buf := make([]byte, 1, len(msg)+1)
554	buf[0] = byte(userMsg)
555	buf = append(buf, msg...)
556
557	// Send the message
558	return m.rawSendMsgPacket(a, nil, buf)
559}
560
// SendToUDP sends msg to the given node by delegating to SendBestEffort.
//
// Deprecated: SendToUDP is deprecated in favor of SendBestEffort.
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
	return m.SendBestEffort(to, msg)
}
565
// SendToTCP sends msg to the given node by delegating to SendReliable.
//
// Deprecated: SendToTCP is deprecated in favor of SendReliable.
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
	return m.SendReliable(to, msg)
}
570
571// SendBestEffort uses the unreliable packet-oriented interface of the transport
572// to target a user message at the given node (this does not use the gossip
573// mechanism). The maximum size of the message depends on the configured
574// UDPBufferSize for this memberlist instance.
575func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
576	// Encode as a user message
577	buf := make([]byte, 1, len(msg)+1)
578	buf[0] = byte(userMsg)
579	buf = append(buf, msg...)
580
581	// Send the message
582	a := Address{Addr: to.Address(), Name: to.Name}
583	return m.rawSendMsgPacket(a, to, buf)
584}
585
// SendReliable uses the reliable stream-oriented interface of the transport to
// target a user message at the given node (this does not use the gossip
// mechanism). Delivery is guaranteed if no error is returned, and there is no
// limit on the size of the message.
//
// The message is handed to sendUserMsg together with the node's full
// address (to.FullAddress()).
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
	return m.sendUserMsg(to.FullAddress(), msg)
}
593
594// Members returns a list of all known live nodes. The node structures
595// returned must not be modified. If you wish to modify a Node, make a
596// copy first.
597func (m *Memberlist) Members() []*Node {
598	m.nodeLock.RLock()
599	defer m.nodeLock.RUnlock()
600
601	nodes := make([]*Node, 0, len(m.nodes))
602	for _, n := range m.nodes {
603		if !n.DeadOrLeft() {
604			nodes = append(nodes, &n.Node)
605		}
606	}
607
608	return nodes
609}
610
611// NumMembers returns the number of alive nodes currently known. Between
612// the time of calling this and calling Members, the number of alive nodes
613// may have changed, so this shouldn't be used to determine how many
614// members will be returned by Members.
615func (m *Memberlist) NumMembers() (alive int) {
616	m.nodeLock.RLock()
617	defer m.nodeLock.RUnlock()
618
619	for _, n := range m.nodes {
620		if !n.DeadOrLeft() {
621			alive++
622		}
623	}
624
625	return
626}
627
628// Leave will broadcast a leave message but will not shutdown the background
629// listeners, meaning the node will continue participating in gossip and state
630// updates.
631//
632// This will block until the leave message is successfully broadcasted to
633// a member of the cluster, if any exist or until a specified timeout
634// is reached.
635//
636// This method is safe to call multiple times, but must not be called
637// after the cluster is already shut down.
638func (m *Memberlist) Leave(timeout time.Duration) error {
639	m.leaveLock.Lock()
640	defer m.leaveLock.Unlock()
641
642	if m.hasShutdown() {
643		panic("leave after shutdown")
644	}
645
646	if !m.hasLeft() {
647		atomic.StoreInt32(&m.leave, 1)
648
649		m.nodeLock.Lock()
650		state, ok := m.nodeMap[m.config.Name]
651		m.nodeLock.Unlock()
652		if !ok {
653			m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
654			return nil
655		}
656
657		// This dead message is special, because Node and From are the
658		// same. This helps other nodes figure out that a node left
659		// intentionally. When Node equals From, other nodes know for
660		// sure this node is gone.
661		d := dead{
662			Incarnation: state.Incarnation,
663			Node:        state.Name,
664			From:        state.Name,
665		}
666		m.deadNode(&d)
667
668		// Block until the broadcast goes out
669		if m.anyAlive() {
670			var timeoutCh <-chan time.Time
671			if timeout > 0 {
672				timeoutCh = time.After(timeout)
673			}
674			select {
675			case <-m.leaveBroadcast:
676			case <-timeoutCh:
677				return fmt.Errorf("timeout waiting for leave broadcast")
678			}
679		}
680	}
681
682	return nil
683}
684
685// Check for any other alive node.
686func (m *Memberlist) anyAlive() bool {
687	m.nodeLock.RLock()
688	defer m.nodeLock.RUnlock()
689	for _, n := range m.nodes {
690		if !n.DeadOrLeft() && n.Name != m.config.Name {
691			return true
692		}
693	}
694	return false
695}
696
// GetHealthScore gives this instance's idea of how well it is meeting the soft
// real-time requirements of the protocol. Lower numbers are better, and zero
// means "totally healthy".
//
// The value is read from the awareness tracker created in newMemberlist.
func (m *Memberlist) GetHealthScore() int {
	return m.awareness.GetHealthScore()
}
703
// ProtocolVersion returns the protocol version currently in use by
// this memberlist, which is the ProtocolVersion of its configuration.
func (m *Memberlist) ProtocolVersion() uint8 {
	// NOTE: This method exists so that in the future we can control
	// any locking if necessary, if we change the protocol version at
	// runtime, etc.
	return m.config.ProtocolVersion
}
712
713// Shutdown will stop any background maintenance of network activity
714// for this memberlist, causing it to appear "dead". A leave message
715// will not be broadcasted prior, so the cluster being left will have
716// to detect this node's shutdown using probing. If you wish to more
717// gracefully exit the cluster, call Leave prior to shutting down.
718//
719// This method is safe to call multiple times.
720func (m *Memberlist) Shutdown() error {
721	m.shutdownLock.Lock()
722	defer m.shutdownLock.Unlock()
723
724	if m.hasShutdown() {
725		return nil
726	}
727
728	// Shut down the transport first, which should block until it's
729	// completely torn down. If we kill the memberlist-side handlers
730	// those I/O handlers might get stuck.
731	if err := m.transport.Shutdown(); err != nil {
732		m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
733	}
734
735	// Now tear down everything else.
736	atomic.StoreInt32(&m.shutdown, 1)
737	close(m.shutdownCh)
738	m.deschedule()
739	return nil
740}
741
// hasShutdown reports whether Shutdown has completed its teardown, by
// atomically reading the shutdown flag.
func (m *Memberlist) hasShutdown() bool {
	return atomic.LoadInt32(&m.shutdown) == 1
}
745
// hasLeft reports whether Leave has been called, by atomically reading the
// leave flag.
func (m *Memberlist) hasLeft() bool {
	return atomic.LoadInt32(&m.leave) == 1
}
749
750func (m *Memberlist) getNodeState(addr string) NodeStateType {
751	m.nodeLock.RLock()
752	defer m.nodeLock.RUnlock()
753
754	n := m.nodeMap[addr]
755	return n.State
756}
757
758func (m *Memberlist) getNodeStateChange(addr string) time.Time {
759	m.nodeLock.RLock()
760	defer m.nodeLock.RUnlock()
761
762	n := m.nodeMap[addr]
763	return n.StateChange
764}
765
766func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
767	m.nodeLock.Lock()
768	defer m.nodeLock.Unlock()
769
770	n := m.nodeMap[addr]
771	f(n)
772}
773