package memberlist

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"net"
	"sync/atomic"
	"time"

	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/go-msgpack/codec"
)

// This is the minimum and maximum protocol version that we can
// _understand_. We're allowed to speak at any version within this
// range. This range is inclusive.
const (
	// ProtocolVersionMin is the oldest protocol version this memberlist
	// can understand.
	ProtocolVersionMin uint8 = 1

	// Version 3 added support for TCP pings but we kept the default
	// protocol version at 2 to ease transition to this new feature.
	// A memberlist speaking version 2 of the protocol will attempt
	// to TCP ping another memberlist who understands version 3 or
	// greater.
	//
	// Version 4 added support for nacks as part of indirect probes.
	// A memberlist speaking version 2 of the protocol will expect
	// nacks from another memberlist who understands version 4 or
	// greater, and likewise nacks will be sent to memberlists who
	// understand version 4 or greater.
	ProtocolVersion2Compatible = 2

	// ProtocolVersionMax is the newest protocol version this memberlist
	// can understand.
	//
	// Version 5 added an optional CRC32 header (hasCrcMsg) on packets. It is
	// only attached when the recipient advertises PMax >= 5, and incoming
	// packets that carry it are verified before being handled.
	ProtocolVersionMax = 5
)

// messageType is an integer ID of a type of message that can be received
// on network channels from other members.
type messageType uint8

// The list of available message types.
//
// WARNING: ONLY APPEND TO THIS LIST! The numeric values are part of the
// protocol itself.
//
// handleCommand reads the message type from the first byte of an unwrapped
// packet and dispatches on it.
const (
	pingMsg messageType = iota
	indirectPingMsg
	ackRespMsg
	suspectMsg
	aliveMsg
	deadMsg
	pushPullMsg
	compoundMsg
	userMsg // User message, not handled by us
	compressMsg
	encryptMsg
	nackRespMsg
	hasCrcMsg
	errMsg
)

const (
	// hasLabelMsg has a deliberately high value so that you can disambiguate
	// it from the encryptionVersion header which is either 0/1 right now and
	// also any of the existing messageTypes
	hasLabelMsg messageType = 244
)

// compressionType is used to specify the compression algorithm
type compressionType uint8

const (
	lzwAlgo compressionType = iota
)

const (
	MetaMaxSize            = 512 // Maximum size for node meta data
	compoundHeaderOverhead = 2   // Assumed header overhead
	compoundOverhead       = 2   // Assumed overhead per entry in compoundHeader
	// NOTE(review): presumably the one-byte userMsg type prefix — confirm at the use site.
	userMsgOverhead = 1
	blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
	// Upper bound on the encrypted remote-state length that
	// decryptRemoteState is willing to read from a stream.
	maxPushStateBytes   = 20 * 1024 * 1024
	maxPushPullRequests = 128 // Maximum number of concurrent push/pull requests
)

// ping request sent directly to node
type ping struct {
	SeqNo uint32

	// Node is sent so the target can verify they are
	// the intended recipient. This is to protect against an agent
	// restart with a new name.
	Node string

	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
}

// indirect ping sent to an indirect node
type indirectPingReq struct {
	SeqNo  uint32
	Target []byte
	Port   uint16

	// Node is sent so the target can verify they are
	// the intended recipient. This is to protect against an agent
	// restart with a new name.
	Node string

	Nack bool // true if we'd like a nack back

	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
}

// ack response is sent for a ping
type ackResp struct {
	SeqNo   uint32
	Payload []byte
}

// nack response is sent for an indirect ping when the pinger doesn't hear from
// the ping-ee within the configured timeout. This lets the original node know
// that the indirect ping attempt happened but didn't succeed.
type nackResp struct {
	SeqNo uint32
}

// err response is sent to relay the error from the remote end
type errResp struct {
	Error string
}

// suspect is broadcast when we suspect a node is dead
type suspect struct {
	Incarnation uint32
	Node        string
	From        string // Include who is suspecting
}

// alive is broadcast when we know a node is alive.
// Overloaded for nodes joining
type alive struct {
	Incarnation uint32
	Node        string
	Addr        []byte
	Port        uint16
	Meta        []byte

	// The versions of the protocol/delegate that are being spoken, order:
	// pmin, pmax, pcur, dmin, dmax, dcur
	Vsn []uint8
}

// dead is broadcast when we confirm a node is dead
// Overloaded for nodes leaving
type dead struct {
	Incarnation uint32
	Node        string
	// Node that originated this dead message. NOTE(review): presumably equal
	// to Node when a node announces its own departure — confirm in deadNode.
	From string
}

// pushPullHeader is used to inform the
// other side how many states we are transferring
type pushPullHeader struct {
	Nodes        int  // Number of pushNodeState records that follow
	UserStateLen int  // Encodes the byte length of user state
	Join         bool // Is this a join request or an anti-entropy run
}

// userMsgHeader is used to encapsulate a userMsg
type userMsgHeader struct {
	UserMsgLen int // Encodes the byte length of the user message
}

// pushNodeState is used for pushPullReq when we are
// transferring out node states
type pushNodeState struct {
	Name        string
	Addr        []byte
	Port        uint16
	Meta        []byte
	Incarnation uint32
	State       NodeStateType
	Vsn         []uint8 // Protocol versions, same order as alive.Vsn
}

