1package memberlist 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "math/rand" 8 "net" 9 "sync/atomic" 10 "time" 11 12 "github.com/armon/go-metrics" 13) 14 15type nodeStateType int 16 17const ( 18 stateAlive nodeStateType = iota 19 stateSuspect 20 stateDead 21) 22 23// Node represents a node in the cluster. 24type Node struct { 25 Name string 26 Addr net.IP 27 Port uint16 28 Meta []byte // Metadata from the delegate for this node. 29 PMin uint8 // Minimum protocol version this understands 30 PMax uint8 // Maximum protocol version this understands 31 PCur uint8 // Current version node is speaking 32 DMin uint8 // Min protocol version for the delegate to understand 33 DMax uint8 // Max protocol version for the delegate to understand 34 DCur uint8 // Current version delegate is speaking 35} 36 37// NodeState is used to manage our state view of another node 38type nodeState struct { 39 Node 40 Incarnation uint32 // Last known incarnation number 41 State nodeStateType // Current state 42 StateChange time.Time // Time last state change happened 43} 44 45// ackHandler is used to register handlers for incoming acks 46type ackHandler struct { 47 handler func([]byte, time.Time) 48 timer *time.Timer 49} 50 51// NoPingResponseError is used to indicate a 'ping' packet was 52// successfully issued but no response was received 53type NoPingResponseError struct { 54 node string 55} 56 57func (f NoPingResponseError) Error() string { 58 return fmt.Sprintf("No response from node %s", f.node) 59} 60 61// Schedule is used to ensure the Tick is performed periodically. This 62// function is safe to call multiple times. If the memberlist is already 63// scheduled, then it won't do anything. 64func (m *Memberlist) schedule() { 65 m.tickerLock.Lock() 66 defer m.tickerLock.Unlock() 67 68 // If we already have tickers, then don't do anything, since we're 69 // scheduled 70 if len(m.tickers) > 0 { 71 return 72 } 73 74 // Create the stop tick channel, a blocking channel. We close this 75 // when we should stop the tickers. 76 stopCh := make(chan struct{}) 77 78 // Create a new probeTicker 79 if m.config.ProbeInterval > 0 { 80 t := time.NewTicker(m.config.ProbeInterval) 81 go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe) 82 m.tickers = append(m.tickers, t) 83 } 84 85 // Create a push pull ticker if needed 86 if m.config.PushPullInterval > 0 { 87 go m.pushPullTrigger(stopCh) 88 } 89 90 // Create a gossip ticker if needed 91 if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 { 92 t := time.NewTicker(m.config.GossipInterval) 93 go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip) 94 m.tickers = append(m.tickers, t) 95 } 96 97 // If we made any tickers, then record the stopTick channel for 98 // later. 99 if len(m.tickers) > 0 { 100 m.stopTick = stopCh 101 } 102} 103 104// triggerFunc is used to trigger a function call each time a 105// message is received until a stop tick arrives. 106func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) { 107 // Use a random stagger to avoid syncronizing 108 randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger)) 109 select { 110 case <-time.After(randStagger): 111 case <-stop: 112 return 113 } 114 for { 115 select { 116 case <-C: 117 f() 118 case <-stop: 119 return 120 } 121 } 122} 123 124// pushPullTrigger is used to periodically trigger a push/pull until 125// a stop tick arrives. We don't use triggerFunc since the push/pull 126// timer is dynamically scaled based on cluster size to avoid network 127// saturation 128func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) { 129 interval := m.config.PushPullInterval 130 131 // Use a random stagger to avoid syncronizing 132 randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval)) 133 select { 134 case <-time.After(randStagger): 135 case <-stop: 136 return 137 } 138 139 // Tick using a dynamic timer 140 for { 141 tickTime := pushPullScale(interval, m.estNumNodes()) 142 select { 143 case <-time.After(tickTime): 144 m.pushPull() 145 case <-stop: 146 return 147 } 148 } 149} 150 151// Deschedule is used to stop the background maintenence. This is safe 152// to call multiple times. 153func (m *Memberlist) deschedule() { 154 m.tickerLock.Lock() 155 defer m.tickerLock.Unlock() 156 157 // If we have no tickers, then we aren't scheduled. 158 if len(m.tickers) == 0 { 159 return 160 } 161 162 // Close the stop channel so all the ticker listeners stop. 163 close(m.stopTick) 164 165 // Explicitly stop all the tickers themselves so they don't take 166 // up any more resources, and get rid of the list. 167 for _, t := range m.tickers { 168 t.Stop() 169 } 170 m.tickers = nil 171} 172 173// Tick is used to perform a single round of failure detection and gossip 174func (m *Memberlist) probe() { 175 // Track the number of indexes we've considered probing 176 numCheck := 0 177START: 178 m.nodeLock.RLock() 179 180 // Make sure we don't wrap around infinitely 181 if numCheck >= len(m.nodes) { 182 m.nodeLock.RUnlock() 183 return 184 } 185 186 // Handle the wrap around case 187 if m.probeIndex >= len(m.nodes) { 188 m.nodeLock.RUnlock() 189 m.resetNodes() 190 m.probeIndex = 0 191 numCheck++ 192 goto START 193 } 194 195 // Determine if we should probe this node 196 skip := false 197 var node nodeState 198 199 node = *m.nodes[m.probeIndex] 200 if node.Name == m.config.Name { 201 skip = true 202 } else if node.State == stateDead { 203 skip = true 204 } 205 206 // Potentially skip 207 m.nodeLock.RUnlock() 208 m.probeIndex++ 209 if skip { 210 numCheck++ 211 goto START 212 } 213 214 // Probe the specific node 215 m.probeNode(&node) 216} 217 218// probeNode handles a single round of failure checking on a node. 219func (m *Memberlist) probeNode(node *nodeState) { 220 defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) 221 222 // Prepare a ping message and setup an ack handler. 223 ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name} 224 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 225 m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval) 226 227 // Send a ping to the node. 228 deadline := time.Now().Add(m.config.ProbeInterval) 229 destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} 230 if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { 231 m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) 232 return 233 } 234 235 // Mark the sent time here, which should be after any pre-processing and 236 // system calls to do the actual send. This probably under-reports a bit, 237 // but it's the best we can do. 238 sent := time.Now() 239 240 // Wait for response or round-trip-time. 241 select { 242 case v := <-ackCh: 243 if v.Complete == true { 244 if m.config.Ping != nil { 245 rtt := v.Timestamp.Sub(sent) 246 m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload) 247 } 248 return 249 } 250 251 // As an edge case, if we get a timeout, we need to re-enqueue it 252 // here to break out of the select below. 253 if v.Complete == false { 254 ackCh <- v 255 } 256 case <-time.After(m.config.ProbeTimeout): 257 m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node.Name) 258 } 259 260 // Get some random live nodes. 261 m.nodeLock.RLock() 262 excludes := []string{m.config.Name, node.Name} 263 kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes) 264 m.nodeLock.RUnlock() 265 266 // Attempt an indirect ping. 267 ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name} 268 for _, peer := range kNodes { 269 destAddr := &net.UDPAddr{IP: peer.Addr, Port: int(peer.Port)} 270 if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil { 271 m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) 272 } 273 } 274 275 // Also make an attempt to contact the node directly over TCP. This 276 // helps prevent confused clients who get isolated from UDP traffic 277 // but can still speak TCP (which also means they can possibly report 278 // misinformation to other nodes via anti-entropy), avoiding flapping in 279 // the cluster. 280 // 281 // This is a little unusual because we will attempt a TCP ping to any 282 // member who understands version 3 of the protocol, regardless of 283 // which protocol version we are speaking. That's why we've included a 284 // config option to turn this off if desired. 285 fallbackCh := make(chan bool, 1) 286 if (!m.config.DisableTcpPings) && (node.PMax >= 3) { 287 destAddr := &net.TCPAddr{IP: node.Addr, Port: int(node.Port)} 288 go func() { 289 defer close(fallbackCh) 290 didContact, err := m.sendPingAndWaitForAck(destAddr, ping, deadline) 291 if err != nil { 292 m.logger.Printf("[ERR] memberlist: Failed TCP fallback ping: %s", err) 293 } else { 294 fallbackCh <- didContact 295 } 296 }() 297 } else { 298 close(fallbackCh) 299 } 300 301 // Wait for the acks or timeout. Note that we don't check the fallback 302 // channel here because we want to issue a warning below if that's the 303 // *only* way we hear back from the peer, so we have to let this time 304 // out first to allow the normal UDP-based acks to come in. 305 select { 306 case v := <-ackCh: 307 if v.Complete == true { 308 return 309 } 310 } 311 312 // Finally, poll the fallback channel. The timeouts are set such that 313 // the channel will have something or be closed without having to wait 314 // any additional time here. 315 for didContact := range fallbackCh { 316 if didContact { 317 m.logger.Printf("[WARN] memberlist: Was able to reach %s via TCP but not UDP, network may be misconfigured and not allowing bidirectional UDP", node.Name) 318 return 319 } 320 } 321 322 // No acks received from target, suspect 323 m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name) 324 s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} 325 m.suspectNode(&s) 326} 327 328// Ping initiates a ping to the node with the specified name. 329func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { 330 // Prepare a ping message and setup an ack handler. 331 ping := ping{SeqNo: m.nextSeqNo(), Node: node} 332 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 333 m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval) 334 335 // Send a ping to the node. 336 if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { 337 return 0, err 338 } 339 340 // Mark the sent time here, which should be after any pre-processing and 341 // system calls to do the actual send. This probably under-reports a bit, 342 // but it's the best we can do. 343 sent := time.Now() 344 345 // Wait for response or timeout. 346 select { 347 case v := <-ackCh: 348 if v.Complete == true { 349 return v.Timestamp.Sub(sent), nil 350 } 351 case <-time.After(m.config.ProbeTimeout): 352 // Timeout, return an error below. 353 } 354 355 m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node) 356 return 0, NoPingResponseError{ping.Node} 357} 358 359// resetNodes is used when the tick wraps around. It will reap the 360// dead nodes and shuffle the node list. 361func (m *Memberlist) resetNodes() { 362 m.nodeLock.Lock() 363 defer m.nodeLock.Unlock() 364 365 // Move the dead nodes 366 deadIdx := moveDeadNodes(m.nodes) 367 368 // Deregister the dead nodes 369 for i := deadIdx; i < len(m.nodes); i++ { 370 delete(m.nodeMap, m.nodes[i].Name) 371 m.nodes[i] = nil 372 } 373 374 // Trim the nodes to exclude the dead nodes 375 m.nodes = m.nodes[0:deadIdx] 376 377 // Update numNodes after we've trimmed the dead nodes 378 atomic.StoreUint32(&m.numNodes, uint32(deadIdx)) 379 380 // Shuffle live nodes 381 shuffleNodes(m.nodes) 382} 383 384// gossip is invoked every GossipInterval period to broadcast our gossip 385// messages to a few random nodes. 386func (m *Memberlist) gossip() { 387 defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) 388 389 // Get some random live nodes 390 m.nodeLock.RLock() 391 excludes := []string{m.config.Name} 392 kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes) 393 m.nodeLock.RUnlock() 394 395 // Compute the bytes available 396 bytesAvail := udpSendBuf - compoundHeaderOverhead 397 if m.config.EncryptionEnabled() { 398 bytesAvail -= encryptOverhead(m.encryptionVersion()) 399 } 400 401 for _, node := range kNodes { 402 // Get any pending broadcasts 403 msgs := m.getBroadcasts(compoundOverhead, bytesAvail) 404 if len(msgs) == 0 { 405 return 406 } 407 408 // Create a compound message 409 compound := makeCompoundMessage(msgs) 410 411 // Send the compound message 412 destAddr := &net.UDPAddr{IP: node.Addr, Port: int(node.Port)} 413 if err := m.rawSendMsgUDP(destAddr, compound.Bytes()); err != nil { 414 m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", destAddr, err) 415 } 416 } 417} 418 419// pushPull is invoked periodically to randomly perform a complete state 420// exchange. Used to ensure a high level of convergence, but is also 421// reasonably expensive as the entire state of this node is exchanged 422// with the other node. 423func (m *Memberlist) pushPull() { 424 // Get a random live node 425 m.nodeLock.RLock() 426 excludes := []string{m.config.Name} 427 nodes := kRandomNodes(1, excludes, m.nodes) 428 m.nodeLock.RUnlock() 429 430 // If no nodes, bail 431 if len(nodes) == 0 { 432 return 433 } 434 node := nodes[0] 435 436 // Attempt a push pull 437 if err := m.pushPullNode(node.Addr, node.Port, false); err != nil { 438 m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) 439 } 440} 441 442// pushPullNode does a complete state exchange with a specific node. 443func (m *Memberlist) pushPullNode(addr []byte, port uint16, join bool) error { 444 defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) 445 446 // Attempt to send and receive with the node 447 remote, userState, err := m.sendAndReceiveState(addr, port, join) 448 if err != nil { 449 return err 450 } 451 452 if err := m.mergeRemoteState(join, remote, userState); err != nil { 453 return err 454 } 455 return nil 456} 457 458// verifyProtocol verifies that all the remote nodes can speak with our 459// nodes and vice versa on both the core protocol as well as the 460// delegate protocol level. 461// 462// The verification works by finding the maximum minimum and 463// minimum maximum understood protocol and delegate versions. In other words, 464// it finds the common denominator of protocol and delegate version ranges 465// for the entire cluster. 466// 467// After this, it goes through the entire cluster (local and remote) and 468// verifies that everyone's speaking protocol versions satisfy this range. 469// If this passes, it means that every node can understand each other. 470func (m *Memberlist) verifyProtocol(remote []pushNodeState) error { 471 m.nodeLock.RLock() 472 defer m.nodeLock.RUnlock() 473 474 // Maximum minimum understood and minimum maximum understood for both 475 // the protocol and delegate versions. We use this to verify everyone 476 // can be understood. 477 var maxpmin, minpmax uint8 478 var maxdmin, mindmax uint8 479 minpmax = math.MaxUint8 480 mindmax = math.MaxUint8 481 482 for _, rn := range remote { 483 // If the node isn't alive, then skip it 484 if rn.State != stateAlive { 485 continue 486 } 487 488 // Skip nodes that don't have versions set, it just means 489 // their version is zero. 490 if len(rn.Vsn) == 0 { 491 continue 492 } 493 494 if rn.Vsn[0] > maxpmin { 495 maxpmin = rn.Vsn[0] 496 } 497 498 if rn.Vsn[1] < minpmax { 499 minpmax = rn.Vsn[1] 500 } 501 502 if rn.Vsn[3] > maxdmin { 503 maxdmin = rn.Vsn[3] 504 } 505 506 if rn.Vsn[4] < mindmax { 507 mindmax = rn.Vsn[4] 508 } 509 } 510 511 for _, n := range m.nodes { 512 // Ignore non-alive nodes 513 if n.State != stateAlive { 514 continue 515 } 516 517 if n.PMin > maxpmin { 518 maxpmin = n.PMin 519 } 520 521 if n.PMax < minpmax { 522 minpmax = n.PMax 523 } 524 525 if n.DMin > maxdmin { 526 maxdmin = n.DMin 527 } 528 529 if n.DMax < mindmax { 530 mindmax = n.DMax 531 } 532 } 533 534 // Now that we definitively know the minimum and maximum understood 535 // version that satisfies the whole cluster, we verify that every 536 // node in the cluster satisifies this. 537 for _, n := range remote { 538 var nPCur, nDCur uint8 539 if len(n.Vsn) > 0 { 540 nPCur = n.Vsn[2] 541 nDCur = n.Vsn[5] 542 } 543 544 if nPCur < maxpmin || nPCur > minpmax { 545 return fmt.Errorf( 546 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 547 n.Name, nPCur, maxpmin, minpmax) 548 } 549 550 if nDCur < maxdmin || nDCur > mindmax { 551 return fmt.Errorf( 552 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 553 n.Name, nDCur, maxdmin, mindmax) 554 } 555 } 556 557 for _, n := range m.nodes { 558 nPCur := n.PCur 559 nDCur := n.DCur 560 561 if nPCur < maxpmin || nPCur > minpmax { 562 return fmt.Errorf( 563 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 564 n.Name, nPCur, maxpmin, minpmax) 565 } 566 567 if nDCur < maxdmin || nDCur > mindmax { 568 return fmt.Errorf( 569 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 570 n.Name, nDCur, maxdmin, mindmax) 571 } 572 } 573 574 return nil 575} 576 577// nextSeqNo returns a usable sequence number in a thread safe way 578func (m *Memberlist) nextSeqNo() uint32 { 579 return atomic.AddUint32(&m.sequenceNum, 1) 580} 581 582// nextIncarnation returns the next incarnation number in a thread safe way 583func (m *Memberlist) nextIncarnation() uint32 { 584 return atomic.AddUint32(&m.incarnation, 1) 585} 586 587// estNumNodes is used to get the current estimate of the number of nodes 588func (m *Memberlist) estNumNodes() int { 589 return int(atomic.LoadUint32(&m.numNodes)) 590} 591 592type ackMessage struct { 593 Complete bool 594 Payload []byte 595 Timestamp time.Time 596} 597 598// setAckChannel is used to attach a channel to receive a message when an ack with a given 599// sequence number is received. The `complete` field of the message will be false on timeout 600func (m *Memberlist) setAckChannel(seqNo uint32, ch chan ackMessage, timeout time.Duration) { 601 // Create a handler function 602 handler := func(payload []byte, timestamp time.Time) { 603 select { 604 case ch <- ackMessage{true, payload, timestamp}: 605 default: 606 } 607 } 608 609 // Add the handler 610 ah := &ackHandler{handler, nil} 611 m.ackLock.Lock() 612 m.ackHandlers[seqNo] = ah 613 m.ackLock.Unlock() 614 615 // Setup a reaping routing 616 ah.timer = time.AfterFunc(timeout, func() { 617 m.ackLock.Lock() 618 delete(m.ackHandlers, seqNo) 619 m.ackLock.Unlock() 620 select { 621 case ch <- ackMessage{false, nil, time.Now()}: 622 default: 623 } 624 }) 625} 626 627// setAckHandler is used to attach a handler to be invoked when an 628// ack with a given sequence number is received. If a timeout is reached, 629// the handler is deleted 630func (m *Memberlist) setAckHandler(seqNo uint32, handler func([]byte, time.Time), timeout time.Duration) { 631 // Add the handler 632 ah := &ackHandler{handler, nil} 633 m.ackLock.Lock() 634 m.ackHandlers[seqNo] = ah 635 m.ackLock.Unlock() 636 637 // Setup a reaping routing 638 ah.timer = time.AfterFunc(timeout, func() { 639 m.ackLock.Lock() 640 delete(m.ackHandlers, seqNo) 641 m.ackLock.Unlock() 642 }) 643} 644 645// Invokes an Ack handler if any is associated, and reaps the handler immediately 646func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) { 647 m.ackLock.Lock() 648 ah, ok := m.ackHandlers[ack.SeqNo] 649 delete(m.ackHandlers, ack.SeqNo) 650 m.ackLock.Unlock() 651 if !ok { 652 return 653 } 654 ah.timer.Stop() 655 ah.handler(ack.Payload, timestamp) 656} 657 658// aliveNode is invoked by the network layer when we get a message about a 659// live node. 660func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { 661 m.nodeLock.Lock() 662 defer m.nodeLock.Unlock() 663 state, ok := m.nodeMap[a.Node] 664 665 // It is possible that during a Leave(), there is already an aliveMsg 666 // in-queue to be processed but blocked by the locks above. If we let 667 // that aliveMsg process, it'll cause us to re-join the cluster. This 668 // ensures that we don't. 669 if m.leave && a.Node == m.config.Name { 670 return 671 } 672 673 // Invoke the Alive delegate if any. This can be used to filter out 674 // alive messages based on custom logic. For example, using a cluster name. 675 // Using a merge delegate is not enough, as it is possible for passive 676 // cluster merging to still occur. 677 if m.config.Alive != nil { 678 node := &Node{ 679 Name: a.Node, 680 Addr: a.Addr, 681 Port: a.Port, 682 Meta: a.Meta, 683 PMin: a.Vsn[0], 684 PMax: a.Vsn[1], 685 PCur: a.Vsn[2], 686 DMin: a.Vsn[3], 687 DMax: a.Vsn[4], 688 DCur: a.Vsn[5], 689 } 690 if err := m.config.Alive.NotifyAlive(node); err != nil { 691 m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s", 692 a.Node, err) 693 return 694 } 695 } 696 697 // Check if we've never seen this node before, and if not, then 698 // store this node in our node map. 699 if !ok { 700 state = &nodeState{ 701 Node: Node{ 702 Name: a.Node, 703 Addr: a.Addr, 704 Port: a.Port, 705 Meta: a.Meta, 706 }, 707 State: stateDead, 708 } 709 710 // Add to map 711 m.nodeMap[a.Node] = state 712 713 // Get a random offset. This is important to ensure 714 // the failure detection bound is low on average. If all 715 // nodes did an append, failure detection bound would be 716 // very high. 717 n := len(m.nodes) 718 offset := randomOffset(n) 719 720 // Add at the end and swap with the node at the offset 721 m.nodes = append(m.nodes, state) 722 m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset] 723 724 // Update numNodes after we've added a new node 725 atomic.AddUint32(&m.numNodes, 1) 726 } 727 728 // Check if this address is different than the existing node 729 if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port { 730 m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d", 731 state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port) 732 733 // Inform the conflict delegate if provided 734 if m.config.Conflict != nil { 735 other := Node{ 736 Name: a.Node, 737 Addr: a.Addr, 738 Port: a.Port, 739 Meta: a.Meta, 740 } 741 m.config.Conflict.NotifyConflict(&state.Node, &other) 742 } 743 return 744 } 745 746 // Bail if the incarnation number is older, and this is not about us 747 isLocalNode := state.Name == m.config.Name 748 if a.Incarnation <= state.Incarnation && !isLocalNode { 749 return 750 } 751 752 // Bail if strictly less and this is about us 753 if a.Incarnation < state.Incarnation && isLocalNode { 754 return 755 } 756 757 // Store the old state and meta data 758 oldState := state.State 759 oldMeta := state.Meta 760 761 // If this is us we need to refute, otherwise re-broadcast 762 if !bootstrap && isLocalNode { 763 // Compute the version vector 764 versions := []uint8{ 765 state.PMin, state.PMax, state.PCur, 766 state.DMin, state.DMax, state.DCur, 767 } 768 769 // If the Incarnation is the same, we need special handling, since it 770 // possible for the following situation to happen: 771 // 1) Start with configuration C, join cluster 772 // 2) Hard fail / Kill / Shutdown 773 // 3) Restart with configuration C', join cluster 774 // 775 // In this case, other nodes and the local node see the same incarnation, 776 // but the values may not be the same. For this reason, we always 777 // need to do an equality check for this Incarnation. In most cases, 778 // we just ignore, but we may need to refute. 779 // 780 if a.Incarnation == state.Incarnation && 781 bytes.Equal(a.Meta, state.Meta) && 782 bytes.Equal(a.Vsn, versions) { 783 return 784 } 785 786 inc := m.nextIncarnation() 787 for a.Incarnation >= inc { 788 inc = m.nextIncarnation() 789 } 790 state.Incarnation = inc 791 792 a := alive{ 793 Incarnation: inc, 794 Node: state.Name, 795 Addr: state.Addr, 796 Port: state.Port, 797 Meta: state.Meta, 798 Vsn: versions, 799 } 800 m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) 801 m.logger.Printf("[WARN] memberlist: Refuting an alive message") 802 } else { 803 m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) 804 805 // Update protocol versions if it arrived 806 if len(a.Vsn) > 0 { 807 state.PMin = a.Vsn[0] 808 state.PMax = a.Vsn[1] 809 state.PCur = a.Vsn[2] 810 state.DMin = a.Vsn[3] 811 state.DMax = a.Vsn[4] 812 state.DCur = a.Vsn[5] 813 } 814 815 // Update the state and incarnation number 816 state.Incarnation = a.Incarnation 817 state.Meta = a.Meta 818 if state.State != stateAlive { 819 state.State = stateAlive 820 state.StateChange = time.Now() 821 } 822 } 823 824 // Update metrics 825 metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1) 826 827 // Notify the delegate of any relevant updates 828 if m.config.Events != nil { 829 if oldState == stateDead { 830 // if Dead -> Alive, notify of join 831 m.config.Events.NotifyJoin(&state.Node) 832 833 } else if !bytes.Equal(oldMeta, state.Meta) { 834 // if Meta changed, trigger an update notification 835 m.config.Events.NotifyUpdate(&state.Node) 836 } 837 } 838} 839 840// suspectNode is invoked by the network layer when we get a message 841// about a suspect node 842func (m *Memberlist) suspectNode(s *suspect) { 843 m.nodeLock.Lock() 844 defer m.nodeLock.Unlock() 845 state, ok := m.nodeMap[s.Node] 846 847 // If we've never heard about this node before, ignore it 848 if !ok { 849 return 850 } 851 852 // Ignore old incarnation numbers 853 if s.Incarnation < state.Incarnation { 854 return 855 } 856 857 // Ignore non-alive nodes 858 if state.State != stateAlive { 859 return 860 } 861 862 // If this is us we need to refute, otherwise re-broadcast 863 if state.Name == m.config.Name { 864 inc := m.nextIncarnation() 865 for s.Incarnation >= inc { 866 inc = m.nextIncarnation() 867 } 868 state.Incarnation = inc 869 870 a := alive{ 871 Incarnation: inc, 872 Node: state.Name, 873 Addr: state.Addr, 874 Port: state.Port, 875 Meta: state.Meta, 876 Vsn: []uint8{ 877 state.PMin, state.PMax, state.PCur, 878 state.DMin, state.DMax, state.DCur, 879 }, 880 } 881 m.encodeAndBroadcast(s.Node, aliveMsg, a) 882 m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From) 883 return // Do not mark ourself suspect 884 } else { 885 m.encodeAndBroadcast(s.Node, suspectMsg, s) 886 } 887 888 // Update metrics 889 metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1) 890 891 // Update the state 892 state.Incarnation = s.Incarnation 893 state.State = stateSuspect 894 changeTime := time.Now() 895 state.StateChange = changeTime 896 897 // Setup a timeout for this 898 timeout := suspicionTimeout(m.config.SuspicionMult, m.estNumNodes(), m.config.ProbeInterval) 899 time.AfterFunc(timeout, func() { 900 m.nodeLock.Lock() 901 state, ok := m.nodeMap[s.Node] 902 timeout := ok && state.State == stateSuspect && state.StateChange == changeTime 903 m.nodeLock.Unlock() 904 905 if timeout { 906 m.suspectTimeout(state) 907 } 908 }) 909} 910 911// suspectTimeout is invoked when a suspect timeout has occurred 912func (m *Memberlist) suspectTimeout(n *nodeState) { 913 // Construct a dead message 914 m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached", n.Name) 915 d := dead{Incarnation: n.Incarnation, Node: n.Name, From: m.config.Name} 916 m.deadNode(&d) 917} 918 919// deadNode is invoked by the network layer when we get a message 920// about a dead node 921func (m *Memberlist) deadNode(d *dead) { 922 m.nodeLock.Lock() 923 defer m.nodeLock.Unlock() 924 state, ok := m.nodeMap[d.Node] 925 926 // If we've never heard about this node before, ignore it 927 if !ok { 928 return 929 } 930 931 // Ignore old incarnation numbers 932 if d.Incarnation < state.Incarnation { 933 return 934 } 935 936 // Ignore if node is already dead 937 if state.State == stateDead { 938 return 939 } 940 941 // Check if this is us 942 if state.Name == m.config.Name { 943 // If we are not leaving we need to refute 944 if !m.leave { 945 inc := m.nextIncarnation() 946 for d.Incarnation >= inc { 947 inc = m.nextIncarnation() 948 } 949 state.Incarnation = inc 950 951 a := alive{ 952 Incarnation: inc, 953 Node: state.Name, 954 Addr: state.Addr, 955 Port: state.Port, 956 Meta: state.Meta, 957 Vsn: []uint8{ 958 state.PMin, state.PMax, state.PCur, 959 state.DMin, state.DMax, state.DCur, 960 }, 961 } 962 m.encodeAndBroadcast(d.Node, aliveMsg, a) 963 m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From) 964 return // Do not mark ourself dead 965 } 966 967 // If we are leaving, we broadcast and wait 968 m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast) 969 } else { 970 m.encodeAndBroadcast(d.Node, deadMsg, d) 971 } 972 973 // Update metrics 974 metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1) 975 976 // Update the state 977 state.Incarnation = d.Incarnation 978 state.State = stateDead 979 state.StateChange = time.Now() 980 981 // Notify of death 982 if m.config.Events != nil { 983 m.config.Events.NotifyLeave(&state.Node) 984 } 985} 986 987// mergeState is invoked by the network layer when we get a Push/Pull 988// state transfer 989func (m *Memberlist) mergeState(remote []pushNodeState) { 990 for _, r := range remote { 991 switch r.State { 992 case stateAlive: 993 a := alive{ 994 Incarnation: r.Incarnation, 995 Node: r.Name, 996 Addr: r.Addr, 997 Port: r.Port, 998 Meta: r.Meta, 999 Vsn: r.Vsn, 1000 } 1001 m.aliveNode(&a, nil, false) 1002 1003 case stateDead: 1004 // If the remote node belives a node is dead, we prefer to 1005 // suspect that node instead of declaring it dead instantly 1006 fallthrough 1007 case stateSuspect: 1008 s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name} 1009 m.suspectNode(&s) 1010 } 1011 } 1012} 1013