/*
memberlist is a library that manages cluster membership and member failure
detection using a gossip-based protocol.

The use cases for such a library are far-reaching: all distributed systems
require membership, and memberlist is a reusable solution for managing
cluster membership and node failure detection.

memberlist is eventually consistent but converges quickly on average.
The speed at which it converges can be heavily tuned via various knobs
on the protocol. Node failures are detected and network partitions are
partially tolerated by attempting to communicate with potentially dead
nodes through multiple routes.
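
A minimal usage sketch (a rough illustration rather than a complete program;
the node name and address below are placeholders, and error handling is
elided):

	conf := memberlist.DefaultLANConfig()
	conf.Name = "node-1"

	list, err := memberlist.Create(conf)
	if err != nil {
		panic(err)
	}

	// Join an existing cluster by contacting at least one known member.
	if _, err := list.Join([]string{"10.0.0.10:7946"}); err != nil {
		panic(err)
	}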
*/
package memberlist

import (
	"container/list"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	multierror "github.com/hashicorp/go-multierror"
	sockaddr "github.com/hashicorp/go-sockaddr"
	"github.com/miekg/dns"
)

var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided")

type Memberlist struct {
	sequenceNum uint32 // Local sequence number
	incarnation uint32 // Local incarnation number
	numNodes    uint32 // Number of known nodes (estimate)
	pushPullReq uint32 // Number of push/pull requests

	advertiseLock sync.RWMutex
	advertiseAddr net.IP
	advertisePort uint16

	config         *Config
	shutdown       int32 // Used as an atomic boolean value
	shutdownCh     chan struct{}
	leave          int32 // Used as an atomic boolean value
	leaveBroadcast chan struct{}

	shutdownLock sync.Mutex // Serializes calls to Shutdown
	leaveLock    sync.Mutex // Serializes calls to Leave

	transport NodeAwareTransport

	handoffCh            chan struct{}
	highPriorityMsgQueue *list.List
	lowPriorityMsgQueue  *list.List
	msgQueueLock         sync.Mutex

	nodeLock   sync.RWMutex
	nodes      []*nodeState          // Known nodes
	nodeMap    map[string]*nodeState // Maps Node.Name -> NodeState
	nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer
	awareness  *awareness

	tickerLock sync.Mutex
	tickers    []*time.Ticker
	stopTick   chan struct{}
	probeIndex int

	ackLock     sync.Mutex
	ackHandlers map[uint32]*ackHandler

	broadcasts *TransmitLimitedQueue

	logger *log.Logger
}

// BuildVsnArray creates the protocol version array (Vsn) advertised in alive
// messages: the supported memberlist protocol range and version, followed by
// the delegate protocol range and version.
func (conf *Config) BuildVsnArray() []uint8 {
	return []uint8{
		ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
		conf.DelegateProtocolMin, conf.DelegateProtocolMax,
		conf.DelegateProtocolVersion,
	}
}

// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	if len(conf.SecretKey) > 0 {
		if conf.Keyring == nil {
			keyring, err := NewKeyring(nil, conf.SecretKey)
			if err != nil {
				return nil, err
			}
			conf.Keyring = keyring
		} else {
			if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
				return nil, err
			}
			if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
				return nil, err
			}
		}
	}

	if conf.LogOutput != nil && conf.Logger != nil {
		return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
	}

	logDest := conf.LogOutput
	if logDest == nil {
		logDest = os.Stderr
	}

	logger := conf.Logger
	if logger == nil {
		logger = log.New(logDest, "", log.LstdFlags)
	}

	// Set up a network transport by default if a custom one wasn't given
	// by the config.
	transport := conf.Transport
	if transport == nil {
		nc := &NetTransportConfig{
			BindAddrs: []string{conf.BindAddr},
			BindPort:  conf.BindPort,
			Logger:    logger,
		}

		// See the comment below for details about why we retry here.
		makeNetRetry := func(limit int) (*NetTransport, error) {
			var err error
			for try := 0; try < limit; try++ {
				var nt *NetTransport
				if nt, err = NewNetTransport(nc); err == nil {
					return nt, nil
				}
				if strings.Contains(err.Error(), "address already in use") {
					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
					continue
				}
			}

			return nil, fmt.Errorf("failed to obtain an address: %v", err)
		}

		// The dynamic bind port operation is inherently racy because
		// even though we are using the kernel to find a port for us, we
		// are attempting to bind multiple protocols (and potentially
		// multiple addresses) with the same port number. We build in a
		// few retries here since this often gets transient errors in
		// busy unit tests.
		limit := 1
		if conf.BindPort == 0 {
			limit = 10
		}

		nt, err := makeNetRetry(limit)
		if err != nil {
			return nil, fmt.Errorf("Could not set up network transport: %v", err)
		}
		if conf.BindPort == 0 {
			port := nt.GetAutoBindPort()
			conf.BindPort = port
			conf.AdvertisePort = port
			logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
		}
		transport = nt
	}

	nodeAwareTransport, ok := transport.(NodeAwareTransport)
	if !ok {
		logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired")
		nodeAwareTransport = &shimNodeAwareTransport{transport}
	}

	m := &Memberlist{
		config:               conf,
		shutdownCh:           make(chan struct{}),
		leaveBroadcast:       make(chan struct{}, 1),
		transport:            nodeAwareTransport,
		handoffCh:            make(chan struct{}, 1),
		highPriorityMsgQueue: list.New(),
		lowPriorityMsgQueue:  list.New(),
		nodeMap:              make(map[string]*nodeState),
		nodeTimers:           make(map[string]*suspicion),
		awareness:            newAwareness(conf.AwarenessMaxMultiplier),
		ackHandlers:          make(map[uint32]*ackHandler),
		broadcasts:           &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
		logger:               logger,
	}
	m.broadcasts.NumNodes = func() int {
		return m.estNumNodes()
	}

	// Get the final advertise address from the transport, which may need
	// to see which address we bound to. We'll refresh this each time we
	// send out an alive message.
	if _, _, err := m.refreshAdvertise(); err != nil {
		return nil, err
	}

	go m.streamListen()
	go m.packetListen()
	go m.packetHandler()
	return m, nil
}

// Create will create a new Memberlist using the given configuration.
// This will not connect to any other node (see Join) yet, but will start
// all the listeners to allow other nodes to join this memberlist.
// After creating a Memberlist, the given configuration should no longer
// be modified by the user.
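//
// A small sketch of typical use (the node name is a placeholder; setting
// BindPort to 0 asks the transport to pick a free port, as handled in
// newMemberlist):
//
//	conf := DefaultLANConfig()
//	conf.Name = "node-1"
//	conf.BindPort = 0 // 0 means "choose a dynamic port"
//	m, err := Create(conf)
//	if err != nil {
//		// handle the error
//	}
//	defer m.Shutdown()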
func Create(conf *Config) (*Memberlist, error) {
	m, err := newMemberlist(conf)
	if err != nil {
		return nil, err
	}
	if err := m.setAlive(); err != nil {
		m.Shutdown()
		return nil, err
	}
	m.schedule()
	return m, nil
}

// Join is used to take an existing Memberlist and attempt to join a cluster
// by contacting all the given hosts and performing a state sync. Initially,
// the Memberlist only contains our own state, so doing this will cause
// remote nodes to become aware of the existence of this node, effectively
// joining the cluster.
//
// This returns the number of hosts successfully contacted and an error if
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
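//
// A short sketch (the addresses are placeholders; the optional "name/"
// prefix is peeled off by resolveAddr when node names are in use):
//
//	contacted, err := m.Join([]string{
//		"10.0.0.10:7946",        // host:port
//		"node-2/10.0.0.11:7946", // node name + host:port
//	})
//	if err != nil {
//		// none of the hosts could be reached
//	}
//	_ = contacted // number of hosts successfully contacted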
func (m *Memberlist) Join(existing []string) (int, error) {
	numSuccess := 0
	var errs error
	for _, exist := range existing {
		addrs, err := m.resolveAddr(exist)
		if err != nil {
			err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
			errs = multierror.Append(errs, err)
			m.logger.Printf("[WARN] memberlist: %v", err)
			continue
		}

		for _, addr := range addrs {
			hp := joinHostPort(addr.ip.String(), addr.port)
			a := Address{Addr: hp, Name: addr.nodeName}
			if err := m.pushPullNode(a, true); err != nil {
				err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
				errs = multierror.Append(errs, err)
				m.logger.Printf("[DEBUG] memberlist: %v", err)
				continue
			}
			numSuccess++
		}
	}
	if numSuccess > 0 {
		errs = nil
	}
	return numSuccess, errs
}

// ipPort holds information about a node we want to try to join.
type ipPort struct {
	ip       net.IP
	port     uint16
	nodeName string // optional
}

// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
// the response has the truncate bit set, which isn't common on DNS servers like
// Consul's. By doing the TCP lookup directly, we get the best chance for the
// largest list of hosts to join. Since joins are relatively rare events, it's ok
// to do this rather expensive operation.
func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) {
	// Don't attempt any TCP lookups against non-fully qualified domain
	// names, since those will likely come from the resolv.conf file.
	if !strings.Contains(host, ".") {
		return nil, nil
	}

	// Make sure the domain name is terminated with a dot (we know there's
	// at least one character at this point).
	dn := host
	if dn[len(dn)-1] != '.' {
		dn = dn + "."
	}

	// See if we can find a server to try.
	cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
	if err != nil {
		return nil, err
	}
	if len(cc.Servers) > 0 {
		// We support host:port in the DNS config, but need to add the
		// default port if one is not supplied.
		server := cc.Servers[0]
		if !hasPort(server) {
			server = net.JoinHostPort(server, cc.Port)
		}

		// Do the lookup.
		c := new(dns.Client)
		c.Net = "tcp"
		msg := new(dns.Msg)
		msg.SetQuestion(dn, dns.TypeANY)
		in, _, err := c.Exchange(msg, server)
		if err != nil {
			return nil, err
		}

		// Handle any IPs we get back that we can attempt to join.
		var ips []ipPort
		for _, r := range in.Answer {
			switch rr := r.(type) {
			case (*dns.A):
				ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName})
			case (*dns.AAAA):
				ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName})
			case (*dns.CNAME):
				m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
			}
		}
		return ips, nil
	}

	return nil, nil
}

// resolveAddr is used to resolve a host string into a list of candidate
// addresses and ports. If no port is given, the configured BindPort is used.
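//
// Accepted forms (a sketch; the values are placeholders):
//
//	"10.0.0.10:7946"         // IP and explicit port
//	"host.example.com"       // DNS name, default port
//	"node-2/10.0.0.10:7946"  // optional node name prefix, then host:port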
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
	// First peel off any leading node name. This is optional.
	nodeName := ""
	if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 {
		if slashIdx == 0 {
			return nil, fmt.Errorf("empty node name provided")
		}
		nodeName = hostStr[0:slashIdx]
		hostStr = hostStr[slashIdx+1:]
	}

	// This captures the supplied port, or the default one.
	hostStr = ensurePort(hostStr, m.config.BindPort)
	host, sport, err := net.SplitHostPort(hostStr)
	if err != nil {
		return nil, err
	}
	lport, err := strconv.ParseUint(sport, 10, 16)
	if err != nil {
		return nil, err
	}
	port := uint16(lport)

	// If it looks like an IP address we are done. The SplitHostPort() above
	// will make sure the host part is in good shape for parsing, even for
	// IPv6 addresses.
	if ip := net.ParseIP(host); ip != nil {
		return []ipPort{
			{ip: ip, port: port, nodeName: nodeName},
		}, nil
	}

	// First try TCP so we have the best chance for the largest list of
	// hosts to join. If this fails it's not fatal since this isn't a standard
	// way to query DNS, and we have a fallback below.
	ips, err := m.tcpLookupIP(host, port, nodeName)
	if err != nil {
		m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
	}
	if len(ips) > 0 {
		return ips, nil
	}

	// If TCP didn't yield anything then use the normal Go resolver which
	// will try UDP, then might possibly try TCP again if the UDP response
	// indicates it was truncated.
	ans, err := net.LookupIP(host)
	if err != nil {
		return nil, err
	}
	ips = make([]ipPort, 0, len(ans))
	for _, ip := range ans {
		ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName})
	}
	return ips, nil
}

// setAlive is used to mark this node as being alive. This is the same
// as if we received an alive notification on our own network channel for
// ourselves.
func (m *Memberlist) setAlive() error {
	// Get the final advertise address from the transport, which may need
	// to see which address we bound to.
	addr, port, err := m.refreshAdvertise()
	if err != nil {
		return err
	}

	// Check if this is a public address without encryption
	ipAddr, err := sockaddr.NewIPAddr(addr.String())
	if err != nil {
		return fmt.Errorf("Failed to parse interface addresses: %v", err)
	}
	ifAddrs := []sockaddr.IfAddr{
		sockaddr.IfAddr{
			SockAddr: ipAddr,
		},
	}
	_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
	if err != nil {
		return fmt.Errorf("Failed to check for public interfaces: %v", err)
	}
	if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
		m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
	}

	// Set any metadata from the delegate.
	var meta []byte
	if m.config.Delegate != nil {
		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
		if len(meta) > MetaMaxSize {
			panic("Node meta data provided is longer than the limit")
		}
	}

	a := alive{
		Incarnation: m.nextIncarnation(),
		Node:        m.config.Name,
		Addr:        addr,
		Port:        uint16(port),
		Meta:        meta,
		Vsn:         m.config.BuildVsnArray(),
	}
	m.aliveNode(&a, nil, true)

	return nil
}

func (m *Memberlist) getAdvertise() (net.IP, uint16) {
	m.advertiseLock.RLock()
	defer m.advertiseLock.RUnlock()
	return m.advertiseAddr, m.advertisePort
}

func (m *Memberlist) setAdvertise(addr net.IP, port int) {
	m.advertiseLock.Lock()
	defer m.advertiseLock.Unlock()
	m.advertiseAddr = addr
	m.advertisePort = uint16(port)
}

func (m *Memberlist) refreshAdvertise() (net.IP, int, error) {
	addr, port, err := m.transport.FinalAdvertiseAddr(
		m.config.AdvertiseAddr, m.config.AdvertisePort)
	if err != nil {
		return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err)
	}
	m.setAdvertise(addr, port)
	return addr, port, nil
}

// LocalNode is used to return the local Node
func (m *Memberlist) LocalNode() *Node {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()
	state := m.nodeMap[m.config.Name]
	return &state.Node
}

// UpdateNode is used to trigger re-advertising the local node. This is
// primarily used with a Delegate to support dynamic updates to the local
// meta data. This will block until the update message is successfully
// broadcasted to a member of the cluster, if any exist, or until the
// specified timeout is reached.
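//
// A sketch of a typical flow (the delegate shown here is hypothetical; any
// Delegate whose NodeMeta result has changed works the same way):
//
//	myDelegate.meta = []byte("v2") // whatever NodeMeta will return next
//	if err := m.UpdateNode(10 * time.Second); err != nil {
//		// the update broadcast did not go out before the timeout
//	}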
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
	// Get the node meta data
	var meta []byte
	if m.config.Delegate != nil {
		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
		if len(meta) > MetaMaxSize {
			panic("Node meta data provided is longer than the limit")
		}
	}

	// Get the existing node
	m.nodeLock.RLock()
	state := m.nodeMap[m.config.Name]
	m.nodeLock.RUnlock()

	// Format a new alive message
	a := alive{
		Incarnation: m.nextIncarnation(),
		Node:        m.config.Name,
		Addr:        state.Addr,
		Port:        state.Port,
		Meta:        meta,
		Vsn:         m.config.BuildVsnArray(),
	}
	notifyCh := make(chan struct{})
	m.aliveNode(&a, notifyCh, true)

	// Wait for the broadcast or a timeout
	if m.anyAlive() {
		var timeoutCh <-chan time.Time
		if timeout > 0 {
			timeoutCh = time.After(timeout)
		}
		select {
		case <-notifyCh:
		case <-timeoutCh:
			return fmt.Errorf("timeout waiting for update broadcast")
		}
	}
	return nil
}

// Deprecated: SendTo is deprecated in favor of SendBestEffort, which
// requires a target node. If you don't have a node, use SendToAddress.
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
	a := Address{Addr: to.String(), Name: ""}
	return m.SendToAddress(a, msg)
}

// SendToAddress uses the unreliable packet-oriented interface of the
// transport to target a user message at the given address (this does not
// use the gossip mechanism).
func (m *Memberlist) SendToAddress(a Address, msg []byte) error {
	// Encode as a user message
	buf := make([]byte, 1, len(msg)+1)
	buf[0] = byte(userMsg)
	buf = append(buf, msg...)

	// Send the message
	return m.rawSendMsgPacket(a, nil, buf)
}

// Deprecated: SendToUDP is deprecated in favor of SendBestEffort.
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
	return m.SendBestEffort(to, msg)
}

// Deprecated: SendToTCP is deprecated in favor of SendReliable.
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
	return m.SendReliable(to, msg)
}

// SendBestEffort uses the unreliable packet-oriented interface of the transport
// to target a user message at the given node (this does not use the gossip
// mechanism). The maximum size of the message depends on the configured
// UDPBufferSize for this memberlist instance.
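//
// For example (the target here is a placeholder picked out of Members):
//
//	for _, n := range m.Members() {
//		if n.Name == "node-2" {
//			_ = m.SendBestEffort(n, []byte("small payload"))
//		}
//	}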
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
	// Encode as a user message
	buf := make([]byte, 1, len(msg)+1)
	buf[0] = byte(userMsg)
	buf = append(buf, msg...)

	// Send the message
	a := Address{Addr: to.Address(), Name: to.Name}
	return m.rawSendMsgPacket(a, to, buf)
}

// SendReliable uses the reliable stream-oriented interface of the transport to
// target a user message at the given node (this does not use the gossip
// mechanism). Delivery is guaranteed if no error is returned, and there is no
// limit on the size of the message.
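//
// For example (node and payload are placeholders):
//
//	if err := m.SendReliable(node, payload); err != nil {
//		// the stream send failed
//	}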
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
	return m.sendUserMsg(to.FullAddress(), msg)
}

// Members returns a list of all known live nodes. The node structures
// returned must not be modified. If you wish to modify a Node, make a
// copy first.
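//
// For example, to print the current live members:
//
//	for _, n := range m.Members() {
//		fmt.Printf("%s %s:%d\n", n.Name, n.Addr, n.Port)
//	}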
func (m *Memberlist) Members() []*Node {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	nodes := make([]*Node, 0, len(m.nodes))
	for _, n := range m.nodes {
		if !n.DeadOrLeft() {
			nodes = append(nodes, &n.Node)
		}
	}

	return nodes
}

// NumMembers returns the number of alive nodes currently known. Between
// the time of calling this and calling Members, the number of alive nodes
// may have changed, so this shouldn't be used to determine how many
// members will be returned by Members.
func (m *Memberlist) NumMembers() (alive int) {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	for _, n := range m.nodes {
		if !n.DeadOrLeft() {
			alive++
		}
	}

	return
}

// Leave will broadcast a leave message but will not shut down the background
// listeners, meaning the node will continue participating in gossip and state
// updates.
//
// This will block until the leave message is successfully broadcasted to
// a member of the cluster, if any exist, or until the specified timeout
// is reached.
//
// This method is safe to call multiple times, but must not be called
// after the cluster is already shut down.
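//
// A typical graceful exit (the timeout value is illustrative):
//
//	if err := m.Leave(5 * time.Second); err != nil {
//		// the leave broadcast timed out; shut down anyway
//	}
//	if err := m.Shutdown(); err != nil {
//		// handle the error
//	}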
func (m *Memberlist) Leave(timeout time.Duration) error {
	m.leaveLock.Lock()
	defer m.leaveLock.Unlock()

	if m.hasShutdown() {
		panic("leave after shutdown")
	}

	if !m.hasLeft() {
		atomic.StoreInt32(&m.leave, 1)

		m.nodeLock.Lock()
		state, ok := m.nodeMap[m.config.Name]
		m.nodeLock.Unlock()
		if !ok {
			m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
			return nil
		}

		// This dead message is special, because Node and From are the
		// same. This helps other nodes figure out that a node left
		// intentionally. When Node equals From, other nodes know for
		// sure this node is gone.
		d := dead{
			Incarnation: state.Incarnation,
			Node:        state.Name,
			From:        state.Name,
		}
		m.deadNode(&d)

		// Block until the broadcast goes out
		if m.anyAlive() {
			var timeoutCh <-chan time.Time
			if timeout > 0 {
				timeoutCh = time.After(timeout)
			}
			select {
			case <-m.leaveBroadcast:
			case <-timeoutCh:
				return fmt.Errorf("timeout waiting for leave broadcast")
			}
		}
	}

	return nil
}

// Check for any other alive node.
func (m *Memberlist) anyAlive() bool {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()
	for _, n := range m.nodes {
		if !n.DeadOrLeft() && n.Name != m.config.Name {
			return true
		}
	}
	return false
}

// GetHealthScore gives this instance's idea of how well it is meeting the soft
// real-time requirements of the protocol. Lower numbers are better, and zero
// means "totally healthy".
func (m *Memberlist) GetHealthScore() int {
	return m.awareness.GetHealthScore()
}

// ProtocolVersion returns the protocol version currently in use by
// this memberlist.
func (m *Memberlist) ProtocolVersion() uint8 {
	// NOTE: This method exists so that in the future we can control
	// any locking if necessary, if we change the protocol version at
	// runtime, etc.
	return m.config.ProtocolVersion
}

// Shutdown will stop any background maintenance of network activity
// for this memberlist, causing it to appear "dead". A leave message
// will not be broadcasted beforehand, so the rest of the cluster will
// have to detect this node's shutdown via probing. If you wish to exit
// the cluster more gracefully, call Leave prior to shutting down.
//
// This method is safe to call multiple times.
func (m *Memberlist) Shutdown() error {
	m.shutdownLock.Lock()
	defer m.shutdownLock.Unlock()

	if m.hasShutdown() {
		return nil
	}

	// Shut down the transport first, which should block until it's
	// completely torn down. If we kill the memberlist-side handlers first,
	// those I/O handlers might get stuck.
	if err := m.transport.Shutdown(); err != nil {
		m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
	}

	// Now tear down everything else.
	atomic.StoreInt32(&m.shutdown, 1)
	close(m.shutdownCh)
	m.deschedule()
	return nil
}

func (m *Memberlist) hasShutdown() bool {
	return atomic.LoadInt32(&m.shutdown) == 1
}

func (m *Memberlist) hasLeft() bool {
	return atomic.LoadInt32(&m.leave) == 1
}

func (m *Memberlist) getNodeState(addr string) NodeStateType {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	n := m.nodeMap[addr]
	return n.State
}

func (m *Memberlist) getNodeStateChange(addr string) time.Time {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	n := m.nodeMap[addr]
	return n.StateChange
}

func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	n := m.nodeMap[addr]
	f(n)
}