// compress is used to wrap an underlying payload
// using a specified compression algorithm
type compress struct {
	Algo compressionType
	Buf  []byte
}

// msgHandoff is used to transfer a message between goroutines
// (from handleCommand to packetHandler via the handoff queues).
type msgHandoff struct {
	msgType messageType
	buf     []byte
	from    net.Addr
}

// encryptionVersion returns the encryption version to use: protocol
// version 1 maps to encryption version 0, every other protocol version
// uses encryption version 1.
func (m *Memberlist) encryptionVersion() encryptionVersion {
	switch m.ProtocolVersion() {
	case 1:
		return 0
	default:
		return 1
	}
}

// streamListen is a long running goroutine that pulls incoming streams from the
// transport and hands them off for processing.
//
// It returns once m.shutdownCh becomes readable. Each connection received
// from the transport's StreamCh is processed by handleConn in its own
// goroutine, so one slow peer does not block acceptance of new streams.
func (m *Memberlist) streamListen() {
	for {
		select {
		case conn := <-m.transport.StreamCh():
			go m.handleConn(conn)

		case <-m.shutdownCh:
			return
		}
	}
}

// handleConn handles a single incoming stream connection from the transport.
//
// It applies the TCPTimeout deadline, strips and validates the stream label
// header, reads exactly one message with readStream, and dispatches on its
// type (userMsg, pushPullMsg or pingMsg). Any other type is logged and
// ignored. All failures are logged rather than returned.
func (m *Memberlist) handleConn(conn net.Conn) {
	// NOTE(review): conn is reassigned below by RemoveLabelHeaderFromStream;
	// this deferred Close is bound to the original connection value.
	defer conn.Close()
	m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))

	metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)

	conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

	var (
		streamLabel string
		err         error
	)
	conn, streamLabel, err = RemoveLabelHeaderFromStream(conn)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: failed to receive and remove the stream label header: %s %s", err, LogConn(conn))
		return
	}

	if m.config.SkipInboundLabelCheck {
		// With the inbound check skipped, the peer must not have sent a
		// label header at all.
		if streamLabel != "" {
			m.logger.Printf("[ERR] memberlist: unexpected double stream label header: %s", LogConn(conn))
			return
		}
		// Set this from config so that the auth data assertions work below.
		streamLabel = m.config.Label
	}

	if m.config.Label != streamLabel {
		m.logger.Printf("[ERR] memberlist: discarding stream with unacceptable label %q: %s", streamLabel, LogConn(conn))
		return
	}

	msgType, bufConn, dec, err := m.readStream(conn, streamLabel)
	if err != nil {
		// io.EOF is dropped without a reply. Any other read failure is
		// reported back to the peer as an errMsg before the stream closes.
		if err != io.EOF {
			m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))

			resp := errResp{err.Error()}
			out, err := encode(errMsg, &resp)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
				return
			}

			err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
				return
			}
		}
		return
	}

	switch msgType {
	case userMsg:
		if err := m.readUserMsg(bufConn, dec); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
		}
	case pushPullMsg:
		// Increment counter of pending push/pulls. Adding ^uint32(0)
		// (two's-complement -1) in the defer decrements it on return.
		numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
		defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))

		// Check if we have too many open push/pull requests. The count
		// already includes this request.
		if numConcurrent >= maxPushPullRequests {
			m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
			return
		}

		join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
			return
		}

		// Reply with our own state before merging the peer's state.
		if err := m.sendLocalState(conn, join, streamLabel); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
			return
		}

		if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
			return
		}
	case pingMsg:
		var p ping
		if err := dec.Decode(&p); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
			return
		}

		if p.Node != "" && p.Node != m.config.Name {
			m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s %s", p.Node, LogConn(conn))
			return
		}

		// Stream pings are answered with an ack that carries no payload.
		ack := ackResp{p.SeqNo, nil}
		out, err := encode(ackRespMsg, &ack)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
			return
		}

		err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
			return
		}
	default:
		m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d) %s", msgType, LogConn(conn))
	}
}

// packetListen is a long running goroutine that pulls packets out of the
// transport and hands them off for processing.
//
// Each packet is ingested synchronously on this goroutine. It returns once
// m.shutdownCh becomes readable.
func (m *Memberlist) packetListen() {
	for {
		select {
		case packet := <-m.transport.PacketCh():
			m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)

		case <-m.shutdownCh:
			return
		}
	}
}

