1/* 2memberlist is a library that manages cluster 3membership and member failure detection using a gossip based protocol. 4 5The use cases for such a library are far-reaching: all distributed systems 6require membership, and memberlist is a re-usable solution to managing 7cluster membership and node failure detection. 8 9memberlist is eventually consistent but converges quickly on average. 10The speed at which it converges can be heavily tuned via various knobs 11on the protocol. Node failures are detected and network partitions are partially 12tolerated by attempting to communicate to potentially dead nodes through 13multiple routes. 14*/ 15package memberlist 16 17import ( 18 "container/list" 19 "errors" 20 "fmt" 21 "log" 22 "net" 23 "os" 24 "strconv" 25 "strings" 26 "sync" 27 "sync/atomic" 28 "time" 29 30 multierror "github.com/hashicorp/go-multierror" 31 sockaddr "github.com/hashicorp/go-sockaddr" 32 "github.com/miekg/dns" 33) 34 35var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided") 36 37type Memberlist struct { 38 sequenceNum uint32 // Local sequence number 39 incarnation uint32 // Local incarnation number 40 numNodes uint32 // Number of known nodes (estimate) 41 pushPullReq uint32 // Number of push/pull requests 42 43 advertiseLock sync.RWMutex 44 advertiseAddr net.IP 45 advertisePort uint16 46 47 config *Config 48 shutdown int32 // Used as an atomic boolean value 49 shutdownCh chan struct{} 50 leave int32 // Used as an atomic boolean value 51 leaveBroadcast chan struct{} 52 53 shutdownLock sync.Mutex // Serializes calls to Shutdown 54 leaveLock sync.Mutex // Serializes calls to Leave 55 56 transport NodeAwareTransport 57 58 handoffCh chan struct{} 59 highPriorityMsgQueue *list.List 60 lowPriorityMsgQueue *list.List 61 msgQueueLock sync.Mutex 62 63 nodeLock sync.RWMutex 64 nodes []*nodeState // Known nodes 65 nodeMap map[string]*nodeState // Maps Node.Name -> NodeState 66 nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer 67 awareness *awareness 68 69 tickerLock sync.Mutex 70 tickers []*time.Ticker 71 stopTick chan struct{} 72 probeIndex int 73 74 ackLock sync.Mutex 75 ackHandlers map[uint32]*ackHandler 76 77 broadcasts *TransmitLimitedQueue 78 79 logger *log.Logger 80} 81 82// BuildVsnArray creates the array of Vsn 83func (conf *Config) BuildVsnArray() []uint8 { 84 return []uint8{ 85 ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion, 86 conf.DelegateProtocolMin, conf.DelegateProtocolMax, 87 conf.DelegateProtocolVersion, 88 } 89} 90 91// newMemberlist creates the network listeners. 92// Does not schedule execution of background maintenance. 93func newMemberlist(conf *Config) (*Memberlist, error) { 94 if conf.ProtocolVersion < ProtocolVersionMin { 95 return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]", 96 conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) 97 } else if conf.ProtocolVersion > ProtocolVersionMax { 98 return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]", 99 conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) 100 } 101 102 if len(conf.SecretKey) > 0 { 103 if conf.Keyring == nil { 104 keyring, err := NewKeyring(nil, conf.SecretKey) 105 if err != nil { 106 return nil, err 107 } 108 conf.Keyring = keyring 109 } else { 110 if err := conf.Keyring.AddKey(conf.SecretKey); err != nil { 111 return nil, err 112 } 113 if err := conf.Keyring.UseKey(conf.SecretKey); err != nil { 114 return nil, err 115 } 116 } 117 } 118 119 if conf.LogOutput != nil && conf.Logger != nil { 120 return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") 121 } 122 123 logDest := conf.LogOutput 124 if logDest == nil { 125 logDest = os.Stderr 126 } 127 128 logger := conf.Logger 129 if logger == nil { 130 logger = log.New(logDest, "", log.LstdFlags) 131 } 132 133 // Set up a network transport by default if a custom one wasn't given 134 // by the config. 135 transport := conf.Transport 136 if transport == nil { 137 nc := &NetTransportConfig{ 138 BindAddrs: []string{conf.BindAddr}, 139 BindPort: conf.BindPort, 140 Logger: logger, 141 } 142 143 // See comment below for details about the retry in here. 144 makeNetRetry := func(limit int) (*NetTransport, error) { 145 var err error 146 for try := 0; try < limit; try++ { 147 var nt *NetTransport 148 if nt, err = NewNetTransport(nc); err == nil { 149 return nt, nil 150 } 151 if strings.Contains(err.Error(), "address already in use") { 152 logger.Printf("[DEBUG] memberlist: Got bind error: %v", err) 153 continue 154 } 155 } 156 157 return nil, fmt.Errorf("failed to obtain an address: %v", err) 158 } 159 160 // The dynamic bind port operation is inherently racy because 161 // even though we are using the kernel to find a port for us, we 162 // are attempting to bind multiple protocols (and potentially 163 // multiple addresses) with the same port number. We build in a 164 // few retries here since this often gets transient errors in 165 // busy unit tests. 166 limit := 1 167 if conf.BindPort == 0 { 168 limit = 10 169 } 170 171 nt, err := makeNetRetry(limit) 172 if err != nil { 173 return nil, fmt.Errorf("Could not set up network transport: %v", err) 174 } 175 if conf.BindPort == 0 { 176 port := nt.GetAutoBindPort() 177 conf.BindPort = port 178 conf.AdvertisePort = port 179 logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port) 180 } 181 transport = nt 182 } 183 184 nodeAwareTransport, ok := transport.(NodeAwareTransport) 185 if !ok { 186 logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired") 187 nodeAwareTransport = &shimNodeAwareTransport{transport} 188 } 189 190 m := &Memberlist{ 191 config: conf, 192 shutdownCh: make(chan struct{}), 193 leaveBroadcast: make(chan struct{}, 1), 194 transport: nodeAwareTransport, 195 handoffCh: make(chan struct{}, 1), 196 highPriorityMsgQueue: list.New(), 197 lowPriorityMsgQueue: list.New(), 198 nodeMap: make(map[string]*nodeState), 199 nodeTimers: make(map[string]*suspicion), 200 awareness: newAwareness(conf.AwarenessMaxMultiplier), 201 ackHandlers: make(map[uint32]*ackHandler), 202 broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, 203 logger: logger, 204 } 205 m.broadcasts.NumNodes = func() int { 206 return m.estNumNodes() 207 } 208 209 // Get the final advertise address from the transport, which may need 210 // to see which address we bound to. We'll refresh this each time we 211 // send out an alive message. 212 if _, _, err := m.refreshAdvertise(); err != nil { 213 return nil, err 214 } 215 216 go m.streamListen() 217 go m.packetListen() 218 go m.packetHandler() 219 return m, nil 220} 221 222// Create will create a new Memberlist using the given configuration. 223// This will not connect to any other node (see Join) yet, but will start 224// all the listeners to allow other nodes to join this memberlist. 225// After creating a Memberlist, the configuration given should not be 226// modified by the user anymore. 227func Create(conf *Config) (*Memberlist, error) { 228 m, err := newMemberlist(conf) 229 if err != nil { 230 return nil, err 231 } 232 if err := m.setAlive(); err != nil { 233 m.Shutdown() 234 return nil, err 235 } 236 m.schedule() 237 return m, nil 238} 239 240// Join is used to take an existing Memberlist and attempt to join a cluster 241// by contacting all the given hosts and performing a state sync. Initially, 242// the Memberlist only contains our own state, so doing this will cause 243// remote nodes to become aware of the existence of this node, effectively 244// joining the cluster. 245// 246// This returns the number of hosts successfully contacted and an error if 247// none could be reached. If an error is returned, the node did not successfully 248// join the cluster. 249func (m *Memberlist) Join(existing []string) (int, error) { 250 numSuccess := 0 251 var errs error 252 for _, exist := range existing { 253 addrs, err := m.resolveAddr(exist) 254 if err != nil { 255 err = fmt.Errorf("Failed to resolve %s: %v", exist, err) 256 errs = multierror.Append(errs, err) 257 m.logger.Printf("[WARN] memberlist: %v", err) 258 continue 259 } 260 261 for _, addr := range addrs { 262 hp := joinHostPort(addr.ip.String(), addr.port) 263 a := Address{Addr: hp, Name: addr.nodeName} 264 if err := m.pushPullNode(a, true); err != nil { 265 err = fmt.Errorf("Failed to join %s: %v", addr.ip, err) 266 errs = multierror.Append(errs, err) 267 m.logger.Printf("[DEBUG] memberlist: %v", err) 268 continue 269 } 270 numSuccess++ 271 } 272 273 } 274 if numSuccess > 0 { 275 errs = nil 276 } 277 return numSuccess, errs 278} 279 280// ipPort holds information about a node we want to try to join. 281type ipPort struct { 282 ip net.IP 283 port uint16 284 nodeName string // optional 285} 286 287// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host. 288// The built-in Go resolver will do a UDP lookup first, and will only use TCP if 289// the response has the truncate bit set, which isn't common on DNS servers like 290// Consul's. By doing the TCP lookup directly, we get the best chance for the 291// largest list of hosts to join. Since joins are relatively rare events, it's ok 292// to do this rather expensive operation. 293func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) { 294 // Don't attempt any TCP lookups against non-fully qualified domain 295 // names, since those will likely come from the resolv.conf file. 296 if !strings.Contains(host, ".") { 297 return nil, nil 298 } 299 300 // Make sure the domain name is terminated with a dot (we know there's 301 // at least one character at this point). 302 dn := host 303 if dn[len(dn)-1] != '.' { 304 dn = dn + "." 305 } 306 307 // See if we can find a server to try. 308 cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath) 309 if err != nil { 310 return nil, err 311 } 312 if len(cc.Servers) > 0 { 313 // We support host:port in the DNS config, but need to add the 314 // default port if one is not supplied. 315 server := cc.Servers[0] 316 if !hasPort(server) { 317 server = net.JoinHostPort(server, cc.Port) 318 } 319 320 // Do the lookup. 321 c := new(dns.Client) 322 c.Net = "tcp" 323 msg := new(dns.Msg) 324 msg.SetQuestion(dn, dns.TypeANY) 325 in, _, err := c.Exchange(msg, server) 326 if err != nil { 327 return nil, err 328 } 329 330 // Handle any IPs we get back that we can attempt to join. 331 var ips []ipPort 332 for _, r := range in.Answer { 333 switch rr := r.(type) { 334 case (*dns.A): 335 ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName}) 336 case (*dns.AAAA): 337 ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName}) 338 case (*dns.CNAME): 339 m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host) 340 } 341 } 342 return ips, nil 343 } 344 345 return nil, nil 346} 347 348// resolveAddr is used to resolve the address into an address, 349// port, and error. If no port is given, use the default 350func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { 351 // First peel off any leading node name. This is optional. 352 nodeName := "" 353 if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 { 354 if slashIdx == 0 { 355 return nil, fmt.Errorf("empty node name provided") 356 } 357 nodeName = hostStr[0:slashIdx] 358 hostStr = hostStr[slashIdx+1:] 359 } 360 361 // This captures the supplied port, or the default one. 362 hostStr = ensurePort(hostStr, m.config.BindPort) 363 host, sport, err := net.SplitHostPort(hostStr) 364 if err != nil { 365 return nil, err 366 } 367 lport, err := strconv.ParseUint(sport, 10, 16) 368 if err != nil { 369 return nil, err 370 } 371 port := uint16(lport) 372 373 // If it looks like an IP address we are done. The SplitHostPort() above 374 // will make sure the host part is in good shape for parsing, even for 375 // IPv6 addresses. 376 if ip := net.ParseIP(host); ip != nil { 377 return []ipPort{ 378 ipPort{ip: ip, port: port, nodeName: nodeName}, 379 }, nil 380 } 381 382 // First try TCP so we have the best chance for the largest list of 383 // hosts to join. If this fails it's not fatal since this isn't a standard 384 // way to query DNS, and we have a fallback below. 385 ips, err := m.tcpLookupIP(host, port, nodeName) 386 if err != nil { 387 m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err) 388 } 389 if len(ips) > 0 { 390 return ips, nil 391 } 392 393 // If TCP didn't yield anything then use the normal Go resolver which 394 // will try UDP, then might possibly try TCP again if the UDP response 395 // indicates it was truncated. 396 ans, err := net.LookupIP(host) 397 if err != nil { 398 return nil, err 399 } 400 ips = make([]ipPort, 0, len(ans)) 401 for _, ip := range ans { 402 ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName}) 403 } 404 return ips, nil 405} 406 407// setAlive is used to mark this node as being alive. This is the same 408// as if we received an alive notification our own network channel for 409// ourself. 410func (m *Memberlist) setAlive() error { 411 // Get the final advertise address from the transport, which may need 412 // to see which address we bound to. 413 addr, port, err := m.refreshAdvertise() 414 if err != nil { 415 return err 416 } 417 418 // Check if this is a public address without encryption 419 ipAddr, err := sockaddr.NewIPAddr(addr.String()) 420 if err != nil { 421 return fmt.Errorf("Failed to parse interface addresses: %v", err) 422 } 423 ifAddrs := []sockaddr.IfAddr{ 424 sockaddr.IfAddr{ 425 SockAddr: ipAddr, 426 }, 427 } 428 _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs) 429 if len(publicIfs) > 0 && !m.config.EncryptionEnabled() { 430 m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!") 431 } 432 433 // Set any metadata from the delegate. 434 var meta []byte 435 if m.config.Delegate != nil { 436 meta = m.config.Delegate.NodeMeta(MetaMaxSize) 437 if len(meta) > MetaMaxSize { 438 panic("Node meta data provided is longer than the limit") 439 } 440 } 441 442 a := alive{ 443 Incarnation: m.nextIncarnation(), 444 Node: m.config.Name, 445 Addr: addr, 446 Port: uint16(port), 447 Meta: meta, 448 Vsn: m.config.BuildVsnArray(), 449 } 450 m.aliveNode(&a, nil, true) 451 452 return nil 453} 454 455func (m *Memberlist) getAdvertise() (net.IP, uint16) { 456 m.advertiseLock.RLock() 457 defer m.advertiseLock.RUnlock() 458 return m.advertiseAddr, m.advertisePort 459} 460 461func (m *Memberlist) setAdvertise(addr net.IP, port int) { 462 m.advertiseLock.Lock() 463 defer m.advertiseLock.Unlock() 464 m.advertiseAddr = addr 465 m.advertisePort = uint16(port) 466} 467 468func (m *Memberlist) refreshAdvertise() (net.IP, int, error) { 469 addr, port, err := m.transport.FinalAdvertiseAddr( 470 m.config.AdvertiseAddr, m.config.AdvertisePort) 471 if err != nil { 472 return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err) 473 } 474 m.setAdvertise(addr, port) 475 return addr, port, nil 476} 477 478// LocalNode is used to return the local Node 479func (m *Memberlist) LocalNode() *Node { 480 m.nodeLock.RLock() 481 defer m.nodeLock.RUnlock() 482 state := m.nodeMap[m.config.Name] 483 return &state.Node 484} 485 486// UpdateNode is used to trigger re-advertising the local node. This is 487// primarily used with a Delegate to support dynamic updates to the local 488// meta data. This will block until the update message is successfully 489// broadcasted to a member of the cluster, if any exist or until a specified 490// timeout is reached. 491func (m *Memberlist) UpdateNode(timeout time.Duration) error { 492 // Get the node meta data 493 var meta []byte 494 if m.config.Delegate != nil { 495 meta = m.config.Delegate.NodeMeta(MetaMaxSize) 496 if len(meta) > MetaMaxSize { 497 panic("Node meta data provided is longer than the limit") 498 } 499 } 500 501 // Get the existing node 502 m.nodeLock.RLock() 503 state := m.nodeMap[m.config.Name] 504 m.nodeLock.RUnlock() 505 506 // Format a new alive message 507 a := alive{ 508 Incarnation: m.nextIncarnation(), 509 Node: m.config.Name, 510 Addr: state.Addr, 511 Port: state.Port, 512 Meta: meta, 513 Vsn: m.config.BuildVsnArray(), 514 } 515 notifyCh := make(chan struct{}) 516 m.aliveNode(&a, notifyCh, true) 517 518 // Wait for the broadcast or a timeout 519 if m.anyAlive() { 520 var timeoutCh <-chan time.Time 521 if timeout > 0 { 522 timeoutCh = time.After(timeout) 523 } 524 select { 525 case <-notifyCh: 526 case <-timeoutCh: 527 return fmt.Errorf("timeout waiting for update broadcast") 528 } 529 } 530 return nil 531} 532 533// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires a node to 534// target. If you don't have a node then use SendToAddress. 535func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { 536 a := Address{Addr: to.String(), Name: ""} 537 return m.SendToAddress(a, msg) 538} 539 540func (m *Memberlist) SendToAddress(a Address, msg []byte) error { 541 // Encode as a user message 542 buf := make([]byte, 1, len(msg)+1) 543 buf[0] = byte(userMsg) 544 buf = append(buf, msg...) 545 546 // Send the message 547 return m.rawSendMsgPacket(a, nil, buf) 548} 549 550// Deprecated: SendToUDP is deprecated in favor of SendBestEffort. 551func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { 552 return m.SendBestEffort(to, msg) 553} 554 555// Deprecated: SendToTCP is deprecated in favor of SendReliable. 556func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { 557 return m.SendReliable(to, msg) 558} 559 560// SendBestEffort uses the unreliable packet-oriented interface of the transport 561// to target a user message at the given node (this does not use the gossip 562// mechanism). The maximum size of the message depends on the configured 563// UDPBufferSize for this memberlist instance. 564func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { 565 // Encode as a user message 566 buf := make([]byte, 1, len(msg)+1) 567 buf[0] = byte(userMsg) 568 buf = append(buf, msg...) 569 570 // Send the message 571 a := Address{Addr: to.Address(), Name: to.Name} 572 return m.rawSendMsgPacket(a, to, buf) 573} 574 575// SendReliable uses the reliable stream-oriented interface of the transport to 576// target a user message at the given node (this does not use the gossip 577// mechanism). Delivery is guaranteed if no error is returned, and there is no 578// limit on the size of the message. 579func (m *Memberlist) SendReliable(to *Node, msg []byte) error { 580 return m.sendUserMsg(to.FullAddress(), msg) 581} 582 583// Members returns a list of all known live nodes. The node structures 584// returned must not be modified. If you wish to modify a Node, make a 585// copy first. 586func (m *Memberlist) Members() []*Node { 587 m.nodeLock.RLock() 588 defer m.nodeLock.RUnlock() 589 590 nodes := make([]*Node, 0, len(m.nodes)) 591 for _, n := range m.nodes { 592 if !n.DeadOrLeft() { 593 nodes = append(nodes, &n.Node) 594 } 595 } 596 597 return nodes 598} 599 600// NumMembers returns the number of alive nodes currently known. Between 601// the time of calling this and calling Members, the number of alive nodes 602// may have changed, so this shouldn't be used to determine how many 603// members will be returned by Members. 604func (m *Memberlist) NumMembers() (alive int) { 605 m.nodeLock.RLock() 606 defer m.nodeLock.RUnlock() 607 608 for _, n := range m.nodes { 609 if !n.DeadOrLeft() { 610 alive++ 611 } 612 } 613 614 return 615} 616 617// Leave will broadcast a leave message but will not shutdown the background 618// listeners, meaning the node will continue participating in gossip and state 619// updates. 620// 621// This will block until the leave message is successfully broadcasted to 622// a member of the cluster, if any exist or until a specified timeout 623// is reached. 624// 625// This method is safe to call multiple times, but must not be called 626// after the cluster is already shut down. 627func (m *Memberlist) Leave(timeout time.Duration) error { 628 m.leaveLock.Lock() 629 defer m.leaveLock.Unlock() 630 631 if m.hasShutdown() { 632 panic("leave after shutdown") 633 } 634 635 if !m.hasLeft() { 636 atomic.StoreInt32(&m.leave, 1) 637 638 m.nodeLock.Lock() 639 state, ok := m.nodeMap[m.config.Name] 640 m.nodeLock.Unlock() 641 if !ok { 642 m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.") 643 return nil 644 } 645 646 // This dead message is special, because Node and From are the 647 // same. This helps other nodes figure out that a node left 648 // intentionally. When Node equals From, other nodes know for 649 // sure this node is gone. 650 d := dead{ 651 Incarnation: state.Incarnation, 652 Node: state.Name, 653 From: state.Name, 654 } 655 m.deadNode(&d) 656 657 // Block until the broadcast goes out 658 if m.anyAlive() { 659 var timeoutCh <-chan time.Time 660 if timeout > 0 { 661 timeoutCh = time.After(timeout) 662 } 663 select { 664 case <-m.leaveBroadcast: 665 case <-timeoutCh: 666 return fmt.Errorf("timeout waiting for leave broadcast") 667 } 668 } 669 } 670 671 return nil 672} 673 674// Check for any other alive node. 675func (m *Memberlist) anyAlive() bool { 676 m.nodeLock.RLock() 677 defer m.nodeLock.RUnlock() 678 for _, n := range m.nodes { 679 if !n.DeadOrLeft() && n.Name != m.config.Name { 680 return true 681 } 682 } 683 return false 684} 685 686// GetHealthScore gives this instance's idea of how well it is meeting the soft 687// real-time requirements of the protocol. Lower numbers are better, and zero 688// means "totally healthy". 689func (m *Memberlist) GetHealthScore() int { 690 return m.awareness.GetHealthScore() 691} 692 693// ProtocolVersion returns the protocol version currently in use by 694// this memberlist. 695func (m *Memberlist) ProtocolVersion() uint8 { 696 // NOTE: This method exists so that in the future we can control 697 // any locking if necessary, if we change the protocol version at 698 // runtime, etc. 699 return m.config.ProtocolVersion 700} 701 702// Shutdown will stop any background maintenance of network activity 703// for this memberlist, causing it to appear "dead". A leave message 704// will not be broadcasted prior, so the cluster being left will have 705// to detect this node's shutdown using probing. If you wish to more 706// gracefully exit the cluster, call Leave prior to shutting down. 707// 708// This method is safe to call multiple times. 709func (m *Memberlist) Shutdown() error { 710 m.shutdownLock.Lock() 711 defer m.shutdownLock.Unlock() 712 713 if m.hasShutdown() { 714 return nil 715 } 716 717 // Shut down the transport first, which should block until it's 718 // completely torn down. If we kill the memberlist-side handlers 719 // those I/O handlers might get stuck. 720 if err := m.transport.Shutdown(); err != nil { 721 m.logger.Printf("[ERR] Failed to shutdown transport: %v", err) 722 } 723 724 // Now tear down everything else. 725 atomic.StoreInt32(&m.shutdown, 1) 726 close(m.shutdownCh) 727 m.deschedule() 728 return nil 729} 730 731func (m *Memberlist) hasShutdown() bool { 732 return atomic.LoadInt32(&m.shutdown) == 1 733} 734 735func (m *Memberlist) hasLeft() bool { 736 return atomic.LoadInt32(&m.leave) == 1 737} 738 739func (m *Memberlist) getNodeState(addr string) NodeStateType { 740 m.nodeLock.RLock() 741 defer m.nodeLock.RUnlock() 742 743 n := m.nodeMap[addr] 744 return n.State 745} 746 747func (m *Memberlist) getNodeStateChange(addr string) time.Time { 748 m.nodeLock.RLock() 749 defer m.nodeLock.RUnlock() 750 751 n := m.nodeMap[addr] 752 return n.StateChange 753} 754 755func (m *Memberlist) changeNode(addr string, f func(*nodeState)) { 756 m.nodeLock.Lock() 757 defer m.nodeLock.Unlock() 758 759 n := m.nodeMap[addr] 760 f(n) 761} 762