1package memberlist 2 3import ( 4 "bytes" 5 "fmt" 6 "math" 7 "math/rand" 8 "net" 9 "sync/atomic" 10 "time" 11 12 "github.com/armon/go-metrics" 13) 14 15type nodeStateType int 16 17const ( 18 stateAlive nodeStateType = iota 19 stateSuspect 20 stateDead 21) 22 23// Node represents a node in the cluster. 24type Node struct { 25 Name string 26 Addr net.IP 27 Port uint16 28 Meta []byte // Metadata from the delegate for this node. 29 PMin uint8 // Minimum protocol version this understands 30 PMax uint8 // Maximum protocol version this understands 31 PCur uint8 // Current version node is speaking 32 DMin uint8 // Min protocol version for the delegate to understand 33 DMax uint8 // Max protocol version for the delegate to understand 34 DCur uint8 // Current version delegate is speaking 35} 36 37// Address returns the host:port form of a node's address, suitable for use 38// with a transport. 39func (n *Node) Address() string { 40 return joinHostPort(n.Addr.String(), n.Port) 41} 42 43// String returns the node name 44func (n *Node) String() string { 45 return n.Name 46} 47 48// NodeState is used to manage our state view of another node 49type nodeState struct { 50 Node 51 Incarnation uint32 // Last known incarnation number 52 State nodeStateType // Current state 53 StateChange time.Time // Time last state change happened 54} 55 56// Address returns the host:port form of a node's address, suitable for use 57// with a transport. 58func (n *nodeState) Address() string { 59 return n.Node.Address() 60} 61 62// ackHandler is used to register handlers for incoming acks and nacks. 
63type ackHandler struct { 64 ackFn func([]byte, time.Time) 65 nackFn func() 66 timer *time.Timer 67} 68 69// NoPingResponseError is used to indicate a 'ping' packet was 70// successfully issued but no response was received 71type NoPingResponseError struct { 72 node string 73} 74 75func (f NoPingResponseError) Error() string { 76 return fmt.Sprintf("No response from node %s", f.node) 77} 78 79// Schedule is used to ensure the Tick is performed periodically. This 80// function is safe to call multiple times. If the memberlist is already 81// scheduled, then it won't do anything. 82func (m *Memberlist) schedule() { 83 m.tickerLock.Lock() 84 defer m.tickerLock.Unlock() 85 86 // If we already have tickers, then don't do anything, since we're 87 // scheduled 88 if len(m.tickers) > 0 { 89 return 90 } 91 92 // Create the stop tick channel, a blocking channel. We close this 93 // when we should stop the tickers. 94 stopCh := make(chan struct{}) 95 96 // Create a new probeTicker 97 if m.config.ProbeInterval > 0 { 98 t := time.NewTicker(m.config.ProbeInterval) 99 go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe) 100 m.tickers = append(m.tickers, t) 101 } 102 103 // Create a push pull ticker if needed 104 if m.config.PushPullInterval > 0 { 105 go m.pushPullTrigger(stopCh) 106 } 107 108 // Create a gossip ticker if needed 109 if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 { 110 t := time.NewTicker(m.config.GossipInterval) 111 go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip) 112 m.tickers = append(m.tickers, t) 113 } 114 115 // If we made any tickers, then record the stopTick channel for 116 // later. 117 if len(m.tickers) > 0 { 118 m.stopTick = stopCh 119 } 120} 121 122// triggerFunc is used to trigger a function call each time a 123// message is received until a stop tick arrives. 
124func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) { 125 // Use a random stagger to avoid syncronizing 126 randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger)) 127 select { 128 case <-time.After(randStagger): 129 case <-stop: 130 return 131 } 132 for { 133 select { 134 case <-C: 135 f() 136 case <-stop: 137 return 138 } 139 } 140} 141 142// pushPullTrigger is used to periodically trigger a push/pull until 143// a stop tick arrives. We don't use triggerFunc since the push/pull 144// timer is dynamically scaled based on cluster size to avoid network 145// saturation 146func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) { 147 interval := m.config.PushPullInterval 148 149 // Use a random stagger to avoid syncronizing 150 randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval)) 151 select { 152 case <-time.After(randStagger): 153 case <-stop: 154 return 155 } 156 157 // Tick using a dynamic timer 158 for { 159 tickTime := pushPullScale(interval, m.estNumNodes()) 160 select { 161 case <-time.After(tickTime): 162 m.pushPull() 163 case <-stop: 164 return 165 } 166 } 167} 168 169// Deschedule is used to stop the background maintenance. This is safe 170// to call multiple times. 171func (m *Memberlist) deschedule() { 172 m.tickerLock.Lock() 173 defer m.tickerLock.Unlock() 174 175 // If we have no tickers, then we aren't scheduled. 176 if len(m.tickers) == 0 { 177 return 178 } 179 180 // Close the stop channel so all the ticker listeners stop. 181 close(m.stopTick) 182 183 // Explicitly stop all the tickers themselves so they don't take 184 // up any more resources, and get rid of the list. 
185 for _, t := range m.tickers { 186 t.Stop() 187 } 188 m.tickers = nil 189} 190 191// Tick is used to perform a single round of failure detection and gossip 192func (m *Memberlist) probe() { 193 // Track the number of indexes we've considered probing 194 numCheck := 0 195START: 196 m.nodeLock.RLock() 197 198 // Make sure we don't wrap around infinitely 199 if numCheck >= len(m.nodes) { 200 m.nodeLock.RUnlock() 201 return 202 } 203 204 // Handle the wrap around case 205 if m.probeIndex >= len(m.nodes) { 206 m.nodeLock.RUnlock() 207 m.resetNodes() 208 m.probeIndex = 0 209 numCheck++ 210 goto START 211 } 212 213 // Determine if we should probe this node 214 skip := false 215 var node nodeState 216 217 node = *m.nodes[m.probeIndex] 218 if node.Name == m.config.Name { 219 skip = true 220 } else if node.State == stateDead { 221 skip = true 222 } 223 224 // Potentially skip 225 m.nodeLock.RUnlock() 226 m.probeIndex++ 227 if skip { 228 numCheck++ 229 goto START 230 } 231 232 // Probe the specific node 233 m.probeNode(&node) 234} 235 236// probeNode handles a single round of failure checking on a node. 237func (m *Memberlist) probeNode(node *nodeState) { 238 defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now()) 239 240 // We use our health awareness to scale the overall probe interval, so we 241 // slow down if we detect problems. The ticker that calls us can handle 242 // us running over the base interval, and will skip missed ticks. 243 probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval) 244 if probeInterval > m.config.ProbeInterval { 245 metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1) 246 } 247 248 // Prepare a ping message and setup an ack handler. 
249 ping := ping{SeqNo: m.nextSeqNo(), Node: node.Name} 250 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 251 nackCh := make(chan struct{}, m.config.IndirectChecks+1) 252 m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval) 253 254 // Mark the sent time here, which should be after any pre-processing but 255 // before system calls to do the actual send. This probably over-reports 256 // a bit, but it's the best we can do. We had originally put this right 257 // after the I/O, but that would sometimes give negative RTT measurements 258 // which was not desirable. 259 sent := time.Now() 260 261 // Send a ping to the node. If this node looks like it's suspect or dead, 262 // also tack on a suspect message so that it has a chance to refute as 263 // soon as possible. 264 deadline := sent.Add(probeInterval) 265 addr := node.Address() 266 if node.State == stateAlive { 267 if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil { 268 m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err) 269 return 270 } 271 } else { 272 var msgs [][]byte 273 if buf, err := encode(pingMsg, &ping); err != nil { 274 m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err) 275 return 276 } else { 277 msgs = append(msgs, buf.Bytes()) 278 } 279 s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} 280 if buf, err := encode(suspectMsg, &s); err != nil { 281 m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err) 282 return 283 } else { 284 msgs = append(msgs, buf.Bytes()) 285 } 286 287 compound := makeCompoundMessage(msgs) 288 if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { 289 m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err) 290 return 291 } 292 } 293 294 // Arrange for our self-awareness to get updated. 
At this point we've 295 // sent the ping, so any return statement means the probe succeeded 296 // which will improve our health until we get to the failure scenarios 297 // at the end of this function, which will alter this delta variable 298 // accordingly. 299 awarenessDelta := -1 300 defer func() { 301 m.awareness.ApplyDelta(awarenessDelta) 302 }() 303 304 // Wait for response or round-trip-time. 305 select { 306 case v := <-ackCh: 307 if v.Complete == true { 308 if m.config.Ping != nil { 309 rtt := v.Timestamp.Sub(sent) 310 m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload) 311 } 312 return 313 } 314 315 // As an edge case, if we get a timeout, we need to re-enqueue it 316 // here to break out of the select below. 317 if v.Complete == false { 318 ackCh <- v 319 } 320 case <-time.After(m.config.ProbeTimeout): 321 // Note that we don't scale this timeout based on awareness and 322 // the health score. That's because we don't really expect waiting 323 // longer to help get UDP through. Since health does extend the 324 // probe interval it will give the TCP fallback more time, which 325 // is more active in dealing with lost packets, and it gives more 326 // time to wait for indirect acks/nacks. 327 m.logger.Printf("[DEBUG] memberlist: Failed ping: %v (timeout reached)", node.Name) 328 } 329 330 // Get some random live nodes. 331 m.nodeLock.RLock() 332 kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool { 333 return n.Name == m.config.Name || 334 n.Name == node.Name || 335 n.State != stateAlive 336 }) 337 m.nodeLock.RUnlock() 338 339 // Attempt an indirect ping. 340 expectedNacks := 0 341 ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr, Port: node.Port, Node: node.Name} 342 for _, peer := range kNodes { 343 // We only expect nack to be sent from peers who understand 344 // version 4 of the protocol. 
345 if ind.Nack = peer.PMax >= 4; ind.Nack { 346 expectedNacks++ 347 } 348 349 if err := m.encodeAndSendMsg(peer.Address(), indirectPingMsg, &ind); err != nil { 350 m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err) 351 } 352 } 353 354 // Also make an attempt to contact the node directly over TCP. This 355 // helps prevent confused clients who get isolated from UDP traffic 356 // but can still speak TCP (which also means they can possibly report 357 // misinformation to other nodes via anti-entropy), avoiding flapping in 358 // the cluster. 359 // 360 // This is a little unusual because we will attempt a TCP ping to any 361 // member who understands version 3 of the protocol, regardless of 362 // which protocol version we are speaking. That's why we've included a 363 // config option to turn this off if desired. 364 fallbackCh := make(chan bool, 1) 365 if (!m.config.DisableTcpPings) && (node.PMax >= 3) { 366 go func() { 367 defer close(fallbackCh) 368 didContact, err := m.sendPingAndWaitForAck(node.Address(), ping, deadline) 369 if err != nil { 370 m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err) 371 } else { 372 fallbackCh <- didContact 373 } 374 }() 375 } else { 376 close(fallbackCh) 377 } 378 379 // Wait for the acks or timeout. Note that we don't check the fallback 380 // channel here because we want to issue a warning below if that's the 381 // *only* way we hear back from the peer, so we have to let this time 382 // out first to allow the normal UDP-based acks to come in. 383 select { 384 case v := <-ackCh: 385 if v.Complete == true { 386 return 387 } 388 } 389 390 // Finally, poll the fallback channel. The timeouts are set such that 391 // the channel will have something or be closed without having to wait 392 // any additional time here. 
393 for didContact := range fallbackCh { 394 if didContact { 395 m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name) 396 return 397 } 398 } 399 400 // Update our self-awareness based on the results of this failed probe. 401 // If we don't have peers who will send nacks then we penalize for any 402 // failed probe as a simple health metric. If we do have peers to nack 403 // verify, then we can use that as a more sophisticated measure of self- 404 // health because we assume them to be working, and they can help us 405 // decide if the probed node was really dead or if it was something wrong 406 // with ourselves. 407 awarenessDelta = 0 408 if expectedNacks > 0 { 409 if nackCount := len(nackCh); nackCount < expectedNacks { 410 awarenessDelta += (expectedNacks - nackCount) 411 } 412 } else { 413 awarenessDelta += 1 414 } 415 416 // No acks received from target, suspect it as failed. 417 m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name) 418 s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name} 419 m.suspectNode(&s) 420} 421 422// Ping initiates a ping to the node with the specified name. 423func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) { 424 // Prepare a ping message and setup an ack handler. 425 ping := ping{SeqNo: m.nextSeqNo(), Node: node} 426 ackCh := make(chan ackMessage, m.config.IndirectChecks+1) 427 m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval) 428 429 // Send a ping to the node. 430 if err := m.encodeAndSendMsg(addr.String(), pingMsg, &ping); err != nil { 431 return 0, err 432 } 433 434 // Mark the sent time here, which should be after any pre-processing and 435 // system calls to do the actual send. This probably under-reports a bit, 436 // but it's the best we can do. 437 sent := time.Now() 438 439 // Wait for response or timeout. 
440 select { 441 case v := <-ackCh: 442 if v.Complete == true { 443 return v.Timestamp.Sub(sent), nil 444 } 445 case <-time.After(m.config.ProbeTimeout): 446 // Timeout, return an error below. 447 } 448 449 m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node) 450 return 0, NoPingResponseError{ping.Node} 451} 452 453// resetNodes is used when the tick wraps around. It will reap the 454// dead nodes and shuffle the node list. 455func (m *Memberlist) resetNodes() { 456 m.nodeLock.Lock() 457 defer m.nodeLock.Unlock() 458 459 // Move dead nodes, but respect gossip to the dead interval 460 deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime) 461 462 // Deregister the dead nodes 463 for i := deadIdx; i < len(m.nodes); i++ { 464 delete(m.nodeMap, m.nodes[i].Name) 465 m.nodes[i] = nil 466 } 467 468 // Trim the nodes to exclude the dead nodes 469 m.nodes = m.nodes[0:deadIdx] 470 471 // Update numNodes after we've trimmed the dead nodes 472 atomic.StoreUint32(&m.numNodes, uint32(deadIdx)) 473 474 // Shuffle live nodes 475 shuffleNodes(m.nodes) 476} 477 478// gossip is invoked every GossipInterval period to broadcast our gossip 479// messages to a few random nodes. 
480func (m *Memberlist) gossip() { 481 defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now()) 482 483 // Get some random live, suspect, or recently dead nodes 484 m.nodeLock.RLock() 485 kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool { 486 if n.Name == m.config.Name { 487 return true 488 } 489 490 switch n.State { 491 case stateAlive, stateSuspect: 492 return false 493 494 case stateDead: 495 return time.Since(n.StateChange) > m.config.GossipToTheDeadTime 496 497 default: 498 return true 499 } 500 }) 501 m.nodeLock.RUnlock() 502 503 // Compute the bytes available 504 bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead 505 if m.config.EncryptionEnabled() { 506 bytesAvail -= encryptOverhead(m.encryptionVersion()) 507 } 508 509 for _, node := range kNodes { 510 // Get any pending broadcasts 511 msgs := m.getBroadcasts(compoundOverhead, bytesAvail) 512 if len(msgs) == 0 { 513 return 514 } 515 516 addr := node.Address() 517 if len(msgs) == 1 { 518 // Send single message as is 519 if err := m.rawSendMsgPacket(addr, &node.Node, msgs[0]); err != nil { 520 m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) 521 } 522 } else { 523 // Otherwise create and send a compound message 524 compound := makeCompoundMessage(msgs) 525 if err := m.rawSendMsgPacket(addr, &node.Node, compound.Bytes()); err != nil { 526 m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err) 527 } 528 } 529 } 530} 531 532// pushPull is invoked periodically to randomly perform a complete state 533// exchange. Used to ensure a high level of convergence, but is also 534// reasonably expensive as the entire state of this node is exchanged 535// with the other node. 
536func (m *Memberlist) pushPull() { 537 // Get a random live node 538 m.nodeLock.RLock() 539 nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool { 540 return n.Name == m.config.Name || 541 n.State != stateAlive 542 }) 543 m.nodeLock.RUnlock() 544 545 // If no nodes, bail 546 if len(nodes) == 0 { 547 return 548 } 549 node := nodes[0] 550 551 // Attempt a push pull 552 if err := m.pushPullNode(node.Address(), false); err != nil { 553 m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err) 554 } 555} 556 557// pushPullNode does a complete state exchange with a specific node. 558func (m *Memberlist) pushPullNode(addr string, join bool) error { 559 defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now()) 560 561 // Attempt to send and receive with the node 562 remote, userState, err := m.sendAndReceiveState(addr, join) 563 if err != nil { 564 return err 565 } 566 567 if err := m.mergeRemoteState(join, remote, userState); err != nil { 568 return err 569 } 570 return nil 571} 572 573// verifyProtocol verifies that all the remote nodes can speak with our 574// nodes and vice versa on both the core protocol as well as the 575// delegate protocol level. 576// 577// The verification works by finding the maximum minimum and 578// minimum maximum understood protocol and delegate versions. In other words, 579// it finds the common denominator of protocol and delegate version ranges 580// for the entire cluster. 581// 582// After this, it goes through the entire cluster (local and remote) and 583// verifies that everyone's speaking protocol versions satisfy this range. 584// If this passes, it means that every node can understand each other. 585func (m *Memberlist) verifyProtocol(remote []pushNodeState) error { 586 m.nodeLock.RLock() 587 defer m.nodeLock.RUnlock() 588 589 // Maximum minimum understood and minimum maximum understood for both 590 // the protocol and delegate versions. 
We use this to verify everyone 591 // can be understood. 592 var maxpmin, minpmax uint8 593 var maxdmin, mindmax uint8 594 minpmax = math.MaxUint8 595 mindmax = math.MaxUint8 596 597 for _, rn := range remote { 598 // If the node isn't alive, then skip it 599 if rn.State != stateAlive { 600 continue 601 } 602 603 // Skip nodes that don't have versions set, it just means 604 // their version is zero. 605 if len(rn.Vsn) == 0 { 606 continue 607 } 608 609 if rn.Vsn[0] > maxpmin { 610 maxpmin = rn.Vsn[0] 611 } 612 613 if rn.Vsn[1] < minpmax { 614 minpmax = rn.Vsn[1] 615 } 616 617 if rn.Vsn[3] > maxdmin { 618 maxdmin = rn.Vsn[3] 619 } 620 621 if rn.Vsn[4] < mindmax { 622 mindmax = rn.Vsn[4] 623 } 624 } 625 626 for _, n := range m.nodes { 627 // Ignore non-alive nodes 628 if n.State != stateAlive { 629 continue 630 } 631 632 if n.PMin > maxpmin { 633 maxpmin = n.PMin 634 } 635 636 if n.PMax < minpmax { 637 minpmax = n.PMax 638 } 639 640 if n.DMin > maxdmin { 641 maxdmin = n.DMin 642 } 643 644 if n.DMax < mindmax { 645 mindmax = n.DMax 646 } 647 } 648 649 // Now that we definitively know the minimum and maximum understood 650 // version that satisfies the whole cluster, we verify that every 651 // node in the cluster satisifies this. 
652 for _, n := range remote { 653 var nPCur, nDCur uint8 654 if len(n.Vsn) > 0 { 655 nPCur = n.Vsn[2] 656 nDCur = n.Vsn[5] 657 } 658 659 if nPCur < maxpmin || nPCur > minpmax { 660 return fmt.Errorf( 661 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 662 n.Name, nPCur, maxpmin, minpmax) 663 } 664 665 if nDCur < maxdmin || nDCur > mindmax { 666 return fmt.Errorf( 667 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 668 n.Name, nDCur, maxdmin, mindmax) 669 } 670 } 671 672 for _, n := range m.nodes { 673 nPCur := n.PCur 674 nDCur := n.DCur 675 676 if nPCur < maxpmin || nPCur > minpmax { 677 return fmt.Errorf( 678 "Node '%s' protocol version (%d) is incompatible: [%d, %d]", 679 n.Name, nPCur, maxpmin, minpmax) 680 } 681 682 if nDCur < maxdmin || nDCur > mindmax { 683 return fmt.Errorf( 684 "Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]", 685 n.Name, nDCur, maxdmin, mindmax) 686 } 687 } 688 689 return nil 690} 691 692// nextSeqNo returns a usable sequence number in a thread safe way 693func (m *Memberlist) nextSeqNo() uint32 { 694 return atomic.AddUint32(&m.sequenceNum, 1) 695} 696 697// nextIncarnation returns the next incarnation number in a thread safe way 698func (m *Memberlist) nextIncarnation() uint32 { 699 return atomic.AddUint32(&m.incarnation, 1) 700} 701 702// skipIncarnation adds the positive offset to the incarnation number. 703func (m *Memberlist) skipIncarnation(offset uint32) uint32 { 704 return atomic.AddUint32(&m.incarnation, offset) 705} 706 707// estNumNodes is used to get the current estimate of the number of nodes 708func (m *Memberlist) estNumNodes() int { 709 return int(atomic.LoadUint32(&m.numNodes)) 710} 711 712type ackMessage struct { 713 Complete bool 714 Payload []byte 715 Timestamp time.Time 716} 717 718// setProbeChannels is used to attach the ackCh to receive a message when an ack 719// with a given sequence number is received. 
The `complete` field of the message 720// will be false on timeout. Any nack messages will cause an empty struct to be 721// passed to the nackCh, which can be nil if not needed. 722func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) { 723 // Create handler functions for acks and nacks 724 ackFn := func(payload []byte, timestamp time.Time) { 725 select { 726 case ackCh <- ackMessage{true, payload, timestamp}: 727 default: 728 } 729 } 730 nackFn := func() { 731 select { 732 case nackCh <- struct{}{}: 733 default: 734 } 735 } 736 737 // Add the handlers 738 ah := &ackHandler{ackFn, nackFn, nil} 739 m.ackLock.Lock() 740 m.ackHandlers[seqNo] = ah 741 m.ackLock.Unlock() 742 743 // Setup a reaping routing 744 ah.timer = time.AfterFunc(timeout, func() { 745 m.ackLock.Lock() 746 delete(m.ackHandlers, seqNo) 747 m.ackLock.Unlock() 748 select { 749 case ackCh <- ackMessage{false, nil, time.Now()}: 750 default: 751 } 752 }) 753} 754 755// setAckHandler is used to attach a handler to be invoked when an ack with a 756// given sequence number is received. If a timeout is reached, the handler is 757// deleted. This is used for indirect pings so does not configure a function 758// for nacks. 
759func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) { 760 // Add the handler 761 ah := &ackHandler{ackFn, nil, nil} 762 m.ackLock.Lock() 763 m.ackHandlers[seqNo] = ah 764 m.ackLock.Unlock() 765 766 // Setup a reaping routing 767 ah.timer = time.AfterFunc(timeout, func() { 768 m.ackLock.Lock() 769 delete(m.ackHandlers, seqNo) 770 m.ackLock.Unlock() 771 }) 772} 773 774// Invokes an ack handler if any is associated, and reaps the handler immediately 775func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) { 776 m.ackLock.Lock() 777 ah, ok := m.ackHandlers[ack.SeqNo] 778 delete(m.ackHandlers, ack.SeqNo) 779 m.ackLock.Unlock() 780 if !ok { 781 return 782 } 783 ah.timer.Stop() 784 ah.ackFn(ack.Payload, timestamp) 785} 786 787// Invokes nack handler if any is associated. 788func (m *Memberlist) invokeNackHandler(nack nackResp) { 789 m.ackLock.Lock() 790 ah, ok := m.ackHandlers[nack.SeqNo] 791 m.ackLock.Unlock() 792 if !ok || ah.nackFn == nil { 793 return 794 } 795 ah.nackFn() 796} 797 798// refute gossips an alive message in response to incoming information that we 799// are suspect or dead. It will make sure the incarnation number beats the given 800// accusedInc value, or you can supply 0 to just get the next incarnation number. 801// This alters the node state that's passed in so this MUST be called while the 802// nodeLock is held. 803func (m *Memberlist) refute(me *nodeState, accusedInc uint32) { 804 // Make sure the incarnation number beats the accusation. 805 inc := m.nextIncarnation() 806 if accusedInc >= inc { 807 inc = m.skipIncarnation(accusedInc - inc + 1) 808 } 809 me.Incarnation = inc 810 811 // Decrease our health because we are being asked to refute a problem. 812 m.awareness.ApplyDelta(1) 813 814 // Format and broadcast an alive message. 
815 a := alive{ 816 Incarnation: inc, 817 Node: me.Name, 818 Addr: me.Addr, 819 Port: me.Port, 820 Meta: me.Meta, 821 Vsn: []uint8{ 822 me.PMin, me.PMax, me.PCur, 823 me.DMin, me.DMax, me.DCur, 824 }, 825 } 826 m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a) 827} 828 829// aliveNode is invoked by the network layer when we get a message about a 830// live node. 831func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) { 832 m.nodeLock.Lock() 833 defer m.nodeLock.Unlock() 834 state, ok := m.nodeMap[a.Node] 835 836 // It is possible that during a Leave(), there is already an aliveMsg 837 // in-queue to be processed but blocked by the locks above. If we let 838 // that aliveMsg process, it'll cause us to re-join the cluster. This 839 // ensures that we don't. 840 if m.hasLeft() && a.Node == m.config.Name { 841 return 842 } 843 844 // Invoke the Alive delegate if any. This can be used to filter out 845 // alive messages based on custom logic. For example, using a cluster name. 846 // Using a merge delegate is not enough, as it is possible for passive 847 // cluster merging to still occur. 848 if m.config.Alive != nil { 849 node := &Node{ 850 Name: a.Node, 851 Addr: a.Addr, 852 Port: a.Port, 853 Meta: a.Meta, 854 PMin: a.Vsn[0], 855 PMax: a.Vsn[1], 856 PCur: a.Vsn[2], 857 DMin: a.Vsn[3], 858 DMax: a.Vsn[4], 859 DCur: a.Vsn[5], 860 } 861 if err := m.config.Alive.NotifyAlive(node); err != nil { 862 m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s", 863 a.Node, err) 864 return 865 } 866 } 867 868 // Check if we've never seen this node before, and if not, then 869 // store this node in our node map. 870 if !ok { 871 state = &nodeState{ 872 Node: Node{ 873 Name: a.Node, 874 Addr: a.Addr, 875 Port: a.Port, 876 Meta: a.Meta, 877 }, 878 State: stateDead, 879 } 880 881 // Add to map 882 m.nodeMap[a.Node] = state 883 884 // Get a random offset. This is important to ensure 885 // the failure detection bound is low on average. 
If all 886 // nodes did an append, failure detection bound would be 887 // very high. 888 n := len(m.nodes) 889 offset := randomOffset(n) 890 891 // Add at the end and swap with the node at the offset 892 m.nodes = append(m.nodes, state) 893 m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset] 894 895 // Update numNodes after we've added a new node 896 atomic.AddUint32(&m.numNodes, 1) 897 } 898 899 // Check if this address is different than the existing node 900 if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port { 901 m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d", 902 state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port) 903 904 // Inform the conflict delegate if provided 905 if m.config.Conflict != nil { 906 other := Node{ 907 Name: a.Node, 908 Addr: a.Addr, 909 Port: a.Port, 910 Meta: a.Meta, 911 } 912 m.config.Conflict.NotifyConflict(&state.Node, &other) 913 } 914 return 915 } 916 917 // Bail if the incarnation number is older, and this is not about us 918 isLocalNode := state.Name == m.config.Name 919 if a.Incarnation <= state.Incarnation && !isLocalNode { 920 return 921 } 922 923 // Bail if strictly less and this is about us 924 if a.Incarnation < state.Incarnation && isLocalNode { 925 return 926 } 927 928 // Clear out any suspicion timer that may be in effect. 
929 delete(m.nodeTimers, a.Node) 930 931 // Store the old state and meta data 932 oldState := state.State 933 oldMeta := state.Meta 934 935 // If this is us we need to refute, otherwise re-broadcast 936 if !bootstrap && isLocalNode { 937 // Compute the version vector 938 versions := []uint8{ 939 state.PMin, state.PMax, state.PCur, 940 state.DMin, state.DMax, state.DCur, 941 } 942 943 // If the Incarnation is the same, we need special handling, since it 944 // possible for the following situation to happen: 945 // 1) Start with configuration C, join cluster 946 // 2) Hard fail / Kill / Shutdown 947 // 3) Restart with configuration C', join cluster 948 // 949 // In this case, other nodes and the local node see the same incarnation, 950 // but the values may not be the same. For this reason, we always 951 // need to do an equality check for this Incarnation. In most cases, 952 // we just ignore, but we may need to refute. 953 // 954 if a.Incarnation == state.Incarnation && 955 bytes.Equal(a.Meta, state.Meta) && 956 bytes.Equal(a.Vsn, versions) { 957 return 958 } 959 960 m.refute(state, a.Incarnation) 961 m.logger.Printf("[WARN] memberlist: Refuting an alive message") 962 } else { 963 m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify) 964 965 // Update protocol versions if it arrived 966 if len(a.Vsn) > 0 { 967 state.PMin = a.Vsn[0] 968 state.PMax = a.Vsn[1] 969 state.PCur = a.Vsn[2] 970 state.DMin = a.Vsn[3] 971 state.DMax = a.Vsn[4] 972 state.DCur = a.Vsn[5] 973 } 974 975 // Update the state and incarnation number 976 state.Incarnation = a.Incarnation 977 state.Meta = a.Meta 978 if state.State != stateAlive { 979 state.State = stateAlive 980 state.StateChange = time.Now() 981 } 982 } 983 984 // Update metrics 985 metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1) 986 987 // Notify the delegate of any relevant updates 988 if m.config.Events != nil { 989 if oldState == stateDead { 990 // if Dead -> Alive, notify of join 991 
m.config.Events.NotifyJoin(&state.Node)

		} else if !bytes.Equal(oldMeta, state.Meta) {
			// if Meta changed, trigger an update notification
			m.config.Events.NotifyUpdate(&state.Node)
		}
	}
}

