1/* 2memberlist is a library that manages cluster 3membership and member failure detection using a gossip based protocol. 4 5The use cases for such a library are far-reaching: all distributed systems 6require membership, and memberlist is a re-usable solution to managing 7cluster membership and node failure detection. 8 9memberlist is eventually consistent but converges quickly on average. 10The speed at which it converges can be heavily tuned via various knobs 11on the protocol. Node failures are detected and network partitions are partially 12tolerated by attempting to communicate to potentially dead nodes through 13multiple routes. 14*/ 15package memberlist 16 17import ( 18 "container/list" 19 "errors" 20 "fmt" 21 "log" 22 "net" 23 "os" 24 "strconv" 25 "strings" 26 "sync" 27 "sync/atomic" 28 "time" 29 30 multierror "github.com/hashicorp/go-multierror" 31 sockaddr "github.com/hashicorp/go-sockaddr" 32 "github.com/miekg/dns" 33) 34 35var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided") 36 37type Memberlist struct { 38 sequenceNum uint32 // Local sequence number 39 incarnation uint32 // Local incarnation number 40 numNodes uint32 // Number of known nodes (estimate) 41 pushPullReq uint32 // Number of push/pull requests 42 43 advertiseLock sync.RWMutex 44 advertiseAddr net.IP 45 advertisePort uint16 46 47 config *Config 48 shutdown int32 // Used as an atomic boolean value 49 shutdownCh chan struct{} 50 leave int32 // Used as an atomic boolean value 51 leaveBroadcast chan struct{} 52 53 shutdownLock sync.Mutex // Serializes calls to Shutdown 54 leaveLock sync.Mutex // Serializes calls to Leave 55 56 transport NodeAwareTransport 57 58 handoffCh chan struct{} 59 highPriorityMsgQueue *list.List 60 lowPriorityMsgQueue *list.List 61 msgQueueLock sync.Mutex 62 63 nodeLock sync.RWMutex 64 nodes []*nodeState // Known nodes 65 nodeMap map[string]*nodeState // Maps Node.Name -> NodeState 66 nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer 67 awareness *awareness 68 69 tickerLock sync.Mutex 70 tickers []*time.Ticker 71 stopTick chan struct{} 72 probeIndex int 73 74 ackLock sync.Mutex 75 ackHandlers map[uint32]*ackHandler 76 77 broadcasts *TransmitLimitedQueue 78 79 logger *log.Logger 80} 81 82// BuildVsnArray creates the array of Vsn 83func (conf *Config) BuildVsnArray() []uint8 { 84 return []uint8{ 85 ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion, 86 conf.DelegateProtocolMin, conf.DelegateProtocolMax, 87 conf.DelegateProtocolVersion, 88 } 89} 90 91// newMemberlist creates the network listeners. 92// Does not schedule execution of background maintenance. 93func newMemberlist(conf *Config) (*Memberlist, error) { 94 if conf.ProtocolVersion < ProtocolVersionMin { 95 return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]", 96 conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) 97 } else if conf.ProtocolVersion > ProtocolVersionMax { 98 return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]", 99 conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax) 100 } 101 102 if len(conf.SecretKey) > 0 { 103 if conf.Keyring == nil { 104 keyring, err := NewKeyring(nil, conf.SecretKey) 105 if err != nil { 106 return nil, err 107 } 108 conf.Keyring = keyring 109 } else { 110 if err := conf.Keyring.AddKey(conf.SecretKey); err != nil { 111 return nil, err 112 } 113 if err := conf.Keyring.UseKey(conf.SecretKey); err != nil { 114 return nil, err 115 } 116 } 117 } 118 119 if conf.LogOutput != nil && conf.Logger != nil { 120 return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.") 121 } 122 123 logDest := conf.LogOutput 124 if logDest == nil { 125 logDest = os.Stderr 126 } 127 128 logger := conf.Logger 129 if logger == nil { 130 logger = log.New(logDest, "", log.LstdFlags) 131 } 132 133 // Set up a network transport by default if a custom one wasn't given 134 // by the config. 135 transport := conf.Transport 136 if transport == nil { 137 nc := &NetTransportConfig{ 138 BindAddrs: []string{conf.BindAddr}, 139 BindPort: conf.BindPort, 140 Logger: logger, 141 } 142 143 // See comment below for details about the retry in here. 144 makeNetRetry := func(limit int) (*NetTransport, error) { 145 var err error 146 for try := 0; try < limit; try++ { 147 var nt *NetTransport 148 if nt, err = NewNetTransport(nc); err == nil { 149 return nt, nil 150 } 151 if strings.Contains(err.Error(), "address already in use") { 152 logger.Printf("[DEBUG] memberlist: Got bind error: %v", err) 153 continue 154 } 155 } 156 157 return nil, fmt.Errorf("failed to obtain an address: %v", err) 158 } 159 160 // The dynamic bind port operation is inherently racy because 161 // even though we are using the kernel to find a port for us, we 162 // are attempting to bind multiple protocols (and potentially 163 // multiple addresses) with the same port number. We build in a 164 // few retries here since this often gets transient errors in 165 // busy unit tests. 166 limit := 1 167 if conf.BindPort == 0 { 168 limit = 10 169 } 170 171 nt, err := makeNetRetry(limit) 172 if err != nil { 173 return nil, fmt.Errorf("Could not set up network transport: %v", err) 174 } 175 if conf.BindPort == 0 { 176 port := nt.GetAutoBindPort() 177 conf.BindPort = port 178 conf.AdvertisePort = port 179 logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port) 180 } 181 transport = nt 182 } 183 184 nodeAwareTransport, ok := transport.(NodeAwareTransport) 185 if !ok { 186 logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired") 187 nodeAwareTransport = &shimNodeAwareTransport{transport} 188 } 189 190 if len(conf.Label) > LabelMaxSize { 191 return nil, fmt.Errorf("could not use %q as a label: too long", conf.Label) 192 } 193 194 if conf.Label != "" { 195 nodeAwareTransport = &labelWrappedTransport{ 196 label: conf.Label, 197 NodeAwareTransport: nodeAwareTransport, 198 } 199 } 200 201 m := &Memberlist{ 202 config: conf, 203 shutdownCh: make(chan struct{}), 204 leaveBroadcast: make(chan struct{}, 1), 205 transport: nodeAwareTransport, 206 handoffCh: make(chan struct{}, 1), 207 highPriorityMsgQueue: list.New(), 208 lowPriorityMsgQueue: list.New(), 209 nodeMap: make(map[string]*nodeState), 210 nodeTimers: make(map[string]*suspicion), 211 awareness: newAwareness(conf.AwarenessMaxMultiplier), 212 ackHandlers: make(map[uint32]*ackHandler), 213 broadcasts: &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult}, 214 logger: logger, 215 } 216 m.broadcasts.NumNodes = func() int { 217 return m.estNumNodes() 218 } 219 220 // Get the final advertise address from the transport, which may need 221 // to see which address we bound to. We'll refresh this each time we 222 // send out an alive message. 223 if _, _, err := m.refreshAdvertise(); err != nil { 224 return nil, err 225 } 226 227 go m.streamListen() 228 go m.packetListen() 229 go m.packetHandler() 230 return m, nil 231} 232 233// Create will create a new Memberlist using the given configuration. 234// This will not connect to any other node (see Join) yet, but will start 235// all the listeners to allow other nodes to join this memberlist. 236// After creating a Memberlist, the configuration given should not be 237// modified by the user anymore. 238func Create(conf *Config) (*Memberlist, error) { 239 m, err := newMemberlist(conf) 240 if err != nil { 241 return nil, err 242 } 243 if err := m.setAlive(); err != nil { 244 m.Shutdown() 245 return nil, err 246 } 247 m.schedule() 248 return m, nil 249} 250 251// Join is used to take an existing Memberlist and attempt to join a cluster 252// by contacting all the given hosts and performing a state sync. Initially, 253// the Memberlist only contains our own state, so doing this will cause 254// remote nodes to become aware of the existence of this node, effectively 255// joining the cluster. 256// 257// This returns the number of hosts successfully contacted and an error if 258// none could be reached. If an error is returned, the node did not successfully 259// join the cluster. 260func (m *Memberlist) Join(existing []string) (int, error) { 261 numSuccess := 0 262 var errs error 263 for _, exist := range existing { 264 addrs, err := m.resolveAddr(exist) 265 if err != nil { 266 err = fmt.Errorf("Failed to resolve %s: %v", exist, err) 267 errs = multierror.Append(errs, err) 268 m.logger.Printf("[WARN] memberlist: %v", err) 269 continue 270 } 271 272 for _, addr := range addrs { 273 hp := joinHostPort(addr.ip.String(), addr.port) 274 a := Address{Addr: hp, Name: addr.nodeName} 275 if err := m.pushPullNode(a, true); err != nil { 276 err = fmt.Errorf("Failed to join %s: %v", a.Addr, err) 277 errs = multierror.Append(errs, err) 278 m.logger.Printf("[DEBUG] memberlist: %v", err) 279 continue 280 } 281 numSuccess++ 282 } 283 284 } 285 if numSuccess > 0 { 286 errs = nil 287 } 288 return numSuccess, errs 289} 290 291// ipPort holds information about a node we want to try to join. 292type ipPort struct { 293 ip net.IP 294 port uint16 295 nodeName string // optional 296} 297 298// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host. 299// The built-in Go resolver will do a UDP lookup first, and will only use TCP if 300// the response has the truncate bit set, which isn't common on DNS servers like 301// Consul's. By doing the TCP lookup directly, we get the best chance for the 302// largest list of hosts to join. Since joins are relatively rare events, it's ok 303// to do this rather expensive operation. 304func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) { 305 // Don't attempt any TCP lookups against non-fully qualified domain 306 // names, since those will likely come from the resolv.conf file. 307 if !strings.Contains(host, ".") { 308 return nil, nil 309 } 310 311 // Make sure the domain name is terminated with a dot (we know there's 312 // at least one character at this point). 313 dn := host 314 if dn[len(dn)-1] != '.' { 315 dn = dn + "." 316 } 317 318 // See if we can find a server to try. 319 cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath) 320 if err != nil { 321 return nil, err 322 } 323 if len(cc.Servers) > 0 { 324 // We support host:port in the DNS config, but need to add the 325 // default port if one is not supplied. 326 server := cc.Servers[0] 327 if !hasPort(server) { 328 server = net.JoinHostPort(server, cc.Port) 329 } 330 331 // Do the lookup. 332 c := new(dns.Client) 333 c.Net = "tcp" 334 msg := new(dns.Msg) 335 msg.SetQuestion(dn, dns.TypeANY) 336 in, _, err := c.Exchange(msg, server) 337 if err != nil { 338 return nil, err 339 } 340 341 // Handle any IPs we get back that we can attempt to join. 342 var ips []ipPort 343 for _, r := range in.Answer { 344 switch rr := r.(type) { 345 case (*dns.A): 346 ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName}) 347 case (*dns.AAAA): 348 ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName}) 349 case (*dns.CNAME): 350 m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host) 351 } 352 } 353 return ips, nil 354 } 355 356 return nil, nil 357} 358 359// resolveAddr is used to resolve the address into an address, 360// port, and error. If no port is given, use the default 361func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) { 362 // First peel off any leading node name. This is optional. 363 nodeName := "" 364 if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 { 365 if slashIdx == 0 { 366 return nil, fmt.Errorf("empty node name provided") 367 } 368 nodeName = hostStr[0:slashIdx] 369 hostStr = hostStr[slashIdx+1:] 370 } 371 372 // This captures the supplied port, or the default one. 373 hostStr = ensurePort(hostStr, m.config.BindPort) 374 host, sport, err := net.SplitHostPort(hostStr) 375 if err != nil { 376 return nil, err 377 } 378 lport, err := strconv.ParseUint(sport, 10, 16) 379 if err != nil { 380 return nil, err 381 } 382 port := uint16(lport) 383 384 // If it looks like an IP address we are done. The SplitHostPort() above 385 // will make sure the host part is in good shape for parsing, even for 386 // IPv6 addresses. 387 if ip := net.ParseIP(host); ip != nil { 388 return []ipPort{ 389 ipPort{ip: ip, port: port, nodeName: nodeName}, 390 }, nil 391 } 392 393 // First try TCP so we have the best chance for the largest list of 394 // hosts to join. If this fails it's not fatal since this isn't a standard 395 // way to query DNS, and we have a fallback below. 396 ips, err := m.tcpLookupIP(host, port, nodeName) 397 if err != nil { 398 m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err) 399 } 400 if len(ips) > 0 { 401 return ips, nil 402 } 403 404 // If TCP didn't yield anything then use the normal Go resolver which 405 // will try UDP, then might possibly try TCP again if the UDP response 406 // indicates it was truncated. 407 ans, err := net.LookupIP(host) 408 if err != nil { 409 return nil, err 410 } 411 ips = make([]ipPort, 0, len(ans)) 412 for _, ip := range ans { 413 ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName}) 414 } 415 return ips, nil 416} 417 418// setAlive is used to mark this node as being alive. This is the same 419// as if we received an alive notification our own network channel for 420// ourself. 421func (m *Memberlist) setAlive() error { 422 // Get the final advertise address from the transport, which may need 423 // to see which address we bound to. 424 addr, port, err := m.refreshAdvertise() 425 if err != nil { 426 return err 427 } 428 429 // Check if this is a public address without encryption 430 ipAddr, err := sockaddr.NewIPAddr(addr.String()) 431 if err != nil { 432 return fmt.Errorf("Failed to parse interface addresses: %v", err) 433 } 434 ifAddrs := []sockaddr.IfAddr{ 435 sockaddr.IfAddr{ 436 SockAddr: ipAddr, 437 }, 438 } 439 _, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs) 440 if len(publicIfs) > 0 && !m.config.EncryptionEnabled() { 441 m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!") 442 } 443 444 // Set any metadata from the delegate. 445 var meta []byte 446 if m.config.Delegate != nil { 447 meta = m.config.Delegate.NodeMeta(MetaMaxSize) 448 if len(meta) > MetaMaxSize { 449 panic("Node meta data provided is longer than the limit") 450 } 451 } 452 453 a := alive{ 454 Incarnation: m.nextIncarnation(), 455 Node: m.config.Name, 456 Addr: addr, 457 Port: uint16(port), 458 Meta: meta, 459 Vsn: m.config.BuildVsnArray(), 460 } 461 m.aliveNode(&a, nil, true) 462 463 return nil 464} 465 466func (m *Memberlist) getAdvertise() (net.IP, uint16) { 467 m.advertiseLock.RLock() 468 defer m.advertiseLock.RUnlock() 469 return m.advertiseAddr, m.advertisePort 470} 471 472func (m *Memberlist) setAdvertise(addr net.IP, port int) { 473 m.advertiseLock.Lock() 474 defer m.advertiseLock.Unlock() 475 m.advertiseAddr = addr 476 m.advertisePort = uint16(port) 477} 478 479func (m *Memberlist) refreshAdvertise() (net.IP, int, error) { 480 addr, port, err := m.transport.FinalAdvertiseAddr( 481 m.config.AdvertiseAddr, m.config.AdvertisePort) 482 if err != nil { 483 return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err) 484 } 485 m.setAdvertise(addr, port) 486 return addr, port, nil 487} 488 489// LocalNode is used to return the local Node 490func (m *Memberlist) LocalNode() *Node { 491 m.nodeLock.RLock() 492 defer m.nodeLock.RUnlock() 493 state := m.nodeMap[m.config.Name] 494 return &state.Node 495} 496 497// UpdateNode is used to trigger re-advertising the local node. This is 498// primarily used with a Delegate to support dynamic updates to the local 499// meta data. This will block until the update message is successfully 500// broadcasted to a member of the cluster, if any exist or until a specified 501// timeout is reached. 502func (m *Memberlist) UpdateNode(timeout time.Duration) error { 503 // Get the node meta data 504 var meta []byte 505 if m.config.Delegate != nil { 506 meta = m.config.Delegate.NodeMeta(MetaMaxSize) 507 if len(meta) > MetaMaxSize { 508 panic("Node meta data provided is longer than the limit") 509 } 510 } 511 512 // Get the existing node 513 m.nodeLock.RLock() 514 state := m.nodeMap[m.config.Name] 515 m.nodeLock.RUnlock() 516 517 // Format a new alive message 518 a := alive{ 519 Incarnation: m.nextIncarnation(), 520 Node: m.config.Name, 521 Addr: state.Addr, 522 Port: state.Port, 523 Meta: meta, 524 Vsn: m.config.BuildVsnArray(), 525 } 526 notifyCh := make(chan struct{}) 527 m.aliveNode(&a, notifyCh, true) 528 529 // Wait for the broadcast or a timeout 530 if m.anyAlive() { 531 var timeoutCh <-chan time.Time 532 if timeout > 0 { 533 timeoutCh = time.After(timeout) 534 } 535 select { 536 case <-notifyCh: 537 case <-timeoutCh: 538 return fmt.Errorf("timeout waiting for update broadcast") 539 } 540 } 541 return nil 542} 543 544// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires a node to 545// target. If you don't have a node then use SendToAddress. 546func (m *Memberlist) SendTo(to net.Addr, msg []byte) error { 547 a := Address{Addr: to.String(), Name: ""} 548 return m.SendToAddress(a, msg) 549} 550 551func (m *Memberlist) SendToAddress(a Address, msg []byte) error { 552 // Encode as a user message 553 buf := make([]byte, 1, len(msg)+1) 554 buf[0] = byte(userMsg) 555 buf = append(buf, msg...) 556 557 // Send the message 558 return m.rawSendMsgPacket(a, nil, buf) 559} 560 561// Deprecated: SendToUDP is deprecated in favor of SendBestEffort. 562func (m *Memberlist) SendToUDP(to *Node, msg []byte) error { 563 return m.SendBestEffort(to, msg) 564} 565 566// Deprecated: SendToTCP is deprecated in favor of SendReliable. 567func (m *Memberlist) SendToTCP(to *Node, msg []byte) error { 568 return m.SendReliable(to, msg) 569} 570 571// SendBestEffort uses the unreliable packet-oriented interface of the transport 572// to target a user message at the given node (this does not use the gossip 573// mechanism). The maximum size of the message depends on the configured 574// UDPBufferSize for this memberlist instance. 575func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error { 576 // Encode as a user message 577 buf := make([]byte, 1, len(msg)+1) 578 buf[0] = byte(userMsg) 579 buf = append(buf, msg...) 580 581 // Send the message 582 a := Address{Addr: to.Address(), Name: to.Name} 583 return m.rawSendMsgPacket(a, to, buf) 584} 585 586// SendReliable uses the reliable stream-oriented interface of the transport to 587// target a user message at the given node (this does not use the gossip 588// mechanism). Delivery is guaranteed if no error is returned, and there is no 589// limit on the size of the message. 590func (m *Memberlist) SendReliable(to *Node, msg []byte) error { 591 return m.sendUserMsg(to.FullAddress(), msg) 592} 593 594// Members returns a list of all known live nodes. The node structures 595// returned must not be modified. If you wish to modify a Node, make a 596// copy first. 597func (m *Memberlist) Members() []*Node { 598 m.nodeLock.RLock() 599 defer m.nodeLock.RUnlock() 600 601 nodes := make([]*Node, 0, len(m.nodes)) 602 for _, n := range m.nodes { 603 if !n.DeadOrLeft() { 604 nodes = append(nodes, &n.Node) 605 } 606 } 607 608 return nodes 609} 610 611// NumMembers returns the number of alive nodes currently known. Between 612// the time of calling this and calling Members, the number of alive nodes 613// may have changed, so this shouldn't be used to determine how many 614// members will be returned by Members. 615func (m *Memberlist) NumMembers() (alive int) { 616 m.nodeLock.RLock() 617 defer m.nodeLock.RUnlock() 618 619 for _, n := range m.nodes { 620 if !n.DeadOrLeft() { 621 alive++ 622 } 623 } 624 625 return 626} 627 628// Leave will broadcast a leave message but will not shutdown the background 629// listeners, meaning the node will continue participating in gossip and state 630// updates. 631// 632// This will block until the leave message is successfully broadcasted to 633// a member of the cluster, if any exist or until a specified timeout 634// is reached. 635// 636// This method is safe to call multiple times, but must not be called 637// after the cluster is already shut down. 638func (m *Memberlist) Leave(timeout time.Duration) error { 639 m.leaveLock.Lock() 640 defer m.leaveLock.Unlock() 641 642 if m.hasShutdown() { 643 panic("leave after shutdown") 644 } 645 646 if !m.hasLeft() { 647 atomic.StoreInt32(&m.leave, 1) 648 649 m.nodeLock.Lock() 650 state, ok := m.nodeMap[m.config.Name] 651 m.nodeLock.Unlock() 652 if !ok { 653 m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.") 654 return nil 655 } 656 657 // This dead message is special, because Node and From are the 658 // same. This helps other nodes figure out that a node left 659 // intentionally. When Node equals From, other nodes know for 660 // sure this node is gone. 661 d := dead{ 662 Incarnation: state.Incarnation, 663 Node: state.Name, 664 From: state.Name, 665 } 666 m.deadNode(&d) 667 668 // Block until the broadcast goes out 669 if m.anyAlive() { 670 var timeoutCh <-chan time.Time 671 if timeout > 0 { 672 timeoutCh = time.After(timeout) 673 } 674 select { 675 case <-m.leaveBroadcast: 676 case <-timeoutCh: 677 return fmt.Errorf("timeout waiting for leave broadcast") 678 } 679 } 680 } 681 682 return nil 683} 684 685// Check for any other alive node. 686func (m *Memberlist) anyAlive() bool { 687 m.nodeLock.RLock() 688 defer m.nodeLock.RUnlock() 689 for _, n := range m.nodes { 690 if !n.DeadOrLeft() && n.Name != m.config.Name { 691 return true 692 } 693 } 694 return false 695} 696 697// GetHealthScore gives this instance's idea of how well it is meeting the soft 698// real-time requirements of the protocol. Lower numbers are better, and zero 699// means "totally healthy". 700func (m *Memberlist) GetHealthScore() int { 701 return m.awareness.GetHealthScore() 702} 703 704// ProtocolVersion returns the protocol version currently in use by 705// this memberlist. 706func (m *Memberlist) ProtocolVersion() uint8 { 707 // NOTE: This method exists so that in the future we can control 708 // any locking if necessary, if we change the protocol version at 709 // runtime, etc. 710 return m.config.ProtocolVersion 711} 712 713// Shutdown will stop any background maintenance of network activity 714// for this memberlist, causing it to appear "dead". A leave message 715// will not be broadcasted prior, so the cluster being left will have 716// to detect this node's shutdown using probing. If you wish to more 717// gracefully exit the cluster, call Leave prior to shutting down. 718// 719// This method is safe to call multiple times. 720func (m *Memberlist) Shutdown() error { 721 m.shutdownLock.Lock() 722 defer m.shutdownLock.Unlock() 723 724 if m.hasShutdown() { 725 return nil 726 } 727 728 // Shut down the transport first, which should block until it's 729 // completely torn down. If we kill the memberlist-side handlers 730 // those I/O handlers might get stuck. 731 if err := m.transport.Shutdown(); err != nil { 732 m.logger.Printf("[ERR] Failed to shutdown transport: %v", err) 733 } 734 735 // Now tear down everything else. 736 atomic.StoreInt32(&m.shutdown, 1) 737 close(m.shutdownCh) 738 m.deschedule() 739 return nil 740} 741 742func (m *Memberlist) hasShutdown() bool { 743 return atomic.LoadInt32(&m.shutdown) == 1 744} 745 746func (m *Memberlist) hasLeft() bool { 747 return atomic.LoadInt32(&m.leave) == 1 748} 749 750func (m *Memberlist) getNodeState(addr string) NodeStateType { 751 m.nodeLock.RLock() 752 defer m.nodeLock.RUnlock() 753 754 n := m.nodeMap[addr] 755 return n.State 756} 757 758func (m *Memberlist) getNodeStateChange(addr string) time.Time { 759 m.nodeLock.RLock() 760 defer m.nodeLock.RUnlock() 761 762 n := m.nodeMap[addr] 763 return n.StateChange 764} 765 766func (m *Memberlist) changeNode(addr string, f func(*nodeState)) { 767 m.nodeLock.Lock() 768 defer m.nodeLock.Unlock() 769 770 n := m.nodeMap[addr] 771 f(n) 772} 773