// ingestPacket unwraps one received packet and passes it to handleCommand.
//
// It applies these steps in order:
//  1. strip and validate the label header;
//  2. decrypt when encryption is enabled, treating the bytes as plaintext if
//     decryption fails and GossipVerifyIncoming is off;
//  3. verify and strip an optional hasCrcMsg checksum header.
//
// Packets failing any step are logged and dropped.
func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
	var (
		packetLabel string
		err         error
	)
	buf, packetLabel, err = RemoveLabelHeaderFromPacket(buf)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: %v %s", err, LogAddress(from))
		return
	}

	if m.config.SkipInboundLabelCheck {
		if packetLabel != "" {
			m.logger.Printf("[ERR] memberlist: unexpected double packet label header: %s", LogAddress(from))
			return
		}
		// Set this from config so that the auth data assertions work below.
		packetLabel = m.config.Label
	}

	if m.config.Label != packetLabel {
		m.logger.Printf("[ERR] memberlist: discarding packet with unacceptable label %q: %s", packetLabel, LogAddress(from))
		return
	}

	// Check if encryption is enabled
	if m.config.EncryptionEnabled() {
		// Decrypt the payload; the label is the authenticated data.
		authData := []byte(packetLabel)
		plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, authData)
		if err != nil {
			if !m.config.GossipVerifyIncoming {
				// Treat the message as plaintext
				plain = buf
			} else {
				m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
				return
			}
		}

		// Continue processing the plaintext buffer
		buf = plain
	}

	// See if there's a checksum included to verify the contents of the message.
	// Layout: [hasCrcMsg; 1 byte] [CRC32-IEEE of the rest; 4 bytes big-endian] [message]
	if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
		crc := crc32.ChecksumIEEE(buf[5:])
		expected := binary.BigEndian.Uint32(buf[1:5])
		if crc != expected {
			m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
			return
		}
		m.handleCommand(buf[5:], from, timestamp)
	} else {
		m.handleCommand(buf, from, timestamp)
	}
}

// handleCommand dispatches a single unwrapped message by its leading type byte.
//
// Compound and compressed messages are unpacked and re-dispatched
// recursively. Ping, indirect-ping, ack and nack messages are handled
// immediately on the caller's goroutine. Suspect, alive, dead and user
// messages are queued for packetHandler; alive messages go to the
// high-priority queue and everything else to the low-priority queue.
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
	if len(buf) < 1 {
		m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from))
		return
	}
	// Decode the message type
	msgType := messageType(buf[0])
	buf = buf[1:]

	// Switch on the msgType
	switch msgType {
	case compoundMsg:
		m.handleCompound(buf, from, timestamp)
	case compressMsg:
		m.handleCompressed(buf, from, timestamp)

	case pingMsg:
		m.handlePing(buf, from)
	case indirectPingMsg:
		m.handleIndirectPing(buf, from)
	case ackRespMsg:
		m.handleAck(buf, from, timestamp)
	case nackRespMsg:
		m.handleNack(buf, from)

	case suspectMsg:
		fallthrough
	case aliveMsg:
		fallthrough
	case deadMsg:
		fallthrough
	case userMsg:
		// Determine the message queue, prioritize alive
		queue := m.lowPriorityMsgQueue
		if msgType == aliveMsg {
			queue = m.highPriorityMsgQueue
		}

		// Check for overflow and append if not full; overflowing messages
		// are dropped rather than blocking the packet listener.
		m.msgQueueLock.Lock()
		if queue.Len() >= m.config.HandoffQueueDepth {
			m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
		} else {
			queue.PushBack(msgHandoff{msgType, buf, from})
		}
		m.msgQueueLock.Unlock()

		// Notify of pending message without blocking. If a wake-up is already
		// pending, packetHandler will drain this message along with it.
		select {
		case m.handoffCh <- struct{}{}:
		default:
		}

	default:
		m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
	}
}

// getNextMessage returns the next message to process in priority order, using LIFO
//
// The high-priority queue is fully drained before the low-priority one. In
// each queue the most recently pushed message (Back) is taken first. The
// boolean is false when both queues are empty.
func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
	m.msgQueueLock.Lock()
	defer m.msgQueueLock.Unlock()

	if el := m.highPriorityMsgQueue.Back(); el != nil {
		m.highPriorityMsgQueue.Remove(el)
		msg := el.Value.(msgHandoff)
		return msg, true
	} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
		m.lowPriorityMsgQueue.Remove(el)
		msg := el.Value.(msgHandoff)
		return msg, true
	}
	return msgHandoff{}, false
}

// packetHandler is a long running goroutine that processes messages received
// over the packet interface, but is decoupled from the listener to avoid
// blocking the listener which may cause ping/ack messages to be delayed.
//
// Every signal on m.handoffCh triggers a full drain of both handoff queues
// via getNextMessage. It returns once m.shutdownCh becomes readable.
func (m *Memberlist) packetHandler() {
	for {
		select {
		case <-m.handoffCh:
			for {
				msg, ok := m.getNextMessage()
				if !ok {
					// Queues are empty. This break exits the inner for loop
					// (not the select) and goes back to waiting.
					break
				}
				msgType := msg.msgType
				buf := msg.buf
				from := msg.from

				switch msgType {
				case suspectMsg:
					m.handleSuspect(buf, from)
				case aliveMsg:
					m.handleAlive(buf, from)
				case deadMsg:
					m.handleDead(buf, from)
				case userMsg:
					m.handleUser(buf, from)
				default:
					m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
				}
			}

		case <-m.shutdownCh:
			return
		}
	}
}

// handleCompound splits a compoundMsg into its parts and dispatches each part
// through handleCommand. Truncated parts are counted and logged, not handled.
func (m *Memberlist) handleCompound(buf []byte, from net.Addr, timestamp time.Time) {
	// Decode the parts
	trunc, parts, err := decodeCompoundMessage(buf)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s %s", err, LogAddress(from))
		return
	}

	// Log any truncation
	if trunc > 0 {
		m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages %s", trunc, LogAddress(from))
	}

	// Handle each message
	for _, part := range parts {
		m.handleCommand(part, from, timestamp)
	}
}