// suspectNode is invoked by the network layer when we get a message
// about a suspect node.
//
// The whole call runs with nodeLock held. The message is ignored when the
// node is unknown, when it carries an incarnation older than the one we
// have recorded, or when the node is not currently alive and has no
// suspicion timer. If a suspicion timer already exists, the message is
// offered to it as a confirmation and re-gossiped only if Confirm reports
// true. A suspicion about the local node is refuted rather than applied.
// Otherwise the node is marked suspect, the message is re-broadcast, and a
// suspicion timer is registered whose callback declares the node dead if it
// is still in the suspect state recorded by this call.
func (m *Memberlist) suspectNode(s *suspect) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[s.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if s.Incarnation < state.Incarnation {
		return
	}

	// See if there's a suspicion timer we can confirm. If the info is new
	// to us we will go ahead and re-gossip it. This allows for multiple
	// independent confirmations to flow even when a node probes a node
	// that's already suspect.
	if timer, ok := m.nodeTimers[s.Node]; ok {
		if timer.Confirm(s.From) {
			m.encodeAndBroadcast(s.Node, suspectMsg, s)
		}
		return
	}

	// Ignore non-alive nodes
	if state.State != stateAlive {
		return
	}

	// If this is us we need to refute, otherwise re-broadcast
	if state.Name == m.config.Name {
		m.refute(state, s.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
		return // Do not mark ourself suspect
	}
	m.encodeAndBroadcast(s.Node, suspectMsg, s)

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

	// Update the state. changeTime is kept so the timer callback below can
	// tell whether the node is still in the suspect state set by this call.
	state.Incarnation = s.Incarnation
	state.State = stateSuspect
	changeTime := time.Now()
	state.StateChange = changeTime

	// Setup a suspicion timer. Given that we don't have any known phase
	// relationship with our peers, we set up k such that we hit the nominal
	// timeout two probe intervals short of what we expect given the suspicion
	// multiplier.
	k := m.config.SuspicionMult - 2

	// If there aren't enough nodes to give the expected confirmations, just
	// set k to 0 to say that we don't expect any. Note we subtract 2 from n
	// here to take out ourselves and the node being probed.
	n := m.estNumNodes()
	if n-2 < k {
		k = 0
	}

	// Compute the timeouts based on the size of the cluster. The locals are
	// deliberately not called min/max so they do not shadow the builtins of
	// the same name in newer Go versions.
	minTimeout := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
	maxTimeout := time.Duration(m.config.SuspicionMaxTimeoutMult) * minTimeout
	fn := func(numConfirmations int) {
		// Build the dead message while holding nodeLock: the node state is
		// shared and may be modified by other handlers as soon as the lock
		// is released, so its fields must not be read afterwards.
		var d *dead

		m.nodeLock.Lock()
		state, ok := m.nodeMap[s.Node]
		timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
		if timeout {
			d = &dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
		}
		m.nodeLock.Unlock()

		if timeout {
			if k > 0 && numConfirmations < k {
				metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
			}

			m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
				d.Node, numConfirmations)

			// deadNode acquires nodeLock itself, so it is called only
			// after the lock has been released above.
			m.deadNode(d)
		}
	}
	m.nodeTimers[s.Node] = newSuspicion(s.From, k, minTimeout, maxTimeout, fn)
}

