1package memberlist
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/binary"
7	"fmt"
8	"hash/crc32"
9	"io"
10	"net"
11	"sync/atomic"
12	"time"
13
14	metrics "github.com/armon/go-metrics"
15	"github.com/hashicorp/go-msgpack/codec"
16)
17
// This is the minimum and maximum protocol version that we can
// _understand_. We're allowed to speak at any version within this
// range. This range is inclusive.
const (
	// ProtocolVersionMin is the oldest protocol version we understand.
	ProtocolVersionMin uint8 = 1

	// Version 3 added support for TCP pings but we kept the default
	// protocol version at 2 to ease transition to this new feature.
	// A memberlist speaking version 2 of the protocol will attempt
	// to TCP ping another memberlist who understands version 3 or
	// greater.
	//
	// Version 4 added support for nacks as part of indirect probes.
	// A memberlist speaking version 2 of the protocol will expect
	// nacks from another memberlist who understands version 4 or
	// greater, and likewise nacks will be sent to memberlists who
	// understand version 4 or greater.
	ProtocolVersion2Compatible = 2

	// ProtocolVersionMax is the newest protocol version we understand.
	// Version 5 added an optional CRC32 header (hasCrcMsg) on packet
	// messages; it is only sent to peers whose advertised maximum
	// protocol version is 5 or greater.
	ProtocolVersionMax = 5
)
39
// messageType is an integer ID of a type of message that can be received
// on network channels from other members. On the wire it is the first
// byte of a message (see handleCommand).
type messageType uint8

// The list of available message types. Values start at 0 (pingMsg) and
// increase by one in declaration order.
//
// WARNING: ONLY APPEND TO THIS LIST! The numeric values are part of the
// protocol itself.
const (
	pingMsg messageType = iota
	indirectPingMsg
	ackRespMsg
	suspectMsg
	aliveMsg
	deadMsg
	pushPullMsg
	compoundMsg
	userMsg // User message; payload is passed to the Delegate, not interpreted here
	compressMsg
	encryptMsg
	nackRespMsg
	hasCrcMsg
	errMsg
)

const (
	// hasLabelMsg presumably marks the label header stripped by
	// RemoveLabelHeaderFromPacket/RemoveLabelHeaderFromStream. It has a
	// deliberately high value so that it can be told apart both from the
	// encryptionVersion header byte (currently 0 or 1) and from every
	// other messageType.
	hasLabelMsg messageType = 244
)

// compressionType is used to specify the compression algorithm
type compressionType uint8

const (
	// lzwAlgo identifies LZW compression (per its name; the codec itself
	// lives in compressPayload/decompressPayload, outside this view).
	lzwAlgo compressionType = iota
)
78
// Size limits and tuning knobs used by the networking code.
//
// compoundHeaderOverhead is subtracted from the UDP buffer budget in sendMsg,
// and compoundOverhead is passed to getBroadcasts as the per-entry cost, when
// piggybacking broadcasts into a compoundMsg. maxPushPullRequests bounds
// concurrent inbound push/pulls in handleConn. maxPushStateBytes presumably
// caps the size of remote state accepted during a push/pull (its use is
// outside this view — confirm).
const (
	MetaMaxSize            = 512 // Maximum size for node meta data
	compoundHeaderOverhead = 2   // Assumed header overhead
	compoundOverhead       = 2   // Assumed overhead per entry in compoundHeader
	userMsgOverhead        = 1
	blockingWarning        = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
	maxPushStateBytes      = 20 * 1024 * 1024
	maxPushPullRequests    = 128 // Maximum number of concurrent push/pull requests
)
88
// ping is a probe request sent directly to a node, either as a packet
// (handlePing) or over a stream (handleConn). The receiver answers with an
// ackResp carrying the same SeqNo.
type ping struct {
	SeqNo uint32

	// Node is sent so the target can verify they are
	// the intended recipient. This is to protect against an agent
	// restart with a new name. Receivers skip the check when Node is empty.
	Node string

	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
}

// indirectPingReq asks the receiver to ping Target:Port on the sender's
// behalf and relay the resulting ack (and, if Nack is set, a nack on
// timeout) back to the sender. See handleIndirectPing.
type indirectPingReq struct {
	SeqNo  uint32
	Target []byte // IP address of the node to probe
	Port   uint16 // Port to probe; 0 means the receiver's BindPort is used

	// Node is sent so the target can verify they are
	// the intended recipient. This is to protect against an agent
	// restart with a new name.
	Node string

	Nack bool // true if we'd like a nack back when no ack arrives within ProbeTimeout

	SourceAddr []byte `codec:",omitempty"` // Source address, used for a direct reply
	SourcePort uint16 `codec:",omitempty"` // Source port, used for a direct reply
	SourceNode string `codec:",omitempty"` // Source name, used for a direct reply
}

// ackResp is the reply to a ping. SeqNo echoes the ping's SeqNo; Payload
// carries the responder's Ping delegate AckPayload when one is configured
// (acks relayed for indirect pings carry no payload).
// NOTE: built positionally (ackResp{seq, nil}); keep field order.
type ackResp struct {
	SeqNo   uint32
	Payload []byte
}

// nack response is sent for an indirect ping when the pinger doesn't hear from
// the ping-ee within the configured timeout. This lets the original node know
// that the indirect ping attempt happened but didn't succeed.
type nackResp struct {
	SeqNo uint32
}