// handlePing answers a packet ping with an ack carrying the same SeqNo.
//
// The ack payload comes from the configured Ping delegate, if any. The ack is
// sent to SourceAddr:SourcePort when the ping includes both, otherwise back to
// the packet's sender address. Pings addressed to a different node name are
// dropped.
func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
	var p ping
	if err := decode(buf, &p); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode ping request: %s %s", err, LogAddress(from))
		return
	}
	// If node is provided, verify that it is for us
	if p.Node != "" && p.Node != m.config.Name {
		m.logger.Printf("[WARN] memberlist: Got ping for unexpected node '%s' %s", p.Node, LogAddress(from))
		return
	}
	var ack ackResp
	ack.SeqNo = p.SeqNo
	if m.config.Ping != nil {
		ack.Payload = m.config.Ping.AckPayload()
	}

	addr := ""
	if len(p.SourceAddr) > 0 && p.SourcePort > 0 {
		addr = joinHostPort(net.IP(p.SourceAddr).String(), p.SourcePort)
	} else {
		addr = from.String()
	}

	a := Address{
		Addr: addr,
		Name: p.SourceNode,
	}
	if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
	}
}

// handleIndirectPing probes ind.Target on behalf of the requester and relays
// the result.
//
// It sends its own ping under a fresh local sequence number. If that ping is
// acked within ProbeTimeout, an ack with the requester's SeqNo is forwarded to
// the requester. If ind.Nack is set and no ack arrives within ProbeTimeout, a
// nack is sent instead.
func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
	var ind indirectPingReq
	if err := decode(buf, &ind); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode indirect ping request: %s %s", err, LogAddress(from))
		return
	}

	// For proto versions < 2, there is no port provided. Mask old
	// behavior by using the configured port.
	if m.ProtocolVersion() < 2 || ind.Port == 0 {
		ind.Port = uint16(m.config.BindPort)
	}

	// Send a ping to the correct host. The local variable shadows the ping
	// type name for the rest of this function.
	localSeqNo := m.nextSeqNo()
	selfAddr, selfPort := m.getAdvertise()
	ping := ping{
		SeqNo: localSeqNo,
		Node:  ind.Node,
		// The outbound message is addressed FROM us.
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}

	// Forward the ack back to the requestor. If the request encodes an origin
	// use that otherwise assume that the other end of the UDP socket is
	// usable.
	indAddr := ""
	if len(ind.SourceAddr) > 0 && ind.SourcePort > 0 {
		indAddr = joinHostPort(net.IP(ind.SourceAddr).String(), ind.SourcePort)
	} else {
		indAddr = from.String()
	}

	// Setup a response handler to relay the ack. The handler is registered
	// before the ping is sent so that a fast ack is not missed.
	cancelCh := make(chan struct{})
	respHandler := func(payload []byte, timestamp time.Time) {
		// Try to prevent the nack if we've caught it in time.
		// NOTE(review): closing twice would panic; this relies on the ack
		// handler being invoked at most once per sequence number — confirm
		// in invokeAckHandler.
		close(cancelCh)

		ack := ackResp{ind.SeqNo, nil}
		a := Address{
			Addr: indAddr,
			Name: ind.SourceNode,
		}
		if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogStringAddress(indAddr))
		}
	}
	m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)

	// Send the ping.
	addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
	a := Address{
		Addr: addr,
		Name: ind.Node,
	}
	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s %s", err, LogStringAddress(indAddr))
	}

	// Setup a timer to fire off a nack if no ack is seen in time.
	if ind.Nack {
		go func() {
			select {
			case <-cancelCh:
				return
			case <-time.After(m.config.ProbeTimeout):
				nack := nackResp{ind.SeqNo}
				a := Address{
					Addr: indAddr,
					Name: ind.SourceNode,
				}
				if err := m.encodeAndSendMsg(a, nackRespMsg, &nack); err != nil {
					m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogStringAddress(indAddr))
				}
			}
		}()
	}
}

// handleAck decodes an ack and hands it, with its receive timestamp, to the
// registered ack handler.
func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
	var ack ackResp
	if err := decode(buf, &ack); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode ack response: %s %s", err, LogAddress(from))
		return
	}
	m.invokeAckHandler(ack, timestamp)
}

// handleNack decodes a nack and hands it to the registered nack handler.
func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
	var nack nackResp
	if err := decode(buf, &nack); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
		return
	}
	m.invokeNackHandler(nack)
}

// handleSuspect decodes a suspect message and applies it via suspectNode.
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
	var sus suspect
	if err := decode(buf, &sus); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode suspect message: %s %s", err, LogAddress(from))
		return
	}
	m.suspectNode(&sus)
}

// ensureCanConnect return the IP from a RemoteAddress
// return error if this client must not connect
//
// It returns nil when IP checking is disabled or the source is "pipe".
// Otherwise the host part of from must parse as an IP accepted by
// m.config.IPAllowed.
func (m *Memberlist) ensureCanConnect(from net.Addr) error {
	if !m.config.IPMustBeChecked() {
		return nil
	}
	source := from.String()
	if source == "pipe" {
		return nil
	}
	host, _, err := net.SplitHostPort(source)
	if err != nil {
		return err
	}

	ip := net.ParseIP(host)
	if ip == nil {
		return fmt.Errorf("Cannot parse IP from %s", host)
	}
	return m.config.IPAllowed(ip)
}