// deadNode is invoked by the network layer when we get a message
// about a dead node.
//
// The whole call runs with nodeLock held. The message is ignored when the
// node is unknown or when it carries an incarnation older than the one we
// have recorded. Any suspicion timer for the node is removed before the
// already-dead check, so it is cleared even when nothing else changes. A
// death notice about the local node is refuted unless hasLeft reports true.
// Otherwise the node is marked dead, the message is re-broadcast, and the
// event delegate (if configured) is notified via NotifyLeave.
func (m *Memberlist) deadNode(d *dead) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[d.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if d.Incarnation < state.Incarnation {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, d.Node)

	// Ignore if node is already dead
	if state.State == stateDead {
		return
	}

	// Check if this is us
	if state.Name == m.config.Name {
		// If we are not leaving we need to refute
		if !m.hasLeft() {
			m.refute(state, d.Incarnation)
			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
			return // Do not mark ourself dead
		}

		// If we are leaving, we broadcast and wait
		m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
	} else {
		m.encodeAndBroadcast(d.Node, deadMsg, d)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

	// Update the state
	state.Incarnation = d.Incarnation
	state.State = stateDead
	state.StateChange = time.Now()

	// Notify of death
	if m.config.Events != nil {
		m.config.Events.NotifyLeave(&state.Node)
	}
}

// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer.
//
// Each remote entry is replayed through the regular handlers: an alive
// entry becomes an alive message passed to aliveNode, while suspect and
// dead entries both become a suspect message, attributed to the local
// node, passed to suspectNode. Entries with any other state are skipped.
func (m *Memberlist) mergeState(remote []pushNodeState) {
	for _, r := range remote {
		switch r.State {
		case stateAlive:
			a := alive{
				Incarnation: r.Incarnation,
				Node:        r.Name,
				Addr:        r.Addr,
				Port:        r.Port,
				Meta:        r.Meta,
				Vsn:         r.Vsn,
			}
			m.aliveNode(&a, nil, false)

		case stateDead, stateSuspect:
			// If the remote node believes a node is dead, we prefer to
			// suspect that node instead of declaring it dead instantly,
			// so dead and suspect entries are handled identically.
			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
			m.suspectNode(&s)
		}
	}
}