1package memberlist 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "math/rand" 8 "net" 9 "strings" 10 "sync/atomic" 11 "time" 12 13 metrics "github.com/armon/go-metrics" 14) 15 16type NodeStateType int 17 18const ( 19 StateAlive NodeStateType = iota 20 StateSuspect 21 StateDead 22 StateLeft 23) 24 25// Node represents a node in the cluster. 26type Node struct { 27 Name string 28 Addr net.IP 29 Port uint16 30 Meta []byte // Metadata from the delegate for this node. 31 State NodeStateType // State of the node. 32 PMin uint8 // Minimum protocol version this understands 33 PMax uint8 // Maximum protocol version this understands 34 PCur uint8 // Current version node is speaking 35 DMin uint8 // Min protocol version for the delegate to understand 36 DMax uint8 // Max protocol version for the delegate to understand 37 DCur uint8 // Current version delegate is speaking 38} 39 40// Address returns the host:port form of a node's address, suitable for use 41// with a transport. 42func (n *Node) Address() string { 43 return joinHostPort(n.Addr.String(), n.Port) 44} 45 46// FullAddress returns the node name and host:port form of a node's address, 47// suitable for use with a transport. 48func (n *Node) FullAddress() Address { 49 return Address{ 50 Addr: joinHostPort(n.Addr.String(), n.Port), 51 Name: n.Name, 52 } 53} 54 55// String returns the node name 56func (n *Node) String() string { 57 return n.Name 58} 59 60// NodeState is used to manage our state view of another node 61type nodeState struct { 62 Node 63 Incarnation uint32 // Last known incarnation number 64 State NodeStateType // Current state 65 StateChange time.Time // Time last state change happened 66} 67 68// Address returns the host:port form of a node's address, suitable for use 69// with a transport. 70func (n *nodeState) Address() string { 71 return n.Node.Address() 72} 73 74// FullAddress returns the node name and host:port form of a node's address, 75// suitable for use with a transport. 76func (n *nodeState) FullAddress() Address { 77 return n.Node.FullAddress() 78} 79 80func (n *nodeState) DeadOrLeft() bool { 81 return n.State == StateDead || n.State == StateLeft 82} 83 84// ackHandler is used to register handlers for incoming acks and nacks. 85type ackHandler struct { 86 ackFn func([]byte, time.Time) 87 nackFn func() 88 timer *time.Timer 89} 90 91// NoPingResponseError is used to indicate a 'ping' packet was 92// successfully issued but no response was received 93type NoPingResponseError struct { 94 node string 95} 96 97func (f NoPingResponseError) Error() string { 98 return fmt.Sprintf("No response from node %s", f.node) 99} 100 101// Schedule is used to ensure the Tick is performed periodically. This 102// function is safe to call multiple times. If the memberlist is already 103// scheduled, then it won't do anything. 104func (m *Memberlist) schedule() { 105 m.tickerLock.Lock() 106 defer m.tickerLock.Unlock() 107 108 // If we already have tickers, then don't do anything, since we're 109 // scheduled 110 if len(m.tickers) > 0 { 111 return 112 } 113 114 // Create the stop tick channel, a blocking channel. We close this 115 // when we should stop the tickers. 116 stopCh := make(chan struct{}) 117 118 // Create a new probeTicker 119 if m.config.ProbeInterval > 0 { 120 t := time.NewTicker(m.config.ProbeInterval) 121 go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe) 122 m.tickers = append(m.tickers, t) 123 } 124 125 // Create a push pull ticker if needed 126 if m.config.PushPullInterval > 0 { 127 go m.pushPullTrigger(stopCh) 128 } 129 130 // Create a gossip ticker if needed 131 if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 { 132 t := time.NewTicker(m.config.GossipInterval) 133 go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip) 134 m.tickers = append(m.tickers, t) 135 } 136 137 // If we made any tickers, then record the stopTick channel for 138 // later. 139 if len(m.tickers) > 0 { 140 m.stopTick = stopCh 141 } 142} 143 144// triggerFunc is used to trigger a function call each time a 145// message is received until a stop tick arrives. 146func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) { 147 // Use a random stagger to avoid syncronizing 148 randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger)) 149 select { 150 case <-time.After(randStagger): 151 case <-stop: 152 return 153 } 154 for { 155 select { 156 case <-C: 157 f() 158 case <-stop: 159 return 160 } 161 } 162} 163 164// pushPullTrigger is used to periodically trigger a push/pull until 165// a stop tick arrives. We don't use triggerFunc since the push/pull 166// timer is dynamically scaled based on cluster size to avoid network 167// saturation 168func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) { 169 interval := m.config.PushPullInterval 170 171 // Use a random stagger to avoid syncronizing 172 randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval)) 173 select { 174 case <-time.After(randStagger): 175 case <-stop: 176 return 177 } 178 179 // Tick using a dynamic timer 180 for { 181 tickTime := pushPullScale(interval, m.estNumNodes()) 182 select { 183 case <-time.After(tickTime): 184 m.pushPull() 185 case <-stop: 186 return 187 } 188 } 189} 190 191// Deschedule is used to stop the background maintenance. This is safe 192// to call multiple times. 193func (m *Memberlist) deschedule() { 194 m.tickerLock.Lock() 195 defer m.tickerLock.Unlock() 196 197 // If we have no tickers, then we aren't scheduled. 198 if len(m.tickers) == 0 { 199 return 200 } 201 202 // Close the stop channel so all the ticker listeners stop. 203 close(m.stopTick) 204 205 // Explicitly stop all the tickers themselves so they don't take 206 // up any more resources, and get rid of the list. 207 for _, t := range m.tickers { 208 t.Stop() 209 } 210 m.tickers = nil 211} 212 213// Tick is used to perform a single round of failure detection and gossip 214func (m *Memberlist) probe() { 215 // Track the number of indexes we've considered probing 216 numCheck := 0 217START: 218 m.nodeLock.RLock() 219 220 // Make sure we don't wrap around infinitely 221 if numCheck >= len(m.nodes) { 222 m.nodeLock.RUnlock() 223 return 224 } 225 226 // Handle the wrap around case 227 if m.probeIndex >= len(m.nodes) { 228 m.nodeLock.RUnlock() 229 m.resetNodes() 230 m.probeIndex = 0 231 numCheck++ 232 goto START 233 } 234 235 // Determine if we should probe this node 236 skip := false 237 var node nodeState 238 239 node = *m.nodes[m.probeIndex] 240 if node.Name == m.config.Name { 241 skip = true 242 } else if node.DeadOrLeft() { 243 skip = true 244 } 245 246 // Potentially skip 247 m.nodeLock.RUnlock() 248 m.probeIndex++ 249 if skip { 250 numCheck++ 251 goto START 252 } 253 254 // Probe the specific node 255 m.probeNode(&node) 256} 257 258// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests) 259func (m *Memberlist) probeNodeByAddr(addr string) { 260 m.nodeLock.RLock() 261 n := m.nodeMap[addr] 262 m.nodeLock.RUnlock() 263 264 m.probeNode(n) 265} 266 267// failedRemote checks the error and decides if it indicates a failure on the 268// other end. 269func failedRemote(err error) bool { 270 switch t := err.(type) { 271 case *net.OpError: 272 if strings.HasPrefix(t.Net, "tcp") { 273 switch t.Op { 274 case "dial", "read", "write": 275 return true 276 } 277 } 278 } 279 return false 280} 281 282// probeNode handles a single round of failure checking on a node. 283func (m *Memberlist) probeNode(node *nodeState) { 284 defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) 285 286 // We use our health awareness to scale the overall probe interval, so we 287 // slow down if we detect problems. The ticker that calls us can handle 288 // us running over the base interval, and will skip missed ticks. 289 probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval) 290 if probeInterval > m.config.ProbeInterval { 291 metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1) 292 } 293 294 // Prepare a ping message and setup an ack handler. 295 selfAddr, selfPort := m.getAdvertise() 296 ping := ping{ 297 SeqNo: m.nextSeqNo(), 298 Node: node.Name, 299 SourceAddr: selfAddr, 300 SourcePort: selfPort, 301 SourceNode: m.config.Name, 302 } 303 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 304 nackCh := make(chan struct{}, m.config.IndirectChecks+1) 305 m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval) 306 307 // Mark the sent time here, which should be after any pre-processing but 308 // before system calls to do the actual send. This probably over-reports 309 // a bit, but it's the best we can do. We had originally put this right 310 // after the I/O, but that would sometimes give negative RTT measurements 311 // which was not desirable. 312 sent := time.Now() 313 314 // Send a ping to the node. If this node looks like it's suspect or dead, 315 // also tack on a suspect message so that it has a chance to refute as 316 // soon as possible. 317 deadline := sent.Add(probeInterval) 318 addr := node.Address() 319 320 // Arrange for our self-awareness to get updated. 321 var awarenessDelta int 322 defer func() { 323 m.awareness.ApplyDelta(awarenessDelta) 324 }() 325 if node.State == StateAlive { 326 if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil { 327 m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) 328 if failedRemote(err) { 329 goto HANDLE_REMOTE_FAILURE 330 } else { 331 return 332 } 333 } 334 } else { 335 var msgs [][]byte 336 if buf, err := encode(pingMsg, &ping); err != nil { 337 m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err) 338 return 339 } else { 340 msgs = append(msgs, buf.Bytes()) 341 } 342 s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} 343 if buf, err := encode(suspectMsg, &s); err != nil { 344 m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err) 345 return 346 } else { 347 msgs = append(msgs, buf.Bytes()) 348 } 349 350 compound := makeCompoundMessage(msgs) 351 if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil { 352 m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) 353 if failedRemote(err) { 354 goto HANDLE_REMOTE_FAILURE 355 } else { 356 return 357 } 358 } 359 } 360 361 // Arrange for our self-awareness to get updated. At this point we've 362 // sent the ping, so any return statement means the probe succeeded 363 // which will improve our health until we get to the failure scenarios 364 // at the end of this function, which will alter this delta variable 365 // accordingly. 366 awarenessDelta = -1 367 368 // Wait for response or round-trip-time. 369 select { 370 case v := <-ackCh: 371 if v.Complete == true { 372 if m.config.Ping != nil { 373 rtt := v.Timestamp.Sub(sent) 374 m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload) 375 } 376 return 377 } 378 379 // As an edge case, if we get a timeout, we need to re-enqueue it 380 // here to break out of the select below. 381 if v.Complete == false { 382 ackCh <- v 383 } 384 case <-time.After(m.config.ProbeTimeout): 385 // Note that we don't scale this timeout based on awareness and 386 // the health score. That's because we don't really expect waiting 387 // longer to help get UDP through. Since health does extend the 388 // probe interval it will give the TCP fallback more time, which 389 // is more active in dealing with lost packets, and it gives more 390 // time to wait for indirect acks/nacks. 391 m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name) 392 } 393 394HANDLE_REMOTE_FAILURE: 395 // Get some random live nodes. 396 m.nodeLock.RLock() 397 kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool { 398 return n.Name == m.config.Name || 399 n.Name == node.Name || 400 n.State != StateAlive 401 }) 402 m.nodeLock.RUnlock() 403 404 // Attempt an indirect ping. 405 expectedNacks := 0 406 selfAddr, selfPort = m.getAdvertise() 407 ind := indirectPingReq{ 408 SeqNo: ping.SeqNo, 409 Target: node.Addr, 410 Port: node.Port, 411 Node: node.Name, 412 SourceAddr: selfAddr, 413 SourcePort: selfPort, 414 SourceNode: m.config.Name, 415 } 416 for _, peer := range kNodes { 417 // We only expect nack to be sent from peers who understand 418 // version 4 of the protocol. 419 if ind.Nack = peer.PMax >= 4; ind.Nack { 420 expectedNacks++ 421 } 422 423 if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil { 424 m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) 425 } 426 } 427 428 // Also make an attempt to contact the node directly over TCP. This 429 // helps prevent confused clients who get isolated from UDP traffic 430 // but can still speak TCP (which also means they can possibly report 431 // misinformation to other nodes via anti-entropy), avoiding flapping in 432 // the cluster. 433 // 434 // This is a little unusual because we will attempt a TCP ping to any 435 // member who understands version 3 of the protocol, regardless of 436 // which protocol version we are speaking. That's why we've included a 437 // config option to turn this off if desired. 438 fallbackCh := make(chan bool, 1) 439 440 disableTcpPings := m.config.DisableTcpPings || 441 (m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name)) 442 if (!disableTcpPings) && (node.PMax >= 3) { 443 go func() { 444 defer close(fallbackCh) 445 didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline) 446 if err != nil { 447 m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) 448 } else { 449 fallbackCh <- didContact 450 } 451 }() 452 } else { 453 close(fallbackCh) 454 } 455 456 // Wait for the acks or timeout. Note that we don't check the fallback 457 // channel here because we want to issue a warning below if that's the 458 // *only* way we hear back from the peer, so we have to let this time 459 // out first to allow the normal UDP-based acks to come in. 460 select { 461 case v := <-ackCh: 462 if v.Complete == true { 463 return 464 } 465 } 466 467 // Finally, poll the fallback channel. The timeouts are set such that 468 // the channel will have something or be closed without having to wait 469 // any additional time here. 470 for didContact := range fallbackCh { 471 if didContact { 472 m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name) 473 return 474 } 475 } 476 477 // Update our self-awareness based on the results of this failed probe. 478 // If we don't have peers who will send nacks then we penalize for any 479 // failed probe as a simple health metric. If we do have peers to nack 480 // verify, then we can use that as a more sophisticated measure of self- 481 // health because we assume them to be working, and they can help us 482 // decide if the probed node was really dead or if it was something wrong 483 // with ourselves. 484 awarenessDelta = 0 485 if expectedNacks > 0 { 486 if nackCount := len(nackCh); nackCount < expectedNacks { 487 awarenessDelta += (expectedNacks - nackCount) 488 } 489 } else { 490 awarenessDelta += 1 491 } 492 493 // No acks received from target, suspect it as failed. 494 m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name) 495 s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} 496 m.suspectNode(&s) 497} 498 499// Ping initiates a ping to the node with the specified name. 500func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { 501 // Prepare a ping message and setup an ack handler. 502 selfAddr, selfPort := m.getAdvertise() 503 ping := ping{ 504 SeqNo: m.nextSeqNo(), 505 Node: node, 506 SourceAddr: selfAddr, 507 SourcePort: selfPort, 508 SourceNode: m.config.Name, 509 } 510 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 511 m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) 512 513 a := Address{Addr: addr.String(), Name: node} 514 515 // Send a ping to the node. 516 if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil { 517 return 0, err 518 } 519 520 // Mark the sent time here, which should be after any pre-processing and 521 // system calls to do the actual send. This probably under-reports a bit, 522 // but it's the best we can do. 523 sent := time.Now() 524 525 // Wait for response or timeout. 526 select { 527 case v := <-ackCh: 528 if v.Complete == true { 529 return v.Timestamp.Sub(sent), nil 530 } 531 case <-time.After(m.config.ProbeTimeout): 532 // Timeout, return an error below. 533 } 534 535 m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node) 536 return 0, NoPingResponseError{ping.Node} 537} 538 539// resetNodes is used when the tick wraps around. It will reap the 540// dead nodes and shuffle the node list. 541func (m *Memberlist) resetNodes() { 542 m.nodeLock.Lock() 543 defer m.nodeLock.Unlock() 544 545 // Move dead nodes, but respect gossip to the dead interval 546 deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime) 547 548 // Deregister the dead nodes 549 for i := deadIdx; i < len(m.nodes); i++ { 550 delete(m.nodeMap, m.nodes[i].Name) 551 m.nodes[i] = nil 552 } 553 554 // Trim the nodes to exclude the dead nodes 555 m.nodes = m.nodes[0:deadIdx] 556 557 // Update numNodes after we've trimmed the dead nodes 558 atomic.StoreUint32(&m.numNodes, uint32(deadIdx)) 559 560 // Shuffle live nodes 561 shuffleNodes(m.nodes) 562} 563 564// gossip is invoked every GossipInterval period to broadcast our gossip 565// messages to a few random nodes. 566func (m *Memberlist) gossip() { 567 defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) 568 569 // Get some random live, suspect, or recently dead nodes 570 m.nodeLock.RLock() 571 kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool { 572 if n.Name == m.config.Name { 573 return true 574 } 575 576 switch n.State { 577 case StateAlive, StateSuspect: 578 return false 579 580 case StateDead: 581 return time.Since(n.StateChange) > m.config.GossipToTheDeadTime 582 583 default: 584 return true 585 } 586 }) 587 m.nodeLock.RUnlock() 588 589 // Compute the bytes available 590 bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead 591 if m.config.EncryptionEnabled() { 592 bytesAvail -= encryptOverhead(m.encryptionVersion()) 593 } 594 595 for _, node := range kNodes { 596 // Get any pending broadcasts 597 msgs := m.getBroadcasts(compoundOverhead, bytesAvail) 598 if len(msgs) == 0 { 599 return 600 } 601 602 addr := node.Address() 603 if len(msgs) == 1 { 604 // Send single message as is 605 if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil { 606 m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) 607 } 608 } else { 609 // Otherwise create and send a compound message 610 compound := makeCompoundMessage(msgs) 611 if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil { 612 m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) 613 } 614 } 615 } 616} 617 618// pushPull is invoked periodically to randomly perform a complete state 619// exchange. Used to ensure a high level of convergence, but is also 620// reasonably expensive as the entire state of this node is exchanged 621// with the other node. 622func (m *Memberlist) pushPull() { 623 // Get a random live node 624 m.nodeLock.RLock() 625 nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool { 626 return n.Name == m.config.Name || 627 n.State != StateAlive 628 }) 629 m.nodeLock.RUnlock() 630 631 // If no nodes, bail 632 if len(nodes) == 0 { 633 return 634 } 635 node := nodes[0] 636 637 // Attempt a push pull 638 if err := m.pushPullNode(node.FullAddress(), false); err != nil { 639 m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) 640 } 641} 642 643// pushPullNode does a complete state exchange with a specific node. 644func (m *Memberlist) pushPullNode(a Address, join bool) error { 645 defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) 646 647 // Attempt to send and receive with the node 648 remote, userState, err := m.sendAndReceiveState(a, join) 649 if err != nil { 650 return err 651 } 652 653 if err := m.mergeRemoteState(join, remote, userState); err != nil { 654 return err 655 } 656 return nil 657} 658 659// verifyProtocol verifies that all the remote nodes can speak with our 660// nodes and vice versa on both the core protocol as well as the 661// delegate protocol level. 662// 663// The verification works by finding the maximum minimum and 664// minimum maximum understood protocol and delegate versions. In other words, 665// it finds the common denominator of protocol and delegate version ranges 666// for the entire cluster. 667// 668// After this, it goes through the entire cluster (local and remote) and 669// verifies that everyone's speaking protocol versions satisfy this range. 670// If this passes, it means that every node can understand each other. 671func (m *Memberlist) verifyProtocol(remote []pushNodeState) error { 672 m.nodeLock.RLock() 673 defer m.nodeLock.RUnlock() 674 675 // Maximum minimum understood and minimum maximum understood for both 676 // the protocol and delegate versions. We use this to verify everyone 677 // can be understood. 678 var maxpmin, minpmax uint8 679 var maxdmin, mindmax uint8 680 minpmax = math.MaxUint8 681 mindmax = math.MaxUint8 682 683 for _, rn := range remote { 684 // If the node isn't alive, then skip it 685 if rn.State != StateAlive { 686 continue 687 } 688 689 // Skip nodes that don't have versions set, it just means 690 // their version is zero. 691 if len(rn.Vsn) == 0 { 692 continue 693 } 694 695 if rn.Vsn[0] > maxpmin { 696 maxpmin = rn.Vsn[0] 697 } 698 699 if rn.Vsn[1] < minpmax { 700 minpmax = rn.Vsn[1] 701 } 702 703 if rn.Vsn[3] > maxdmin { 704 maxdmin = rn.Vsn[3] 705 } 706 707 if rn.Vsn[4] < mindmax { 708 mindmax = rn.Vsn[4] 709 } 710 } 711 712 for _, n := range m.nodes { 713 // Ignore non-alive nodes 714 if n.State != StateAlive { 715 continue 716 } 717 718 if n.PMin > maxpmin { 719 maxpmin = n.PMin 720 } 721 722 if n.PMax < minpmax { 723 minpmax = n.PMax 724 } 725 726 if n.DMin > maxdmin { 727 maxdmin = n.DMin 728 } 729 730 if n.DMax < mindmax { 731 mindmax = n.DMax 732 } 733 } 734 735 // Now that we definitively know the minimum and maximum understood 736 // version that satisfies the whole cluster, we verify that every 737 // node in the cluster satisifies this. 738 for _, n := range remote { 739 var nPCur, nDCur uint8 740 if len(n.Vsn) > 0 { 741 nPCur = n.Vsn[2] 742 nDCur = n.Vsn[5] 743 } 744 745 if nPCur < maxpmin || nPCur > minpmax { 746 return fmt.Errorf( 747 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 748 n.Name, nPCur, maxpmin, minpmax) 749 } 750 751 if nDCur < maxdmin || nDCur > mindmax { 752 return fmt.Errorf( 753 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 754 n.Name, nDCur, maxdmin, mindmax) 755 } 756 } 757 758 for _, n := range m.nodes { 759 nPCur := n.PCur 760 nDCur := n.DCur 761 762 if nPCur < maxpmin || nPCur > minpmax { 763 return fmt.Errorf( 764 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 765 n.Name, nPCur, maxpmin, minpmax) 766 } 767 768 if nDCur < maxdmin || nDCur > mindmax { 769 return fmt.Errorf( 770 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 771 n.Name, nDCur, maxdmin, mindmax) 772 } 773 } 774 775 return nil 776} 777 778// nextSeqNo returns a usable sequence number in a thread safe way 779func (m *Memberlist) nextSeqNo() uint32 { 780 return atomic.AddUint32(&m.sequenceNum, 1) 781} 782 783// nextIncarnation returns the next incarnation number in a thread safe way 784func (m *Memberlist) nextIncarnation() uint32 { 785 return atomic.AddUint32(&m.incarnation, 1) 786} 787 788// skipIncarnation adds the positive offset to the incarnation number. 789func (m *Memberlist) skipIncarnation(offset uint32) uint32 { 790 return atomic.AddUint32(&m.incarnation, offset) 791} 792 793// estNumNodes is used to get the current estimate of the number of nodes 794func (m *Memberlist) estNumNodes() int { 795 return int(atomic.LoadUint32(&m.numNodes)) 796} 797 798type ackMessage struct { 799 Complete bool 800 Payload []byte 801 Timestamp time.Time 802} 803 804// setProbeChannels is used to attach the ackCh to receive a message when an ack 805// with a given sequence number is received. The `complete` field of the message 806// will be false on timeout. Any nack messages will cause an empty struct to be 807// passed to the nackCh, which can be nil if not needed. 808func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) { 809 // Create handler functions for acks and nacks 810 ackFn := func(payload []byte, timestamp time.Time) { 811 select { 812 case ackCh <- ackMessage{true, payload, timestamp}: 813 default: 814 } 815 } 816 nackFn := func() { 817 select { 818 case nackCh <- struct{}{}: 819 default: 820 } 821 } 822 823 // Add the handlers 824 ah := &ackHandler{ackFn, nackFn, nil} 825 m.ackLock.Lock() 826 m.ackHandlers[seqNo] = ah 827 m.ackLock.Unlock() 828 829 // Setup a reaping routing 830 ah.timer = time.AfterFunc(timeout, func() { 831 m.ackLock.Lock() 832 delete(m.ackHandlers, seqNo) 833 m.ackLock.Unlock() 834 select { 835 case ackCh <- ackMessage{false, nil, time.Now()}: 836 default: 837 } 838 }) 839} 840 841// setAckHandler is used to attach a handler to be invoked when an ack with a 842// given sequence number is received. If a timeout is reached, the handler is 843// deleted. This is used for indirect pings so does not configure a function 844// for nacks. 845func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) { 846 // Add the handler 847 ah := &ackHandler{ackFn, nil, nil} 848 m.ackLock.Lock() 849 m.ackHandlers[seqNo] = ah 850 m.ackLock.Unlock() 851 852 // Setup a reaping routing 853 ah.timer = time.AfterFunc(timeout, func() { 854 m.ackLock.Lock() 855 delete(m.ackHandlers, seqNo) 856 m.ackLock.Unlock() 857 }) 858} 859 860// Invokes an ack handler if any is associated, and reaps the handler immediately 861func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) { 862 m.ackLock.Lock() 863 ah, ok := m.ackHandlers[ack.SeqNo] 864 delete(m.ackHandlers, ack.SeqNo) 865 m.ackLock.Unlock() 866 if !ok { 867 return 868 } 869 ah.timer.Stop() 870 ah.ackFn(ack.Payload, timestamp) 871} 872 873// Invokes nack handler if any is associated. 874func (m *Memberlist) invokeNackHandler(nack nackResp) { 875 m.ackLock.Lock() 876 ah, ok := m.ackHandlers[nack.SeqNo] 877 m.ackLock.Unlock() 878 if !ok || ah.nackFn == nil { 879 return 880 } 881 ah.nackFn() 882} 883 884// refute gossips an alive message in response to incoming information that we 885// are suspect or dead. It will make sure the incarnation number beats the given 886// accusedInc value, or you can supply 0 to just get the next incarnation number. 887// This alters the node state that's passed in so this MUST be called while the 888// nodeLock is held. 889func (m *Memberlist) refute(me *nodeState, accusedInc uint32) { 890 // Make sure the incarnation number beats the accusation. 891 inc := m.nextIncarnation() 892 if accusedInc >= inc { 893 inc = m.skipIncarnation(accusedInc - inc + 1) 894 } 895 me.Incarnation = inc 896 897 // Decrease our health because we are being asked to refute a problem. 898 m.awareness.ApplyDelta(1) 899 900 // Format and broadcast an alive message. 901 a := alive{ 902 Incarnation: inc, 903 Node: me.Name, 904 Addr: me.Addr, 905 Port: me.Port, 906 Meta: me.Meta, 907 Vsn: []uint8{ 908 me.PMin, me.PMax, me.PCur, 909 me.DMin, me.DMax, me.DCur, 910 }, 911 } 912 m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a) 913} 914 915// aliveNode is invoked by the network layer when we get a message about a 916// live node. 917func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { 918 m.nodeLock.Lock() 919 defer m.nodeLock.Unlock() 920 state, ok := m.nodeMap[a.Node] 921 922 // It is possible that during a Leave(), there is already an aliveMsg 923 // in-queue to be processed but blocked by the locks above. If we let 924 // that aliveMsg process, it'll cause us to re-join the cluster. This 925 // ensures that we don't. 926 if m.hasLeft() && a.Node == m.config.Name { 927 return 928 } 929 930 if len(a.Vsn) >= 3 { 931 pMin := a.Vsn[0] 932 pMax := a.Vsn[1] 933 pCur := a.Vsn[2] 934 if pMin == 0 || pMax == 0 || pMin > pMax { 935 m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax) 936 return 937 } 938 } 939 940 // Invoke the Alive delegate if any. This can be used to filter out 941 // alive messages based on custom logic. For example, using a cluster name. 942 // Using a merge delegate is not enough, as it is possible for passive 943 // cluster merging to still occur. 944 if m.config.Alive != nil { 945 if len(a.Vsn) < 6 { 946 m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present", 947 a.Node, net.IP(a.Addr), a.Port) 948 return 949 } 950 node := &Node{ 951 Name: a.Node, 952 Addr: a.Addr, 953 Port: a.Port, 954 Meta: a.Meta, 955 PMin: a.Vsn[0], 956 PMax: a.Vsn[1], 957 PCur: a.Vsn[2], 958 DMin: a.Vsn[3], 959 DMax: a.Vsn[4], 960 DCur: a.Vsn[5], 961 } 962 if err := m.config.Alive.NotifyAlive(node); err != nil { 963 m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s", 964 a.Node, err) 965 return 966 } 967 } 968 969 // Check if we've never seen this node before, and if not, then 970 // store this node in our node map. 971 var updatesNode bool 972 if !ok { 973 errCon := m.config.IPAllowed(a.Addr) 974 if errCon != nil { 975 m.logger.Printf("[WARN] memberlist: Rejected node %s (%v): %s", a.Node, net.IP(a.Addr), errCon) 976 return 977 } 978 state = &nodeState{ 979 Node: Node{ 980 Name: a.Node, 981 Addr: a.Addr, 982 Port: a.Port, 983 Meta: a.Meta, 984 }, 985 State: StateDead, 986 } 987 if len(a.Vsn) > 5 { 988 state.PMin = a.Vsn[0] 989 state.PMax = a.Vsn[1] 990 state.PCur = a.Vsn[2] 991 state.DMin = a.Vsn[3] 992 state.DMax = a.Vsn[4] 993 state.DCur = a.Vsn[5] 994 } 995 996 // Add to map 997 m.nodeMap[a.Node] = state 998 999 // Get a random offset. This is important to ensure 1000 // the failure detection bound is low on average. If all 1001 // nodes did an append, failure detection bound would be 1002 // very high. 1003 n := len(m.nodes) 1004 offset := randomOffset(n) 1005 1006 // Add at the end and swap with the node at the offset 1007 m.nodes = append(m.nodes, state) 1008 m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset] 1009 1010 // Update numNodes after we've added a new node 1011 atomic.AddUint32(&m.numNodes, 1) 1012 } else { 1013 // Check if this address is different than the existing node unless the old node is dead. 1014 if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port { 1015 errCon := m.config.IPAllowed(a.Addr) 1016 if errCon != nil { 1017 m.logger.Printf("[WARN] memberlist: Rejected IP update from %v to %v for node %s: %s", a.Node, state.Addr, net.IP(a.Addr), errCon) 1018 return 1019 } 1020 // If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died. 1021 canReclaim := (m.config.DeadNodeReclaimTime > 0 && 1022 time.Since(state.StateChange) > m.config.DeadNodeReclaimTime) 1023 1024 // Allow the address to be updated if a dead node is being replaced. 1025 if state.State == StateLeft || (state.State == StateDead && canReclaim) { 1026 m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d", 1027 state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port) 1028 updatesNode = true 1029 } else { 1030 m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v", 1031 state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State) 1032 1033 // Inform the conflict delegate if provided 1034 if m.config.Conflict != nil { 1035 other := Node{ 1036 Name: a.Node, 1037 Addr: a.Addr, 1038 Port: a.Port, 1039 Meta: a.Meta, 1040 } 1041 m.config.Conflict.NotifyConflict(&state.Node, &other) 1042 } 1043 return 1044 } 1045 } 1046 } 1047 1048 // Bail if the incarnation number is older, and this is not about us 1049 isLocalNode := state.Name == m.config.Name 1050 if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode { 1051 return 1052 } 1053 1054 // Bail if strictly less and this is about us 1055 if a.Incarnation < state.Incarnation && isLocalNode { 1056 return 1057 } 1058 1059 // Clear out any suspicion timer that may be in effect. 1060 delete(m.nodeTimers, a.Node) 1061 1062 // Store the old state and meta data 1063 oldState := state.State 1064 oldMeta := state.Meta 1065 1066 // If this is us we need to refute, otherwise re-broadcast 1067 if !bootstrap && isLocalNode { 1068 // Compute the version vector 1069 versions := []uint8{ 1070 state.PMin, state.PMax, state.PCur, 1071 state.DMin, state.DMax, state.DCur, 1072 } 1073 1074 // If the Incarnation is the same, we need special handling, since it 1075 // possible for the following situation to happen: 1076 // 1) Start with configuration C, join cluster 1077 // 2) Hard fail / Kill / Shutdown 1078 // 3) Restart with configuration C', join cluster 1079 // 1080 // In this case, other nodes and the local node see the same incarnation, 1081 // but the values may not be the same. For this reason, we always 1082 // need to do an equality check for this Incarnation. In most cases, 1083 // we just ignore, but we may need to refute. 1084 // 1085 if a.Incarnation == state.Incarnation && 1086 bytes.Equal(a.Meta, state.Meta) && 1087 bytes.Equal(a.Vsn, versions) { 1088 return 1089 } 1090 m.refute(state, a.Incarnation) 1091 m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions) 1092 } else { 1093 m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) 1094 1095 // Update protocol versions if it arrived 1096 if len(a.Vsn) > 0 { 1097 state.PMin = a.Vsn[0] 1098 state.PMax = a.Vsn[1] 1099 state.PCur = a.Vsn[2] 1100 state.DMin = a.Vsn[3] 1101 state.DMax = a.Vsn[4] 1102 state.DCur = a.Vsn[5] 1103 } 1104 1105 // Update the state and incarnation number 1106 state.Incarnation = a.Incarnation 1107 state.Meta = a.Meta 1108 state.Addr = a.Addr 1109 state.Port = a.Port 1110 if state.State != StateAlive { 1111 state.State = StateAlive 1112 state.StateChange = time.Now() 1113 } 1114 } 1115 1116 // Update metrics 1117 metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1) 1118 1119 // Notify the delegate of any relevant updates 1120 if m.config.Events != nil { 1121 if oldState == StateDead || oldState == StateLeft { 1122 // if Dead/Left -> Alive, notify of join 1123 m.config.Events.NotifyJoin(&state.Node) 1124 1125 } else if !bytes.Equal(oldMeta, state.Meta) { 1126 // if Meta changed, trigger an update notification 1127 m.config.Events.NotifyUpdate(&state.Node) 1128 } 1129 } 1130} 1131 1132// suspectNode is invoked by the network layer when we get a message 1133// about a suspect node 1134func (m *Memberlist) suspectNode(s *suspect) { 1135 m.nodeLock.Lock() 1136 defer m.nodeLock.Unlock() 1137 state, ok := m.nodeMap[s.Node] 1138 1139 // If we've never heard about this node before, ignore it 1140 if !ok { 1141 return 1142 } 1143 1144 // Ignore old incarnation numbers 1145 if s.Incarnation < state.Incarnation { 1146 return 1147 } 1148 1149 // See if there's a suspicion timer we can confirm. If the info is new 1150 // to us we will go ahead and re-gossip it. This allows for multiple 1151 // independent confirmations to flow even when a node probes a node 1152 // that's already suspect. 1153 if timer, ok := m.nodeTimers[s.Node]; ok { 1154 if timer.Confirm(s.From) { 1155 m.encodeAndBroadcast(s.Node, suspectMsg, s) 1156 } 1157 return 1158 } 1159 1160 // Ignore non-alive nodes 1161 if state.State != StateAlive { 1162 return 1163 } 1164 1165 // If this is us we need to refute, otherwise re-broadcast 1166 if state.Name == m.config.Name { 1167 m.refute(state, s.Incarnation) 1168 m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From) 1169 return // Do not mark ourself suspect 1170 } else { 1171 m.encodeAndBroadcast(s.Node, suspectMsg, s) 1172 } 1173 1174 // Update metrics 1175 metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1) 1176 1177 // Update the state 1178 state.Incarnation = s.Incarnation 1179 state.State = StateSuspect 1180 changeTime := time.Now() 1181 state.StateChange = changeTime 1182 1183 // Setup a suspicion timer. Given that we don't have any known phase 1184 // relationship with our peers, we set up k such that we hit the nominal 1185 // timeout two probe intervals short of what we expect given the suspicion 1186 // multiplier. 1187 k := m.config.SuspicionMult - 2 1188 1189 // If there aren't enough nodes to give the expected confirmations, just 1190 // set k to 0 to say that we don't expect any. Note we subtract 2 from n 1191 // here to take out ourselves and the node being probed. 1192 n := m.estNumNodes() 1193 if n-2 < k { 1194 k = 0 1195 } 1196 1197 // Compute the timeouts based on the size of the cluster. 1198 min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval) 1199 max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min 1200 fn := func(numConfirmations int) { 1201 m.nodeLock.Lock() 1202 state, ok := m.nodeMap[s.Node] 1203 timeout := ok && state.State == StateSuspect && state.StateChange == changeTime 1204 m.nodeLock.Unlock() 1205 1206 if timeout { 1207 if k > 0 && numConfirmations < k { 1208 metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1) 1209 } 1210 1211 m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)", 1212 state.Name, numConfirmations) 1213 d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name} 1214 m.deadNode(&d) 1215 } 1216 } 1217 m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn) 1218} 1219 1220// deadNode is invoked by the network layer when we get a message 1221// about a dead node 1222func (m *Memberlist) deadNode(d *dead) { 1223 m.nodeLock.Lock() 1224 defer m.nodeLock.Unlock() 1225 state, ok := m.nodeMap[d.Node] 1226 1227 // If we've never heard about this node before, ignore it 1228 if !ok { 1229 return 1230 } 1231 1232 // Ignore old incarnation numbers 1233 if d.Incarnation < state.Incarnation { 1234 return 1235 } 1236 1237 // Clear out any suspicion timer that may be in effect. 1238 delete(m.nodeTimers, d.Node) 1239 1240 // Ignore if node is already dead 1241 if state.DeadOrLeft() { 1242 return 1243 } 1244 1245 // Check if this is us 1246 if state.Name == m.config.Name { 1247 // If we are not leaving we need to refute 1248 if !m.hasLeft() { 1249 m.refute(state, d.Incarnation) 1250 m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From) 1251 return // Do not mark ourself dead 1252 } 1253 1254 // If we are leaving, we broadcast and wait 1255 m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast) 1256 } else { 1257 m.encodeAndBroadcast(d.Node, deadMsg, d) 1258 } 1259 1260 // Update metrics 1261 metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1) 1262 1263 // Update the state 1264 state.Incarnation = d.Incarnation 1265 1266 // If the dead message was send by the node itself, mark it is left 1267 // instead of dead. 1268 if d.Node == d.From { 1269 state.State = StateLeft 1270 } else { 1271 state.State = StateDead 1272 } 1273 state.StateChange = time.Now() 1274 1275 // Notify of death 1276 if m.config.Events != nil { 1277 m.config.Events.NotifyLeave(&state.Node) 1278 } 1279} 1280 1281// mergeState is invoked by the network layer when we get a Push/Pull 1282// state transfer 1283func (m *Memberlist) mergeState(remote []pushNodeState) { 1284 for _, r := range remote { 1285 switch r.State { 1286 case StateAlive: 1287 a := alive{ 1288 Incarnation: r.Incarnation, 1289 Node: r.Name, 1290 Addr: r.Addr, 1291 Port: r.Port, 1292 Meta: r.Meta, 1293 Vsn: r.Vsn, 1294 } 1295 m.aliveNode(&a, nil, false) 1296 1297 case StateLeft: 1298 d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name} 1299 m.deadNode(&d) 1300 case StateDead: 1301 // If the remote node believes a node is dead, we prefer to 1302 // suspect that node instead of declaring it dead instantly 1303 fallthrough 1304 case StateSuspect: 1305 s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name} 1306 m.suspectNode(&s) 1307 } 1308 } 1309} 1310