// handleAlive decodes an alive message and applies it via aliveNode.
//
// When IP checking is enabled, both the packet's sender and the address
// advertised inside the message (live.Addr) must be allowed.
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
	if err := m.ensureCanConnect(from); err != nil {
		m.logger.Printf("[DEBUG] memberlist: Blocked alive message: %s %s", err, LogAddress(from))
		return
	}
	var live alive
	if err := decode(buf, &live); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from))
		return
	}
	if m.config.IPMustBeChecked() {
		innerIP := net.IP(live.Addr)
		if innerIP != nil {
			if err := m.config.IPAllowed(innerIP); err != nil {
				m.logger.Printf("[DEBUG] memberlist: Blocked alive.Addr=%s message from: %s %s", innerIP.String(), err, LogAddress(from))
				return
			}
		}
	}

	// For proto versions < 2, there is no port provided. Mask old
	// behavior by using the configured port
	if m.ProtocolVersion() < 2 || live.Port == 0 {
		live.Port = uint16(m.config.BindPort)
	}

	m.aliveNode(&live, nil, false)
}

// handleDead decodes a dead message and applies it via deadNode.
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
	var d dead
	if err := decode(buf, &d); err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decode dead message: %s %s", err, LogAddress(from))
		return
	}
	m.deadNode(&d)
}

// handleUser passes the raw user payload to the configured Delegate's
// NotifyMsg, if a Delegate is set. The sender address is not used.
func (m *Memberlist) handleUser(buf []byte, from net.Addr) {
	d := m.config.Delegate
	if d != nil {
		d.NotifyMsg(buf)
	}
}

// handleCompressed is used to unpack a compressed message
func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.Time) {
	// Try to decode the payload
	payload, err := decompressPayload(buf)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: Failed to decompress payload: %v %s", err, LogAddress(from))
		return
	}

	// Recursively handle the payload
	m.handleCommand(payload, from, timestamp)
}

// encodeAndSendMsg is used to combine the encoding and sending steps
func (m *Memberlist) encodeAndSendMsg(a Address, msgType messageType, msg interface{}) error {
	out, err := encode(msgType, msg)
	if err != nil {
		return err
	}
	if err := m.sendMsg(a, out.Bytes()); err != nil {
		return err
	}
	return nil
}