// errResp is sent over a stream to relay an error from the remote end (see
// handleConn); sendAndReceiveState surfaces it as a "remote error".
type errResp struct {
	Error string
}
138
// suspect is broadcast when we suspect a node is dead
type suspect struct {
	Incarnation uint32
	Node        string
	From        string // Name of the node raising the suspicion
}

// alive is broadcast when we know a node is alive.
// Overloaded for nodes joining.
//
// Receivers substitute their own BindPort when Port is 0 or when they speak
// protocol < 2 (see handleAlive).
type alive struct {
	Incarnation uint32
	Node        string
	Addr        []byte
	Port        uint16
	Meta        []byte

	// The versions of the protocol/delegate that are being spoken, order:
	// pmin, pmax, pcur, dmin, dmax, dcur
	Vsn []uint8
}

// dead is broadcast when we confirm a node is dead
// Overloaded for nodes leaving
type dead struct {
	Incarnation uint32
	Node        string
	From        string // Presumably the node declaring Node dead — confirm in deadNode
}
167
// pushPullHeader is used to inform the
// otherside how many states we are transferring. It is followed on the wire
// by Nodes pushNodeState records and then UserStateLen bytes of raw
// delegate state (see sendLocalState).
type pushPullHeader struct {
	Nodes        int  // Number of pushNodeState records that follow
	UserStateLen int  // Encodes the byte length of user state
	Join         bool // Is this a join request or an anti-entropy run
}

// userMsgHeader is used to encapsulate a userMsg
type userMsgHeader struct {
	UserMsgLen int // Encodes the byte length of the user message payload that follows
}

// pushNodeState is used for pushPullReq when we are
// transferring our node states
type pushNodeState struct {
	Name        string
	Addr        []byte
	Port        uint16
	Meta        []byte
	Incarnation uint32
	State       NodeStateType
	Vsn         []uint8 // Protocol/delegate versions: pmin, pmax, pcur, dmin, dmax, dcur
}
192
// compress is used to wrap an underlying payload
// using a specified compression algorithm
type compress struct {
	Algo compressionType
	Buf  []byte
}