// sendMsg is used to send a message via packet to another host. It will
// opportunistically create a compoundMsg and piggy back other broadcasts.
784func (m *Memberlist) sendMsg(a Address, msg []byte) error { 785 // Check if we can piggy back any messages 786 bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead - labelOverhead(m.config.Label) 787 if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { 788 bytesAvail -= encryptOverhead(m.encryptionVersion()) 789 } 790 extra := m.getBroadcasts(compoundOverhead, bytesAvail) 791 792 // Fast path if nothing to piggypack 793 if len(extra) == 0 { 794 return m.rawSendMsgPacket(a, nil, msg) 795 } 796 797 // Join all the messages 798 msgs := make([][]byte, 0, 1+len(extra)) 799 msgs = append(msgs, msg) 800 msgs = append(msgs, extra...) 801 802 // Create a compound message 803 compound := makeCompoundMessage(msgs) 804 805 // Send the message 806 return m.rawSendMsgPacket(a, nil, compound.Bytes()) 807} 808 809// rawSendMsgPacket is used to send message via packet to another host without 810// modification, other than compression or encryption if enabled. 811func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error { 812 if a.Name == "" && m.config.RequireNodeNames { 813 return errNodeNamesAreRequired 814 } 815 816 // Check if we have compression enabled 817 if m.config.EnableCompression { 818 buf, err := compressPayload(msg) 819 if err != nil { 820 m.logger.Printf("[WARN] memberlist: Failed to compress payload: %v", err) 821 } else { 822 // Only use compression if it reduced the size 823 if buf.Len() < len(msg) { 824 msg = buf.Bytes() 825 } 826 } 827 } 828 829 // Try to look up the destination node. Note this will only work if the 830 // bare ip address is used as the node name, which is not guaranteed. 
831 if node == nil { 832 toAddr, _, err := net.SplitHostPort(a.Addr) 833 if err != nil { 834 m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", a.Addr, err) 835 return err 836 } 837 m.nodeLock.RLock() 838 nodeState, ok := m.nodeMap[toAddr] 839 m.nodeLock.RUnlock() 840 if ok { 841 node = &nodeState.Node 842 } 843 } 844 845 // Add a CRC to the end of the payload if the recipient understands 846 // ProtocolVersion >= 5 847 if node != nil && node.PMax >= 5 { 848 crc := crc32.ChecksumIEEE(msg) 849 header := make([]byte, 5, 5+len(msg)) 850 header[0] = byte(hasCrcMsg) 851 binary.BigEndian.PutUint32(header[1:], crc) 852 msg = append(header, msg...) 853 } 854 855 // Check if we have encryption enabled 856 if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { 857 // Encrypt the payload 858 var ( 859 primaryKey = m.config.Keyring.GetPrimaryKey() 860 packetLabel = []byte(m.config.Label) 861 buf bytes.Buffer 862 ) 863 err := encryptPayload(m.encryptionVersion(), primaryKey, msg, packetLabel, &buf) 864 if err != nil { 865 m.logger.Printf("[ERR] memberlist: Encryption of message failed: %v", err) 866 return err 867 } 868 msg = buf.Bytes() 869 } 870 871 metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg))) 872 _, err := m.transport.WriteToAddress(msg, a) 873 return err 874} 875 876// rawSendMsgStream is used to stream a message to another host without 877// modification, other than applying compression and encryption if enabled. 
878func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel string) error { 879 // Check if compression is enabled 880 if m.config.EnableCompression { 881 compBuf, err := compressPayload(sendBuf) 882 if err != nil { 883 m.logger.Printf("[ERROR] memberlist: Failed to compress payload: %v", err) 884 } else { 885 sendBuf = compBuf.Bytes() 886 } 887 } 888 889 // Check if encryption is enabled 890 if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing { 891 crypt, err := m.encryptLocalState(sendBuf, streamLabel) 892 if err != nil { 893 m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err) 894 return err 895 } 896 sendBuf = crypt 897 } 898 899 // Write out the entire send buffer 900 metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf))) 901 902 if n, err := conn.Write(sendBuf); err != nil { 903 return err 904 } else if n != len(sendBuf) { 905 return fmt.Errorf("only %d of %d bytes written", n, len(sendBuf)) 906 } 907 908 return nil 909} 910 911// sendUserMsg is used to stream a user message to another host. 912func (m *Memberlist) sendUserMsg(a Address, sendBuf []byte) error { 913 if a.Name == "" && m.config.RequireNodeNames { 914 return errNodeNamesAreRequired 915 } 916 917 conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout) 918 if err != nil { 919 return err 920 } 921 defer conn.Close() 922 923 bufConn := bytes.NewBuffer(nil) 924 if err := bufConn.WriteByte(byte(userMsg)); err != nil { 925 return err 926 } 927 928 header := userMsgHeader{UserMsgLen: len(sendBuf)} 929 hd := codec.MsgpackHandle{} 930 enc := codec.NewEncoder(bufConn, &hd) 931 if err := enc.Encode(&header); err != nil { 932 return err 933 } 934 if _, err := bufConn.Write(sendBuf); err != nil { 935 return err 936 } 937 938 return m.rawSendMsgStream(conn, bufConn.Bytes(), m.config.Label) 939} 940 941// sendAndReceiveState is used to initiate a push/pull over a stream with a 942// remote host. 
943func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, []byte, error) { 944 if a.Name == "" && m.config.RequireNodeNames { 945 return nil, nil, errNodeNamesAreRequired 946 } 947 948 // Attempt to connect 949 conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout) 950 if err != nil { 951 return nil, nil, err 952 } 953 defer conn.Close() 954 m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr()) 955 metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1) 956 957 // Send our state 958 if err := m.sendLocalState(conn, join, m.config.Label); err != nil { 959 return nil, nil, err 960 } 961 962 conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) 963 msgType, bufConn, dec, err := m.readStream(conn, m.config.Label) 964 if err != nil { 965 return nil, nil, err 966 } 967 968 if msgType == errMsg { 969 var resp errResp 970 if err := dec.Decode(&resp); err != nil { 971 return nil, nil, err 972 } 973 return nil, nil, fmt.Errorf("remote error: %v", resp.Error) 974 } 975 976 // Quit if not push/pull 977 if msgType != pushPullMsg { 978 err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn)) 979 return nil, nil, err 980 } 981 982 // Read remote state 983 _, remoteNodes, userState, err := m.readRemoteState(bufConn, dec) 984 return remoteNodes, userState, err 985} 986 987// sendLocalState is invoked to send our local state over a stream connection. 
988func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string) error { 989 // Setup a deadline 990 conn.SetDeadline(time.Now().Add(m.config.TCPTimeout)) 991 992 // Prepare the local node state 993 m.nodeLock.RLock() 994 localNodes := make([]pushNodeState, len(m.nodes)) 995 for idx, n := range m.nodes { 996 localNodes[idx].Name = n.Name 997 localNodes[idx].Addr = n.Addr 998 localNodes[idx].Port = n.Port 999 localNodes[idx].Incarnation = n.Incarnation 1000 localNodes[idx].State = n.State 1001 localNodes[idx].Meta = n.Meta 1002 localNodes[idx].Vsn = []uint8{ 1003 n.PMin, n.PMax, n.PCur, 1004 n.DMin, n.DMax, n.DCur, 1005 } 1006 } 1007 m.nodeLock.RUnlock() 1008 1009 // Get the delegate state 1010 var userData []byte 1011 if m.config.Delegate != nil { 1012 userData = m.config.Delegate.LocalState(join) 1013 } 1014 1015 // Create a bytes buffer writer 1016 bufConn := bytes.NewBuffer(nil) 1017 1018 // Send our node state 1019 header := pushPullHeader{Nodes: len(localNodes), UserStateLen: len(userData), Join: join} 1020 hd := codec.MsgpackHandle{} 1021 enc := codec.NewEncoder(bufConn, &hd) 1022 1023 // Begin state push 1024 if _, err := bufConn.Write([]byte{byte(pushPullMsg)}); err != nil { 1025 return err 1026 } 1027 1028 if err := enc.Encode(&header); err != nil { 1029 return err 1030 } 1031 for i := 0; i < header.Nodes; i++ { 1032 if err := enc.Encode(&localNodes[i]); err != nil { 1033 return err 1034 } 1035 } 1036 1037 // Write the user state as well 1038 if userData != nil { 1039 if _, err := bufConn.Write(userData); err != nil { 1040 return err 1041 } 1042 } 1043 1044 // Get the send buffer 1045 return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel) 1046} 1047 1048// encryptLocalState is used to help encrypt local state before sending 1049func (m *Memberlist) encryptLocalState(sendBuf []byte, streamLabel string) ([]byte, error) { 1050 var buf bytes.Buffer 1051 1052 // Write the encryptMsg byte 1053 buf.WriteByte(byte(encryptMsg)) 1054 1055 
// Write the size of the message 1056 sizeBuf := make([]byte, 4) 1057 encVsn := m.encryptionVersion() 1058 encLen := encryptedLength(encVsn, len(sendBuf)) 1059 binary.BigEndian.PutUint32(sizeBuf, uint32(encLen)) 1060 buf.Write(sizeBuf) 1061 1062 // Authenticated Data is: 1063 // 1064 // [messageType; byte] [messageLength; uint32] [stream_label; optional] 1065 // 1066 dataBytes := appendBytes(buf.Bytes()[:5], []byte(streamLabel)) 1067 1068 // Write the encrypted cipher text to the buffer 1069 key := m.config.Keyring.GetPrimaryKey() 1070 err := encryptPayload(encVsn, key, sendBuf, dataBytes, &buf) 1071 if err != nil { 1072 return nil, err 1073 } 1074 return buf.Bytes(), nil 1075} 1076 1077// decryptRemoteState is used to help decrypt the remote state 1078func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) ([]byte, error) { 1079 // Read in enough to determine message length 1080 cipherText := bytes.NewBuffer(nil) 1081 cipherText.WriteByte(byte(encryptMsg)) 1082 _, err := io.CopyN(cipherText, bufConn, 4) 1083 if err != nil { 1084 return nil, err 1085 } 1086 1087 // Ensure we aren't asked to download too much. 
This is to guard against 1088 // an attack vector where a huge amount of state is sent 1089 moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5]) 1090 if moreBytes > maxPushStateBytes { 1091 return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes) 1092 } 1093 1094 // Read in the rest of the payload 1095 _, err = io.CopyN(cipherText, bufConn, int64(moreBytes)) 1096 if err != nil { 1097 return nil, err 1098 } 1099 1100 // Decrypt the cipherText with some authenticated data 1101 // 1102 // Authenticated Data is: 1103 // 1104 // [messageType; byte] [messageLength; uint32] [label_data; optional] 1105 // 1106 dataBytes := appendBytes(cipherText.Bytes()[:5], []byte(streamLabel)) 1107 cipherBytes := cipherText.Bytes()[5:] 1108 1109 // Decrypt the payload 1110 keys := m.config.Keyring.GetKeys() 1111 return decryptPayload(keys, cipherBytes, dataBytes) 1112} 1113 1114// readStream is used to read messages from a stream connection, decrypting and 1115// decompressing the stream if necessary. 1116// 1117// The provided streamLabel if present will be authenticated during decryption 1118// of each message. 
1119func (m *Memberlist) readStream(conn net.Conn, streamLabel string) (messageType, io.Reader, *codec.Decoder, error) { 1120 // Created a buffered reader 1121 var bufConn io.Reader = bufio.NewReader(conn) 1122 1123 // Read the message type 1124 buf := [1]byte{0} 1125 if _, err := io.ReadFull(bufConn, buf[:]); err != nil { 1126 return 0, nil, nil, err 1127 } 1128 msgType := messageType(buf[0]) 1129 1130 // Check if the message is encrypted 1131 if msgType == encryptMsg { 1132 if !m.config.EncryptionEnabled() { 1133 return 0, nil, nil, 1134 fmt.Errorf("Remote state is encrypted and encryption is not configured") 1135 } 1136 1137 plain, err := m.decryptRemoteState(bufConn, streamLabel) 1138 if err != nil { 1139 return 0, nil, nil, err 1140 } 1141 1142 // Reset message type and bufConn 1143 msgType = messageType(plain[0]) 1144 bufConn = bytes.NewReader(plain[1:]) 1145 } else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming { 1146 return 0, nil, nil, 1147 fmt.Errorf("Encryption is configured but remote state is not encrypted") 1148 } 1149 1150 // Get the msgPack decoders 1151 hd := codec.MsgpackHandle{} 1152 dec := codec.NewDecoder(bufConn, &hd) 1153 1154 // Check if we have a compressed message 1155 if msgType == compressMsg { 1156 var c compress 1157 if err := dec.Decode(&c); err != nil { 1158 return 0, nil, nil, err 1159 } 1160 decomp, err := decompressBuffer(&c) 1161 if err != nil { 1162 return 0, nil, nil, err 1163 } 1164 1165 // Reset the message type 1166 msgType = messageType(decomp[0]) 1167 1168 // Create a new bufConn 1169 bufConn = bytes.NewReader(decomp[1:]) 1170 1171 // Create a new decoder 1172 dec = codec.NewDecoder(bufConn, &hd) 1173 } 1174 1175 return msgType, bufConn, dec, nil 1176} 1177 1178// readRemoteState is used to read the remote state from a connection 1179func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) { 1180 // Read the push/pull header 1181 var header 
pushPullHeader 1182 if err := dec.Decode(&header); err != nil { 1183 return false, nil, nil, err 1184 } 1185 1186 // Allocate space for the transfer 1187 remoteNodes := make([]pushNodeState, header.Nodes) 1188 1189 // Try to decode all the states 1190 for i := 0; i < header.Nodes; i++ { 1191 if err := dec.Decode(&remoteNodes[i]); err != nil { 1192 return false, nil, nil, err 1193 } 1194 } 1195 1196 // Read the remote user state into a buffer 1197 var userBuf []byte 1198 if header.UserStateLen > 0 { 1199 userBuf = make([]byte, header.UserStateLen) 1200 bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserStateLen) 1201 if err == nil && bytes != header.UserStateLen { 1202 err = fmt.Errorf( 1203 "Failed to read full user state (%d / %d)", 1204 bytes, header.UserStateLen) 1205 } 1206 if err != nil { 1207 return false, nil, nil, err 1208 } 1209 } 1210 1211 // For proto versions < 2, there is no port provided. Mask old 1212 // behavior by using the configured port 1213 for idx := range remoteNodes { 1214 if m.ProtocolVersion() < 2 || remoteNodes[idx].Port == 0 { 1215 remoteNodes[idx].Port = uint16(m.config.BindPort) 1216 } 1217 } 1218 1219 return header.Join, remoteNodes, userBuf, nil 1220} 1221 1222// mergeRemoteState is used to merge the remote state with our local state 1223func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error { 1224 if err := m.verifyProtocol(remoteNodes); err != nil { 1225 return err 1226 } 1227 1228 // Invoke the merge delegate if any 1229 if join && m.config.Merge != nil { 1230 nodes := make([]*Node, len(remoteNodes)) 1231 for idx, n := range remoteNodes { 1232 nodes[idx] = &Node{ 1233 Name: n.Name, 1234 Addr: n.Addr, 1235 Port: n.Port, 1236 Meta: n.Meta, 1237 State: n.State, 1238 PMin: n.Vsn[0], 1239 PMax: n.Vsn[1], 1240 PCur: n.Vsn[2], 1241 DMin: n.Vsn[3], 1242 DMax: n.Vsn[4], 1243 DCur: n.Vsn[5], 1244 } 1245 } 1246 if err := m.config.Merge.NotifyMerge(nodes); err != nil { 1247 return err 1248 } 
1249 } 1250 1251 // Merge the membership state 1252 m.mergeState(remoteNodes) 1253 1254 // Invoke the delegate for user state 1255 if userBuf != nil && m.config.Delegate != nil { 1256 m.config.Delegate.MergeRemoteState(userBuf, join) 1257 } 1258 return nil 1259} 1260 1261// readUserMsg is used to decode a userMsg from a stream. 1262func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error { 1263 // Read the user message header 1264 var header userMsgHeader 1265 if err := dec.Decode(&header); err != nil { 1266 return err 1267 } 1268 1269 // Read the user message into a buffer 1270 var userBuf []byte 1271 if header.UserMsgLen > 0 { 1272 userBuf = make([]byte, header.UserMsgLen) 1273 bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserMsgLen) 1274 if err == nil && bytes != header.UserMsgLen { 1275 err = fmt.Errorf( 1276 "Failed to read full user message (%d / %d)", 1277 bytes, header.UserMsgLen) 1278 } 1279 if err != nil { 1280 return err 1281 } 1282 1283 d := m.config.Delegate 1284 if d != nil { 1285 d.NotifyMsg(userBuf) 1286 } 1287 } 1288 1289 return nil 1290} 1291 1292// sendPingAndWaitForAck makes a stream connection to the given address, sends 1293// a ping, and waits for an ack. All of this is done as a series of blocking 1294// operations, given the deadline. The bool return parameter is true if we 1295// we able to round trip a ping to the other node. 1296func (m *Memberlist) sendPingAndWaitForAck(a Address, ping ping, deadline time.Time) (bool, error) { 1297 if a.Name == "" && m.config.RequireNodeNames { 1298 return false, errNodeNamesAreRequired 1299 } 1300 1301 conn, err := m.transport.DialAddressTimeout(a, deadline.Sub(time.Now())) 1302 if err != nil { 1303 // If the node is actually dead we expect this to fail, so we 1304 // shouldn't spam the logs with it. After this point, errors 1305 // with the connection are real, unexpected errors and should 1306 // get propagated up. 
1307 return false, nil 1308 } 1309 defer conn.Close() 1310 conn.SetDeadline(deadline) 1311 1312 out, err := encode(pingMsg, &ping) 1313 if err != nil { 1314 return false, err 1315 } 1316 1317 if err = m.rawSendMsgStream(conn, out.Bytes(), m.config.Label); err != nil { 1318 return false, err 1319 } 1320 1321 msgType, _, dec, err := m.readStream(conn, m.config.Label) 1322 if err != nil { 1323 return false, err 1324 } 1325 1326 if msgType != ackRespMsg { 1327 return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn)) 1328 } 1329 1330 var ack ackResp 1331 if err = dec.Decode(&ack); err != nil { 1332 return false, err 1333 } 1334 1335 if ack.SeqNo != ping.SeqNo { 1336 return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo) 1337 } 1338 1339 return true, nil 1340} 1341