// msgHandoff is used to transfer a message between goroutines: handleCommand
// queues gossip messages (suspect/alive/dead/user) as msgHandoff values and
// packetHandler drains them. NOTE: handleCommand builds it with a positional
// literal, so the field order must not change.
type msgHandoff struct {
	msgType messageType
	buf     []byte
	from    net.Addr
}
206
207// encryptionVersion returns the encryption version to use
208func (m *Memberlist) encryptionVersion() encryptionVersion {
209	switch m.ProtocolVersion() {
210	case 1:
211		return 0
212	default:
213		return 1
214	}
215}
216
217// streamListen is a long running goroutine that pulls incoming streams from the
218// transport and hands them off for processing.
219func (m *Memberlist) streamListen() {
220	for {
221		select {
222		case conn := <-m.transport.StreamCh():
223			go m.handleConn(conn)
224
225		case <-m.shutdownCh:
226			return
227		}
228	}
229}
230
// handleConn handles a single incoming stream connection from the transport.
//
// The connection is closed when this returns, and a TCPTimeout deadline is
// applied up front. Processing steps:
//  1. Strip an optional label header and require it to equal config.Label.
//     With SkipInboundLabelCheck set, any label header is rejected as a
//     "double" header and config.Label is assumed instead, so the later
//     authenticated-data checks still use the configured label.
//  2. Read the message type via readStream; on a non-EOF error, log it and
//     best-effort send an errMsg back to the peer.
//  3. Dispatch: userMsg is handled by readUserMsg; pushPullMsg reads the
//     remote state, replies with our local state, then merges; pingMsg is
//     answered with an ackResp. Anything else is logged and dropped.
func (m *Memberlist) handleConn(conn net.Conn) {
	// NOTE: defer binds the conn value passed in; the wrapper returned by
	// RemoveLabelHeaderFromStream below is not closed separately
	// (presumably it wraps this same conn — confirm).
	defer conn.Close()
	m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))

	metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)

	// One deadline bounds the exchange; sendLocalState sets a fresh one
	// before replying to a push/pull.
	conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))

	var (
		streamLabel string
		err         error
	)
	conn, streamLabel, err = RemoveLabelHeaderFromStream(conn)
	if err != nil {
		m.logger.Printf("[ERR] memberlist: failed to receive and remove the stream label header: %s %s", err, LogConn(conn))
		return
	}

	if m.config.SkipInboundLabelCheck {
		if streamLabel != "" {
			m.logger.Printf("[ERR] memberlist: unexpected double stream label header: %s", LogConn(conn))
			return
		}
		// Set this from config so that the auth data assertions work below.
		streamLabel = m.config.Label
	}

	if m.config.Label != streamLabel {
		m.logger.Printf("[ERR] memberlist: discarding stream with unacceptable label %q: %s", streamLabel, LogConn(conn))
		return
	}

	msgType, bufConn, dec, err := m.readStream(conn, streamLabel)
	if err != nil {
		// io.EOF means the peer closed without sending; nothing to report.
		if err != io.EOF {
			m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))

			// Best-effort: tell the peer why we gave up.
			resp := errResp{err.Error()}
			out, err := encode(errMsg, &resp)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
				return
			}

			err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
				return
			}
		}
		return
	}

	switch msgType {
	case userMsg:
		if err := m.readUserMsg(bufConn, dec); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
		}
	case pushPullMsg:
		// Increment counter of pending push/pulls
		numConcurrent := atomic.AddUint32(&m.pushPullReq, 1)
		// Adding ^uint32(0) (all ones) atomically decrements by one.
		defer atomic.AddUint32(&m.pushPullReq, ^uint32(0))

		// Check if we have too many open push/pull requests. numConcurrent
		// includes this request, so at most maxPushPullRequests-1 proceed.
		if numConcurrent >= maxPushPullRequests {
			m.logger.Printf("[ERR] memberlist: Too many pending push/pull requests")
			return
		}

		join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
			return
		}

		// Reply with our state before merging theirs.
		if err := m.sendLocalState(conn, join, streamLabel); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
			return
		}

		if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
			return
		}
	case pingMsg:
		var p ping
		if err := dec.Decode(&p); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
			return
		}

		// Reject pings addressed to a different node name.
		if p.Node != "" && p.Node != m.config.Name {
			m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s %s", p.Node, LogConn(conn))
			return
		}

		ack := ackResp{p.SeqNo, nil}
		out, err := encode(ackRespMsg, &ack)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
			return
		}

		err = m.rawSendMsgStream(conn, out.Bytes(), streamLabel)
		if err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
			return
		}
	default:
		m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d) %s", msgType, LogConn(conn))
	}
}
344
345// packetListen is a long running goroutine that pulls packets out of the
346// transport and hands them off for processing.
347func (m *Memberlist) packetListen() {
348	for {
349		select {
350		case packet := <-m.transport.PacketCh():
351			m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)
352
353		case <-m.shutdownCh:
354			return
355		}
356	}
357}
358
359func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
360	var (
361		packetLabel string
362		err         error
363	)
364	buf, packetLabel, err = RemoveLabelHeaderFromPacket(buf)
365	if err != nil {
366		m.logger.Printf("[ERR] memberlist: %v %s", err, LogAddress(from))
367		return
368	}
369
370	if m.config.SkipInboundLabelCheck {
371		if packetLabel != "" {
372			m.logger.Printf("[ERR] memberlist: unexpected double packet label header: %s", LogAddress(from))
373			return
374		}
375		// Set this from config so that the auth data assertions work below.
376		packetLabel = m.config.Label
377	}
378
379	if m.config.Label != packetLabel {
380		m.logger.Printf("[ERR] memberlist: discarding packet with unacceptable label %q: %s", packetLabel, LogAddress(from))
381		return
382	}
383
384	// Check if encryption is enabled
385	if m.config.EncryptionEnabled() {
386		// Decrypt the payload
387		authData := []byte(packetLabel)
388		plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, authData)
389		if err != nil {
390			if !m.config.GossipVerifyIncoming {
391				// Treat the message as plaintext
392				plain = buf
393			} else {
394				m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
395				return
396			}
397		}
398
399		// Continue processing the plaintext buffer
400		buf = plain
401	}
402
403	// See if there's a checksum included to verify the contents of the message
404	if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
405		crc := crc32.ChecksumIEEE(buf[5:])
406		expected := binary.BigEndian.Uint32(buf[1:5])
407		if crc != expected {
408			m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
409			return
410		}
411		m.handleCommand(buf[5:], from, timestamp)
412	} else {
413		m.handleCommand(buf, from, timestamp)
414	}
415}
416
417func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
418	if len(buf) < 1 {
419		m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from))
420		return
421	}
422	// Decode the message type
423	msgType := messageType(buf[0])
424	buf = buf[1:]
425
426	// Switch on the msgType
427	switch msgType {
428	case compoundMsg:
429		m.handleCompound(buf, from, timestamp)
430	case compressMsg:
431		m.handleCompressed(buf, from, timestamp)
432
433	case pingMsg:
434		m.handlePing(buf, from)
435	case indirectPingMsg:
436		m.handleIndirectPing(buf, from)
437	case ackRespMsg:
438		m.handleAck(buf, from, timestamp)
439	case nackRespMsg:
440		m.handleNack(buf, from)
441
442	case suspectMsg:
443		fallthrough
444	case aliveMsg:
445		fallthrough
446	case deadMsg:
447		fallthrough
448	case userMsg:
449		// Determine the message queue, prioritize alive
450		queue := m.lowPriorityMsgQueue
451		if msgType == aliveMsg {
452			queue = m.highPriorityMsgQueue
453		}
454
455		// Check for overflow and append if not full
456		m.msgQueueLock.Lock()
457		if queue.Len() >= m.config.HandoffQueueDepth {
458			m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
459		} else {
460			queue.PushBack(msgHandoff{msgType, buf, from})
461		}
462		m.msgQueueLock.Unlock()
463
464		// Notify of pending message
465		select {
466		case m.handoffCh <- struct{}{}:
467		default:
468		}
469
470	default:
471		m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
472	}
473}
474
475// getNextMessage returns the next message to process in priority order, using LIFO
476func (m *Memberlist) getNextMessage() (msgHandoff, bool) {
477	m.msgQueueLock.Lock()
478	defer m.msgQueueLock.Unlock()
479
480	if el := m.highPriorityMsgQueue.Back(); el != nil {
481		m.highPriorityMsgQueue.Remove(el)
482		msg := el.Value.(msgHandoff)
483		return msg, true
484	} else if el := m.lowPriorityMsgQueue.Back(); el != nil {
485		m.lowPriorityMsgQueue.Remove(el)
486		msg := el.Value.(msgHandoff)
487		return msg, true
488	}
489	return msgHandoff{}, false
490}
491
492// packetHandler is a long running goroutine that processes messages received
493// over the packet interface, but is decoupled from the listener to avoid
494// blocking the listener which may cause ping/ack messages to be delayed.
495func (m *Memberlist) packetHandler() {
496	for {
497		select {
498		case <-m.handoffCh:
499			for {
500				msg, ok := m.getNextMessage()
501				if !ok {
502					break
503				}
504				msgType := msg.msgType
505				buf := msg.buf
506				from := msg.from
507
508				switch msgType {
509				case suspectMsg:
510					m.handleSuspect(buf, from)
511				case aliveMsg:
512					m.handleAlive(buf, from)
513				case deadMsg:
514					m.handleDead(buf, from)
515				case userMsg:
516					m.handleUser(buf, from)
517				default:
518					m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
519				}
520			}
521
522		case <-m.shutdownCh:
523			return
524		}
525	}
526}
527
528func (m *Memberlist) handleCompound(buf []byte, from net.Addr, timestamp time.Time) {
529	// Decode the parts
530	trunc, parts, err := decodeCompoundMessage(buf)
531	if err != nil {
532		m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s %s", err, LogAddress(from))
533		return
534	}
535
536	// Log any truncation
537	if trunc > 0 {
538		m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages %s", trunc, LogAddress(from))
539	}
540
541	// Handle each message
542	for _, part := range parts {
543		m.handleCommand(part, from, timestamp)
544	}
545}
546
547func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
548	var p ping
549	if err := decode(buf, &p); err != nil {
550		m.logger.Printf("[ERR] memberlist: Failed to decode ping request: %s %s", err, LogAddress(from))
551		return
552	}
553	// If node is provided, verify that it is for us
554	if p.Node != "" && p.Node != m.config.Name {
555		m.logger.Printf("[WARN] memberlist: Got ping for unexpected node '%s' %s", p.Node, LogAddress(from))
556		return
557	}
558	var ack ackResp
559	ack.SeqNo = p.SeqNo
560	if m.config.Ping != nil {
561		ack.Payload = m.config.Ping.AckPayload()
562	}
563
564	addr := ""
565	if len(p.SourceAddr) > 0 && p.SourcePort > 0 {
566		addr = joinHostPort(net.IP(p.SourceAddr).String(), p.SourcePort)
567	} else {
568		addr = from.String()
569	}
570
571	a := Address{
572		Addr: addr,
573		Name: p.SourceNode,
574	}
575	if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
576		m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
577	}
578}
579
580func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
581	var ind indirectPingReq
582	if err := decode(buf, &ind); err != nil {
583		m.logger.Printf("[ERR] memberlist: Failed to decode indirect ping request: %s %s", err, LogAddress(from))
584		return
585	}
586
587	// For proto versions < 2, there is no port provided. Mask old
588	// behavior by using the configured port.
589	if m.ProtocolVersion() < 2 || ind.Port == 0 {
590		ind.Port = uint16(m.config.BindPort)
591	}
592
593	// Send a ping to the correct host.
594	localSeqNo := m.nextSeqNo()
595	selfAddr, selfPort := m.getAdvertise()
596	ping := ping{
597		SeqNo: localSeqNo,
598		Node:  ind.Node,
599		// The outbound message is addressed FROM us.
600		SourceAddr: selfAddr,
601		SourcePort: selfPort,
602		SourceNode: m.config.Name,
603	}
604
605	// Forward the ack back to the requestor. If the request encodes an origin
606	// use that otherwise assume that the other end of the UDP socket is
607	// usable.
608	indAddr := ""
609	if len(ind.SourceAddr) > 0 && ind.SourcePort > 0 {
610		indAddr = joinHostPort(net.IP(ind.SourceAddr).String(), ind.SourcePort)
611	} else {
612		indAddr = from.String()
613	}
614
615	// Setup a response handler to relay the ack
616	cancelCh := make(chan struct{})
617	respHandler := func(payload []byte, timestamp time.Time) {
618		// Try to prevent the nack if we've caught it in time.
619		close(cancelCh)
620
621		ack := ackResp{ind.SeqNo, nil}
622		a := Address{
623			Addr: indAddr,
624			Name: ind.SourceNode,
625		}
626		if err := m.encodeAndSendMsg(a, ackRespMsg, &ack); err != nil {
627			m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogStringAddress(indAddr))
628		}
629	}
630	m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
631
632	// Send the ping.
633	addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
634	a := Address{
635		Addr: addr,
636		Name: ind.Node,
637	}
638	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
639		m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s %s", err, LogStringAddress(indAddr))
640	}
641
642	// Setup a timer to fire off a nack if no ack is seen in time.
643	if ind.Nack {
644		go func() {
645			select {
646			case <-cancelCh:
647				return
648			case <-time.After(m.config.ProbeTimeout):
649				nack := nackResp{ind.SeqNo}
650				a := Address{
651					Addr: indAddr,
652					Name: ind.SourceNode,
653				}
654				if err := m.encodeAndSendMsg(a, nackRespMsg, &nack); err != nil {
655					m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogStringAddress(indAddr))
656				}
657			}
658		}()
659	}
660}
661
662func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
663	var ack ackResp
664	if err := decode(buf, &ack); err != nil {
665		m.logger.Printf("[ERR] memberlist: Failed to decode ack response: %s %s", err, LogAddress(from))
666		return
667	}
668	m.invokeAckHandler(ack, timestamp)
669}
670
671func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
672	var nack nackResp
673	if err := decode(buf, &nack); err != nil {
674		m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
675		return
676	}
677	m.invokeNackHandler(nack)
678}
679
680func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
681	var sus suspect
682	if err := decode(buf, &sus); err != nil {
683		m.logger.Printf("[ERR] memberlist: Failed to decode suspect message: %s %s", err, LogAddress(from))
684		return
685	}
686	m.suspectNode(&sus)
687}
688
689// ensureCanConnect return the IP from a RemoteAddress
690// return error if this client must not connect
691func (m *Memberlist) ensureCanConnect(from net.Addr) error {
692	if !m.config.IPMustBeChecked() {
693		return nil
694	}
695	source := from.String()
696	if source == "pipe" {
697		return nil
698	}
699	host, _, err := net.SplitHostPort(source)
700	if err != nil {
701		return err
702	}
703
704	ip := net.ParseIP(host)
705	if ip == nil {
706		return fmt.Errorf("Cannot parse IP from %s", host)
707	}
708	return m.config.IPAllowed(ip)
709}
710
711func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
712	if err := m.ensureCanConnect(from); err != nil {
713		m.logger.Printf("[DEBUG] memberlist: Blocked alive message: %s %s", err, LogAddress(from))
714		return
715	}
716	var live alive
717	if err := decode(buf, &live); err != nil {
718		m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from))
719		return
720	}
721	if m.config.IPMustBeChecked() {
722		innerIP := net.IP(live.Addr)
723		if innerIP != nil {
724			if err := m.config.IPAllowed(innerIP); err != nil {
725				m.logger.Printf("[DEBUG] memberlist: Blocked alive.Addr=%s message from: %s %s", innerIP.String(), err, LogAddress(from))
726				return
727			}
728		}
729	}
730
731	// For proto versions < 2, there is no port provided. Mask old
732	// behavior by using the configured port
733	if m.ProtocolVersion() < 2 || live.Port == 0 {
734		live.Port = uint16(m.config.BindPort)
735	}
736
737	m.aliveNode(&live, nil, false)
738}
739
740func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
741	var d dead
742	if err := decode(buf, &d); err != nil {
743		m.logger.Printf("[ERR] memberlist: Failed to decode dead message: %s %s", err, LogAddress(from))
744		return
745	}
746	m.deadNode(&d)
747}
748
749// handleUser is used to notify channels of incoming user data
750func (m *Memberlist) handleUser(buf []byte, from net.Addr) {
751	d := m.config.Delegate
752	if d != nil {
753		d.NotifyMsg(buf)
754	}
755}
756
757// handleCompressed is used to unpack a compressed message
758func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.Time) {
759	// Try to decode the payload
760	payload, err := decompressPayload(buf)
761	if err != nil {
762		m.logger.Printf("[ERR] memberlist: Failed to decompress payload: %v %s", err, LogAddress(from))
763		return
764	}
765
766	// Recursively handle the payload
767	m.handleCommand(payload, from, timestamp)
768}
769
770// encodeAndSendMsg is used to combine the encoding and sending steps
771func (m *Memberlist) encodeAndSendMsg(a Address, msgType messageType, msg interface{}) error {
772	out, err := encode(msgType, msg)
773	if err != nil {
774		return err
775	}
776	if err := m.sendMsg(a, out.Bytes()); err != nil {
777		return err
778	}
779	return nil
780}
781
782// sendMsg is used to send a message via packet to another host. It will
783// opportunistically create a compoundMsg and piggy back other broadcasts.
784func (m *Memberlist) sendMsg(a Address, msg []byte) error {
785	// Check if we can piggy back any messages
786	bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead - labelOverhead(m.config.Label)
787	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
788		bytesAvail -= encryptOverhead(m.encryptionVersion())
789	}
790	extra := m.getBroadcasts(compoundOverhead, bytesAvail)
791
792	// Fast path if nothing to piggypack
793	if len(extra) == 0 {
794		return m.rawSendMsgPacket(a, nil, msg)
795	}
796
797	// Join all the messages
798	msgs := make([][]byte, 0, 1+len(extra))
799	msgs = append(msgs, msg)
800	msgs = append(msgs, extra...)
801
802	// Create a compound message
803	compound := makeCompoundMessage(msgs)
804
805	// Send the message
806	return m.rawSendMsgPacket(a, nil, compound.Bytes())
807}
808
809// rawSendMsgPacket is used to send message via packet to another host without
810// modification, other than compression or encryption if enabled.
811func (m *Memberlist) rawSendMsgPacket(a Address, node *Node, msg []byte) error {
812	if a.Name == "" && m.config.RequireNodeNames {
813		return errNodeNamesAreRequired
814	}
815
816	// Check if we have compression enabled
817	if m.config.EnableCompression {
818		buf, err := compressPayload(msg)
819		if err != nil {
820			m.logger.Printf("[WARN] memberlist: Failed to compress payload: %v", err)
821		} else {
822			// Only use compression if it reduced the size
823			if buf.Len() < len(msg) {
824				msg = buf.Bytes()
825			}
826		}
827	}
828
829	// Try to look up the destination node. Note this will only work if the
830	// bare ip address is used as the node name, which is not guaranteed.
831	if node == nil {
832		toAddr, _, err := net.SplitHostPort(a.Addr)
833		if err != nil {
834			m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", a.Addr, err)
835			return err
836		}
837		m.nodeLock.RLock()
838		nodeState, ok := m.nodeMap[toAddr]
839		m.nodeLock.RUnlock()
840		if ok {
841			node = &nodeState.Node
842		}
843	}
844
845	// Add a CRC to the end of the payload if the recipient understands
846	// ProtocolVersion >= 5
847	if node != nil && node.PMax >= 5 {
848		crc := crc32.ChecksumIEEE(msg)
849		header := make([]byte, 5, 5+len(msg))
850		header[0] = byte(hasCrcMsg)
851		binary.BigEndian.PutUint32(header[1:], crc)
852		msg = append(header, msg...)
853	}
854
855	// Check if we have encryption enabled
856	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
857		// Encrypt the payload
858		var (
859			primaryKey  = m.config.Keyring.GetPrimaryKey()
860			packetLabel = []byte(m.config.Label)
861			buf         bytes.Buffer
862		)
863		err := encryptPayload(m.encryptionVersion(), primaryKey, msg, packetLabel, &buf)
864		if err != nil {
865			m.logger.Printf("[ERR] memberlist: Encryption of message failed: %v", err)
866			return err
867		}
868		msg = buf.Bytes()
869	}
870
871	metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
872	_, err := m.transport.WriteToAddress(msg, a)
873	return err
874}
875
876// rawSendMsgStream is used to stream a message to another host without
877// modification, other than applying compression and encryption if enabled.
878func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte, streamLabel string) error {
879	// Check if compression is enabled
880	if m.config.EnableCompression {
881		compBuf, err := compressPayload(sendBuf)
882		if err != nil {
883			m.logger.Printf("[ERROR] memberlist: Failed to compress payload: %v", err)
884		} else {
885			sendBuf = compBuf.Bytes()
886		}
887	}
888
889	// Check if encryption is enabled
890	if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
891		crypt, err := m.encryptLocalState(sendBuf, streamLabel)
892		if err != nil {
893			m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
894			return err
895		}
896		sendBuf = crypt
897	}
898
899	// Write out the entire send buffer
900	metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
901
902	if n, err := conn.Write(sendBuf); err != nil {
903		return err
904	} else if n != len(sendBuf) {
905		return fmt.Errorf("only %d of %d bytes written", n, len(sendBuf))
906	}
907
908	return nil
909}
910
911// sendUserMsg is used to stream a user message to another host.
912func (m *Memberlist) sendUserMsg(a Address, sendBuf []byte) error {
913	if a.Name == "" && m.config.RequireNodeNames {
914		return errNodeNamesAreRequired
915	}
916
917	conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout)
918	if err != nil {
919		return err
920	}
921	defer conn.Close()
922
923	bufConn := bytes.NewBuffer(nil)
924	if err := bufConn.WriteByte(byte(userMsg)); err != nil {
925		return err
926	}
927
928	header := userMsgHeader{UserMsgLen: len(sendBuf)}
929	hd := codec.MsgpackHandle{}
930	enc := codec.NewEncoder(bufConn, &hd)
931	if err := enc.Encode(&header); err != nil {
932		return err
933	}
934	if _, err := bufConn.Write(sendBuf); err != nil {
935		return err
936	}
937
938	return m.rawSendMsgStream(conn, bufConn.Bytes(), m.config.Label)
939}
940
941// sendAndReceiveState is used to initiate a push/pull over a stream with a
942// remote host.
943func (m *Memberlist) sendAndReceiveState(a Address, join bool) ([]pushNodeState, []byte, error) {
944	if a.Name == "" && m.config.RequireNodeNames {
945		return nil, nil, errNodeNamesAreRequired
946	}
947
948	// Attempt to connect
949	conn, err := m.transport.DialAddressTimeout(a, m.config.TCPTimeout)
950	if err != nil {
951		return nil, nil, err
952	}
953	defer conn.Close()
954	m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s %s", a.Name, conn.RemoteAddr())
955	metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)
956
957	// Send our state
958	if err := m.sendLocalState(conn, join, m.config.Label); err != nil {
959		return nil, nil, err
960	}
961
962	conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
963	msgType, bufConn, dec, err := m.readStream(conn, m.config.Label)
964	if err != nil {
965		return nil, nil, err
966	}
967
968	if msgType == errMsg {
969		var resp errResp
970		if err := dec.Decode(&resp); err != nil {
971			return nil, nil, err
972		}
973		return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
974	}
975
976	// Quit if not push/pull
977	if msgType != pushPullMsg {
978		err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
979		return nil, nil, err
980	}
981
982	// Read remote state
983	_, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
984	return remoteNodes, userState, err
985}
986
987// sendLocalState is invoked to send our local state over a stream connection.
988func (m *Memberlist) sendLocalState(conn net.Conn, join bool, streamLabel string) error {
989	// Setup a deadline
990	conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
991
992	// Prepare the local node state
993	m.nodeLock.RLock()
994	localNodes := make([]pushNodeState, len(m.nodes))
995	for idx, n := range m.nodes {
996		localNodes[idx].Name = n.Name
997		localNodes[idx].Addr = n.Addr
998		localNodes[idx].Port = n.Port
999		localNodes[idx].Incarnation = n.Incarnation
1000		localNodes[idx].State = n.State
1001		localNodes[idx].Meta = n.Meta
1002		localNodes[idx].Vsn = []uint8{
1003			n.PMin, n.PMax, n.PCur,
1004			n.DMin, n.DMax, n.DCur,
1005		}
1006	}
1007	m.nodeLock.RUnlock()
1008
1009	// Get the delegate state
1010	var userData []byte
1011	if m.config.Delegate != nil {
1012		userData = m.config.Delegate.LocalState(join)
1013	}
1014
1015	// Create a bytes buffer writer
1016	bufConn := bytes.NewBuffer(nil)
1017
1018	// Send our node state
1019	header := pushPullHeader{Nodes: len(localNodes), UserStateLen: len(userData), Join: join}
1020	hd := codec.MsgpackHandle{}
1021	enc := codec.NewEncoder(bufConn, &hd)
1022
1023	// Begin state push
1024	if _, err := bufConn.Write([]byte{byte(pushPullMsg)}); err != nil {
1025		return err
1026	}
1027
1028	if err := enc.Encode(&header); err != nil {
1029		return err
1030	}
1031	for i := 0; i < header.Nodes; i++ {
1032		if err := enc.Encode(&localNodes[i]); err != nil {
1033			return err
1034		}
1035	}
1036
1037	// Write the user state as well
1038	if userData != nil {
1039		if _, err := bufConn.Write(userData); err != nil {
1040			return err
1041		}
1042	}
1043
1044	// Get the send buffer
1045	return m.rawSendMsgStream(conn, bufConn.Bytes(), streamLabel)
1046}
1047
1048// encryptLocalState is used to help encrypt local state before sending
1049func (m *Memberlist) encryptLocalState(sendBuf []byte, streamLabel string) ([]byte, error) {
1050	var buf bytes.Buffer
1051
1052	// Write the encryptMsg byte
1053	buf.WriteByte(byte(encryptMsg))
1054
1055	// Write the size of the message
1056	sizeBuf := make([]byte, 4)
1057	encVsn := m.encryptionVersion()
1058	encLen := encryptedLength(encVsn, len(sendBuf))
1059	binary.BigEndian.PutUint32(sizeBuf, uint32(encLen))
1060	buf.Write(sizeBuf)
1061
1062	// Authenticated Data is:
1063	//
1064	//   [messageType; byte] [messageLength; uint32] [stream_label; optional]
1065	//
1066	dataBytes := appendBytes(buf.Bytes()[:5], []byte(streamLabel))
1067
1068	// Write the encrypted cipher text to the buffer
1069	key := m.config.Keyring.GetPrimaryKey()
1070	err := encryptPayload(encVsn, key, sendBuf, dataBytes, &buf)
1071	if err != nil {
1072		return nil, err
1073	}
1074	return buf.Bytes(), nil
1075}
1076
1077// decryptRemoteState is used to help decrypt the remote state
1078func (m *Memberlist) decryptRemoteState(bufConn io.Reader, streamLabel string) ([]byte, error) {
1079	// Read in enough to determine message length
1080	cipherText := bytes.NewBuffer(nil)
1081	cipherText.WriteByte(byte(encryptMsg))
1082	_, err := io.CopyN(cipherText, bufConn, 4)
1083	if err != nil {
1084		return nil, err
1085	}
1086
1087	// Ensure we aren't asked to download too much. This is to guard against
1088	// an attack vector where a huge amount of state is sent
1089	moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5])
1090	if moreBytes > maxPushStateBytes {
1091		return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)
1092	}
1093
1094	// Read in the rest of the payload
1095	_, err = io.CopyN(cipherText, bufConn, int64(moreBytes))
1096	if err != nil {
1097		return nil, err
1098	}
1099
1100	// Decrypt the cipherText with some authenticated data
1101	//
1102	// Authenticated Data is:
1103	//
1104	//   [messageType; byte] [messageLength; uint32] [label_data; optional]
1105	//
1106	dataBytes := appendBytes(cipherText.Bytes()[:5], []byte(streamLabel))
1107	cipherBytes := cipherText.Bytes()[5:]
1108
1109	// Decrypt the payload
1110	keys := m.config.Keyring.GetKeys()
1111	return decryptPayload(keys, cipherBytes, dataBytes)
1112}
1113
1114// readStream is used to read messages from a stream connection, decrypting and
1115// decompressing the stream if necessary.
1116//
1117// The provided streamLabel if present will be authenticated during decryption
1118// of each message.
1119func (m *Memberlist) readStream(conn net.Conn, streamLabel string) (messageType, io.Reader, *codec.Decoder, error) {
1120	// Created a buffered reader
1121	var bufConn io.Reader = bufio.NewReader(conn)
1122
1123	// Read the message type
1124	buf := [1]byte{0}
1125	if _, err := io.ReadFull(bufConn, buf[:]); err != nil {
1126		return 0, nil, nil, err
1127	}
1128	msgType := messageType(buf[0])
1129
1130	// Check if the message is encrypted
1131	if msgType == encryptMsg {
1132		if !m.config.EncryptionEnabled() {
1133			return 0, nil, nil,
1134				fmt.Errorf("Remote state is encrypted and encryption is not configured")
1135		}
1136
1137		plain, err := m.decryptRemoteState(bufConn, streamLabel)
1138		if err != nil {
1139			return 0, nil, nil, err
1140		}
1141
1142		// Reset message type and bufConn
1143		msgType = messageType(plain[0])
1144		bufConn = bytes.NewReader(plain[1:])
1145	} else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming {
1146		return 0, nil, nil,
1147			fmt.Errorf("Encryption is configured but remote state is not encrypted")
1148	}
1149
1150	// Get the msgPack decoders
1151	hd := codec.MsgpackHandle{}
1152	dec := codec.NewDecoder(bufConn, &hd)
1153
1154	// Check if we have a compressed message
1155	if msgType == compressMsg {
1156		var c compress
1157		if err := dec.Decode(&c); err != nil {
1158			return 0, nil, nil, err
1159		}
1160		decomp, err := decompressBuffer(&c)
1161		if err != nil {
1162			return 0, nil, nil, err
1163		}
1164
1165		// Reset the message type
1166		msgType = messageType(decomp[0])
1167
1168		// Create a new bufConn
1169		bufConn = bytes.NewReader(decomp[1:])
1170
1171		// Create a new decoder
1172		dec = codec.NewDecoder(bufConn, &hd)
1173	}
1174
1175	return msgType, bufConn, dec, nil
1176}
1177
1178// readRemoteState is used to read the remote state from a connection
1179func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) {
1180	// Read the push/pull header
1181	var header pushPullHeader
1182	if err := dec.Decode(&header); err != nil {
1183		return false, nil, nil, err
1184	}
1185
1186	// Allocate space for the transfer
1187	remoteNodes := make([]pushNodeState, header.Nodes)
1188
1189	// Try to decode all the states
1190	for i := 0; i < header.Nodes; i++ {
1191		if err := dec.Decode(&remoteNodes[i]); err != nil {
1192			return false, nil, nil, err
1193		}
1194	}
1195
1196	// Read the remote user state into a buffer
1197	var userBuf []byte
1198	if header.UserStateLen > 0 {
1199		userBuf = make([]byte, header.UserStateLen)
1200		bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserStateLen)
1201		if err == nil && bytes != header.UserStateLen {
1202			err = fmt.Errorf(
1203				"Failed to read full user state (%d / %d)",
1204				bytes, header.UserStateLen)
1205		}
1206		if err != nil {
1207			return false, nil, nil, err
1208		}
1209	}
1210
1211	// For proto versions < 2, there is no port provided. Mask old
1212	// behavior by using the configured port
1213	for idx := range remoteNodes {
1214		if m.ProtocolVersion() < 2 || remoteNodes[idx].Port == 0 {
1215			remoteNodes[idx].Port = uint16(m.config.BindPort)
1216		}
1217	}
1218
1219	return header.Join, remoteNodes, userBuf, nil
1220}
1221
1222// mergeRemoteState is used to merge the remote state with our local state
1223func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
1224	if err := m.verifyProtocol(remoteNodes); err != nil {
1225		return err
1226	}
1227
1228	// Invoke the merge delegate if any
1229	if join && m.config.Merge != nil {
1230		nodes := make([]*Node, len(remoteNodes))
1231		for idx, n := range remoteNodes {
1232			nodes[idx] = &Node{
1233				Name:  n.Name,
1234				Addr:  n.Addr,
1235				Port:  n.Port,
1236				Meta:  n.Meta,
1237				State: n.State,
1238				PMin:  n.Vsn[0],
1239				PMax:  n.Vsn[1],
1240				PCur:  n.Vsn[2],
1241				DMin:  n.Vsn[3],
1242				DMax:  n.Vsn[4],
1243				DCur:  n.Vsn[5],
1244			}
1245		}
1246		if err := m.config.Merge.NotifyMerge(nodes); err != nil {
1247			return err
1248		}
1249	}
1250
1251	// Merge the membership state
1252	m.mergeState(remoteNodes)
1253
1254	// Invoke the delegate for user state
1255	if userBuf != nil && m.config.Delegate != nil {
1256		m.config.Delegate.MergeRemoteState(userBuf, join)
1257	}
1258	return nil
1259}
1260
1261// readUserMsg is used to decode a userMsg from a stream.
1262func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
1263	// Read the user message header
1264	var header userMsgHeader
1265	if err := dec.Decode(&header); err != nil {
1266		return err
1267	}
1268
1269	// Read the user message into a buffer
1270	var userBuf []byte
1271	if header.UserMsgLen > 0 {
1272		userBuf = make([]byte, header.UserMsgLen)
1273		bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserMsgLen)
1274		if err == nil && bytes != header.UserMsgLen {
1275			err = fmt.Errorf(
1276				"Failed to read full user message (%d / %d)",
1277				bytes, header.UserMsgLen)
1278		}
1279		if err != nil {
1280			return err
1281		}
1282
1283		d := m.config.Delegate
1284		if d != nil {
1285			d.NotifyMsg(userBuf)
1286		}
1287	}
1288
1289	return nil
1290}
1291
1292// sendPingAndWaitForAck makes a stream connection to the given address, sends
1293// a ping, and waits for an ack. All of this is done as a series of blocking
1294// operations, given the deadline. The bool return parameter is true if we
1295// we able to round trip a ping to the other node.
1296func (m *Memberlist) sendPingAndWaitForAck(a Address, ping ping, deadline time.Time) (bool, error) {
1297	if a.Name == "" && m.config.RequireNodeNames {
1298		return false, errNodeNamesAreRequired
1299	}
1300
1301	conn, err := m.transport.DialAddressTimeout(a, deadline.Sub(time.Now()))
1302	if err != nil {
1303		// If the node is actually dead we expect this to fail, so we
1304		// shouldn't spam the logs with it. After this point, errors
1305		// with the connection are real, unexpected errors and should
1306		// get propagated up.
1307		return false, nil
1308	}
1309	defer conn.Close()
1310	conn.SetDeadline(deadline)
1311
1312	out, err := encode(pingMsg, &ping)
1313	if err != nil {
1314		return false, err
1315	}
1316
1317	if err = m.rawSendMsgStream(conn, out.Bytes(), m.config.Label); err != nil {
1318		return false, err
1319	}
1320
1321	msgType, _, dec, err := m.readStream(conn, m.config.Label)
1322	if err != nil {
1323		return false, err
1324	}
1325
1326	if msgType != ackRespMsg {
1327		return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn))
1328	}
1329
1330	var ack ackResp
1331	if err = dec.Decode(&ack); err != nil {
1332		return false, err
1333	}
1334
1335	if ack.SeqNo != ping.SeqNo {
1336		return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo)
1337	}
1338
1339	return true, nil
1340}
1341