1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"io/ioutil"
22	"net/http"
23	"path"
24	"strings"
25	"sync"
26	"time"
27
28	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
29	"go.etcd.io/etcd/pkg/httputil"
30	"go.etcd.io/etcd/pkg/transport"
31	"go.etcd.io/etcd/pkg/types"
32	"go.etcd.io/etcd/raft/raftpb"
33	"go.etcd.io/etcd/version"
34
35	"github.com/coreos/go-semver/semver"
36	"go.uber.org/zap"
37	"golang.org/x/time/rate"
38)
39
const (
	// streamTypeMessage and streamTypeMsgAppV2 are the stream kinds this file
	// knows how to dial, encode and decode.
	streamTypeMessage  streamType = "message"
	streamTypeMsgAppV2 streamType = "msgappv2"

	// streamBufSize is the capacity of the streamWriter's outgoing message
	// channel. Half of it is also the batching threshold above which the
	// writer forces a flush.
	streamBufSize = 4096
)
46
var (
	// errUnsupportedStreamType is returned by streamReader.dial when the remote
	// peer reports an older version that does not list the requested stream
	// type in supportedStream.
	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")

	// supportedStream lists the stream types supported by each release; it is
	// consulted by checkStreamSupport.
	// The key is the version in string format "major.minor.patch", with the
	// patch level set to zero.
	supportedStream = map[string][]streamType{
		"2.0.0": {},
		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.4.0": {streamTypeMsgAppV2, streamTypeMessage},
	}
)
63
// streamType names a kind of peer-to-peer stream. It selects the HTTP endpoint
// to dial as well as the encoder/decoder pair used on the connection.
type streamType string
65
66func (t streamType) endpoint() string {
67	switch t {
68	case streamTypeMsgAppV2:
69		return path.Join(RaftStreamPrefix, "msgapp")
70	case streamTypeMessage:
71		return path.Join(RaftStreamPrefix, "message")
72	default:
73		plog.Panicf("unhandled stream type %v", t)
74		return ""
75	}
76}
77
78func (t streamType) String() string {
79	switch t {
80	case streamTypeMsgAppV2:
81		return "stream MsgApp v2"
82	case streamTypeMessage:
83		return "stream Message"
84	default:
85		return "unknown stream"
86	}
87}
88
var (
	// linkHeartbeatMessage is a special message used as heartbeat message in
	// link layer. It never conflicts with messages from raft because raft
	// doesn't send out messages without From and To fields.
	// The stream writer encodes it on every heartbeat tick and the stream
	// reader discards it on receipt (see isLinkHeartbeatMessage).
	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
)
95
96func isLinkHeartbeatMessage(m *raftpb.Message) bool {
97	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
98}
99
// outgoingConn bundles everything a streamWriter needs to drive one attached
// connection: the stream type, the means to write, flush and close it, and the
// IDs of both ends (used for logging).
type outgoingConn struct {
	// t selects which encoder the writer builds for this connection.
	t streamType
	io.Writer
	http.Flusher
	io.Closer

	localID types.ID
	peerID  types.ID
}
109
// streamWriter writes messages to the attached outgoingConn.
// A single goroutine (run) owns the connection; other goroutines interact with
// it only through the channels below and the mutex-guarded fields.
type streamWriter struct {
	lg *zap.Logger

	localID types.ID
	peerID  types.ID

	// status is activated when a connection is attached and deactivated when
	// a heartbeat or write fails.
	status *peerStatus
	// fs is handed to the MsgApp v2 encoder.
	fs *stats.FollowerStats
	// r is told when the peer becomes unreachable.
	r Raft

	mu      sync.Mutex // guards closer and working; msgc is also replaced and read under it
	closer  io.Closer
	working bool

	// msgc buffers outgoing messages; it is replaced with a fresh channel
	// whenever the connection is closed.
	msgc chan raftpb.Message
	// connc delivers newly attached connections to run.
	connc chan *outgoingConn
	// stopc is closed by stop to ask run to exit.
	stopc chan struct{}
	// done is closed by run right before it returns.
	done chan struct{}
}
130
131// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
132// messages and writes to the attached outgoing connection.
133func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
134	w := &streamWriter{
135		lg: lg,
136
137		localID: local,
138		peerID:  id,
139
140		status: status,
141		fs:     fs,
142		r:      r,
143		msgc:   make(chan raftpb.Message, streamBufSize),
144		connc:  make(chan *outgoingConn),
145		stopc:  make(chan struct{}),
146		done:   make(chan struct{}),
147	}
148	go w.run()
149	return w
150}
151
152func (cw *streamWriter) run() {
153	var (
154		msgc       chan raftpb.Message
155		heartbeatc <-chan time.Time
156		t          streamType
157		enc        encoder
158		flusher    http.Flusher
159		batched    int
160	)
161	tickc := time.NewTicker(ConnReadTimeout / 3)
162	defer tickc.Stop()
163	unflushed := 0
164
165	if cw.lg != nil {
166		cw.lg.Info(
167			"started stream writer with remote peer",
168			zap.String("local-member-id", cw.localID.String()),
169			zap.String("remote-peer-id", cw.peerID.String()),
170		)
171	} else {
172		plog.Infof("started streaming with peer %s (writer)", cw.peerID)
173	}
174
175	for {
176		select {
177		case <-heartbeatc:
178			err := enc.encode(&linkHeartbeatMessage)
179			unflushed += linkHeartbeatMessage.Size()
180			if err == nil {
181				flusher.Flush()
182				batched = 0
183				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
184				unflushed = 0
185				continue
186			}
187
188			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
189
190			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
191			cw.close()
192			if cw.lg != nil {
193				cw.lg.Warn(
194					"lost TCP streaming connection with remote peer",
195					zap.String("stream-writer-type", t.String()),
196					zap.String("local-member-id", cw.localID.String()),
197					zap.String("remote-peer-id", cw.peerID.String()),
198				)
199			} else {
200				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
201			}
202			heartbeatc, msgc = nil, nil
203
204		case m := <-msgc:
205			err := enc.encode(&m)
206			if err == nil {
207				unflushed += m.Size()
208
209				if len(msgc) == 0 || batched > streamBufSize/2 {
210					flusher.Flush()
211					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
212					unflushed = 0
213					batched = 0
214				} else {
215					batched++
216				}
217
218				continue
219			}
220
221			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
222			cw.close()
223			if cw.lg != nil {
224				cw.lg.Warn(
225					"lost TCP streaming connection with remote peer",
226					zap.String("stream-writer-type", t.String()),
227					zap.String("local-member-id", cw.localID.String()),
228					zap.String("remote-peer-id", cw.peerID.String()),
229				)
230			} else {
231				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
232			}
233			heartbeatc, msgc = nil, nil
234			cw.r.ReportUnreachable(m.To)
235			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
236
237		case conn := <-cw.connc:
238			cw.mu.Lock()
239			closed := cw.closeUnlocked()
240			t = conn.t
241			switch conn.t {
242			case streamTypeMsgAppV2:
243				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
244			case streamTypeMessage:
245				enc = &messageEncoder{w: conn.Writer}
246			default:
247				plog.Panicf("unhandled stream type %s", conn.t)
248			}
249			if cw.lg != nil {
250				cw.lg.Info(
251					"set message encoder",
252					zap.String("from", conn.localID.String()),
253					zap.String("to", conn.peerID.String()),
254					zap.String("stream-type", t.String()),
255				)
256			}
257			flusher = conn.Flusher
258			unflushed = 0
259			cw.status.activate()
260			cw.closer = conn.Closer
261			cw.working = true
262			cw.mu.Unlock()
263
264			if closed {
265				if cw.lg != nil {
266					cw.lg.Warn(
267						"closed TCP streaming connection with remote peer",
268						zap.String("stream-writer-type", t.String()),
269						zap.String("local-member-id", cw.localID.String()),
270						zap.String("remote-peer-id", cw.peerID.String()),
271					)
272				} else {
273					plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
274				}
275			}
276			if cw.lg != nil {
277				cw.lg.Warn(
278					"established TCP streaming connection with remote peer",
279					zap.String("stream-writer-type", t.String()),
280					zap.String("local-member-id", cw.localID.String()),
281					zap.String("remote-peer-id", cw.peerID.String()),
282				)
283			} else {
284				plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
285			}
286			heartbeatc, msgc = tickc.C, cw.msgc
287
288		case <-cw.stopc:
289			if cw.close() {
290				if cw.lg != nil {
291					cw.lg.Warn(
292						"closed TCP streaming connection with remote peer",
293						zap.String("stream-writer-type", t.String()),
294						zap.String("remote-peer-id", cw.peerID.String()),
295					)
296				} else {
297					plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
298				}
299			}
300			if cw.lg != nil {
301				cw.lg.Warn(
302					"stopped TCP streaming connection with remote peer",
303					zap.String("stream-writer-type", t.String()),
304					zap.String("remote-peer-id", cw.peerID.String()),
305				)
306			} else {
307				plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
308			}
309			close(cw.done)
310			return
311		}
312	}
313}
314
315func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
316	cw.mu.Lock()
317	defer cw.mu.Unlock()
318	return cw.msgc, cw.working
319}
320
321func (cw *streamWriter) close() bool {
322	cw.mu.Lock()
323	defer cw.mu.Unlock()
324	return cw.closeUnlocked()
325}
326
327func (cw *streamWriter) closeUnlocked() bool {
328	if !cw.working {
329		return false
330	}
331	if err := cw.closer.Close(); err != nil {
332		if cw.lg != nil {
333			cw.lg.Warn(
334				"failed to close connection with remote peer",
335				zap.String("remote-peer-id", cw.peerID.String()),
336				zap.Error(err),
337			)
338		} else {
339			plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
340		}
341	}
342	if len(cw.msgc) > 0 {
343		cw.r.ReportUnreachable(uint64(cw.peerID))
344	}
345	cw.msgc = make(chan raftpb.Message, streamBufSize)
346	cw.working = false
347	return true
348}
349
350func (cw *streamWriter) attach(conn *outgoingConn) bool {
351	select {
352	case cw.connc <- conn:
353		return true
354	case <-cw.done:
355		return false
356	}
357}
358
// stop asks the run goroutine to exit by closing stopc and then blocks until
// run has closed done. It must be called at most once: a second call would
// close stopc again, which panics.
func (cw *streamWriter) stop() {
	close(cw.stopc)
	<-cw.done
}
363
// streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned.
type streamReader struct {
	lg *zap.Logger

	peerID types.ID
	typ    streamType

	tr *Transport
	// picker chooses the peer URL to dial and is told when a URL fails.
	picker *urlPicker
	// status is activated on a successful dial and deactivated on dial or
	// read failures.
	status *peerStatus
	// recvc receives every decoded message except MsgProp, which goes to propc.
	recvc chan<- raftpb.Message
	propc chan<- raftpb.Message

	rl *rate.Limiter // alters the frequency of dial retrial attempts

	// errorc receives critical errors such as errMemberRemoved; start sets
	// it to tr.ErrorC when it is nil.
	errorc chan<- error

	// mu guards paused and closer.
	mu sync.Mutex
	// paused makes decodeLoop discard decoded messages while it is set.
	paused bool
	// closer is the body of the stream currently being decoded, or nil.
	closer io.Closer

	// ctx is cancelled by stop through cancel; start creates both when ctx
	// is nil.
	ctx    context.Context
	cancel context.CancelFunc
	// done is closed by run right before it returns.
	done chan struct{}
}
390
391func (cr *streamReader) start() {
392	cr.done = make(chan struct{})
393	if cr.errorc == nil {
394		cr.errorc = cr.tr.ErrorC
395	}
396	if cr.ctx == nil {
397		cr.ctx, cr.cancel = context.WithCancel(context.Background())
398	}
399	go cr.run()
400}
401
402func (cr *streamReader) run() {
403	t := cr.typ
404
405	if cr.lg != nil {
406		cr.lg.Info(
407			"started stream reader with remote peer",
408			zap.String("stream-reader-type", t.String()),
409			zap.String("local-member-id", cr.tr.ID.String()),
410			zap.String("remote-peer-id", cr.peerID.String()),
411		)
412	} else {
413		plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
414	}
415
416	for {
417		rc, err := cr.dial(t)
418		if err != nil {
419			if err != errUnsupportedStreamType {
420				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
421			}
422		} else {
423			cr.status.activate()
424			if cr.lg != nil {
425				cr.lg.Info(
426					"established TCP streaming connection with remote peer",
427					zap.String("stream-reader-type", cr.typ.String()),
428					zap.String("local-member-id", cr.tr.ID.String()),
429					zap.String("remote-peer-id", cr.peerID.String()),
430				)
431			} else {
432				plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
433			}
434			err = cr.decodeLoop(rc, t)
435			if cr.lg != nil {
436				cr.lg.Warn(
437					"lost TCP streaming connection with remote peer",
438					zap.String("stream-reader-type", cr.typ.String()),
439					zap.String("local-member-id", cr.tr.ID.String()),
440					zap.String("remote-peer-id", cr.peerID.String()),
441					zap.Error(err),
442				)
443			} else {
444				plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
445			}
446			switch {
447			// all data is read out
448			case err == io.EOF:
449			// connection is closed by the remote
450			case transport.IsClosedConnError(err):
451			default:
452				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
453			}
454		}
455		// Wait for a while before new dial attempt
456		err = cr.rl.Wait(cr.ctx)
457		if cr.ctx.Err() != nil {
458			if cr.lg != nil {
459				cr.lg.Info(
460					"stopped stream reader with remote peer",
461					zap.String("stream-reader-type", t.String()),
462					zap.String("local-member-id", cr.tr.ID.String()),
463					zap.String("remote-peer-id", cr.peerID.String()),
464				)
465			} else {
466				plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
467			}
468			close(cr.done)
469			return
470		}
471		if err != nil {
472			if cr.lg != nil {
473				cr.lg.Warn(
474					"rate limit on stream reader with remote peer",
475					zap.String("stream-reader-type", t.String()),
476					zap.String("local-member-id", cr.tr.ID.String()),
477					zap.String("remote-peer-id", cr.peerID.String()),
478					zap.Error(err),
479				)
480			} else {
481				plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
482			}
483		}
484	}
485}
486
// decodeLoop decodes messages from rc, using the decoder that matches stream
// type t, and forwards them to raft until decoding fails. It always returns a
// non-nil error: the decode error, or io.EOF (or the close error) when the
// reader was already stopped before the loop began. It panics on an unknown
// stream type.
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		if cr.lg != nil {
			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
		} else {
			plog.Panicf("unhandled stream type %s", t)
		}
	}
	// If the reader was stopped in the meantime, close the fresh connection
	// right away; otherwise register it as the closer so that stop() can
	// close it.
	select {
	case <-cr.ctx.Done():
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()

	// gofail: labelRaftDropHeartbeat:
	for {
		m, err := dec.decode()
		if err != nil {
			// Any decode error ends the stream: close it and let the
			// caller redial.
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}

		// gofail-go: var raftDropHeartbeat struct{}
		// continue labelRaftDropHeartbeat
		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()

		// While paused, messages are still read (and counted above) but
		// silently discarded.
		if paused {
			continue
		}

		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}

		// Proposals go to their own channel; everything else goes to recvc.
		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}

		// Non-blocking send: when the receiving channel is full the message
		// is dropped and counted as a receive failure.
		select {
		case recvc <- m:
		default:
			if cr.status.isActive() {
				if cr.lg != nil {
					cr.lg.Warn(
						"dropped internal Raft message since receiving buffer is full (overloaded network)",
						zap.String("message-type", m.Type.String()),
						zap.String("local-member-id", cr.tr.ID.String()),
						zap.String("from", types.ID(m.From).String()),
						zap.String("remote-peer-id", types.ID(m.To).String()),
						zap.Bool("remote-peer-active", cr.status.isActive()),
					)
				} else {
					plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
				}
			} else {
				// With the legacy logger, drops from an inactive peer are
				// only logged at debug level.
				if cr.lg != nil {
					cr.lg.Warn(
						"dropped Raft message since receiving buffer is full (overloaded network)",
						zap.String("message-type", m.Type.String()),
						zap.String("local-member-id", cr.tr.ID.String()),
						zap.String("from", types.ID(m.From).String()),
						zap.String("remote-peer-id", types.ID(m.To).String()),
						zap.Bool("remote-peer-active", cr.status.isActive()),
					)
				} else {
					plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
				}
			}
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
	}
}
582
// stop shuts the reader down and waits for run to exit. Under the mutex it
// cancels the context and closes the current connection, if any; it then
// blocks until run has closed done.
// NOTE(review): cr.cancel is only set by start when ctx was nil; a reader
// given its own ctx presumably must also be given a cancel func — confirm.
func (cr *streamReader) stop() {
	cr.mu.Lock()
	cr.cancel()
	cr.close()
	cr.mu.Unlock()
	<-cr.done
}
590
591func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
592	u := cr.picker.pick()
593	uu := u
594	uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
595
596	if cr.lg != nil {
597		cr.lg.Debug(
598			"dial stream reader",
599			zap.String("from", cr.tr.ID.String()),
600			zap.String("to", cr.peerID.String()),
601			zap.String("address", uu.String()),
602		)
603	}
604	req, err := http.NewRequest("GET", uu.String(), nil)
605	if err != nil {
606		cr.picker.unreachable(u)
607		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
608	}
609	req.Header.Set("X-Server-From", cr.tr.ID.String())
610	req.Header.Set("X-Server-Version", version.Version)
611	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
612	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
613	req.Header.Set("X-Raft-To", cr.peerID.String())
614
615	setPeerURLsHeader(req, cr.tr.URLs)
616
617	req = req.WithContext(cr.ctx)
618
619	cr.mu.Lock()
620	select {
621	case <-cr.ctx.Done():
622		cr.mu.Unlock()
623		return nil, fmt.Errorf("stream reader is stopped")
624	default:
625	}
626	cr.mu.Unlock()
627
628	resp, err := cr.tr.streamRt.RoundTrip(req)
629	if err != nil {
630		cr.picker.unreachable(u)
631		return nil, err
632	}
633
634	rv := serverVersion(resp.Header)
635	lv := semver.Must(semver.NewVersion(version.Version))
636	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
637		httputil.GracefulClose(resp)
638		cr.picker.unreachable(u)
639		return nil, errUnsupportedStreamType
640	}
641
642	switch resp.StatusCode {
643	case http.StatusGone:
644		httputil.GracefulClose(resp)
645		cr.picker.unreachable(u)
646		reportCriticalError(errMemberRemoved, cr.errorc)
647		return nil, errMemberRemoved
648
649	case http.StatusOK:
650		return resp.Body, nil
651
652	case http.StatusNotFound:
653		httputil.GracefulClose(resp)
654		cr.picker.unreachable(u)
655		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
656
657	case http.StatusPreconditionFailed:
658		b, err := ioutil.ReadAll(resp.Body)
659		if err != nil {
660			cr.picker.unreachable(u)
661			return nil, err
662		}
663		httputil.GracefulClose(resp)
664		cr.picker.unreachable(u)
665
666		switch strings.TrimSuffix(string(b), "\n") {
667		case errIncompatibleVersion.Error():
668			if cr.lg != nil {
669				cr.lg.Warn(
670					"request sent was ignored by remote peer due to server version incompatibility",
671					zap.String("local-member-id", cr.tr.ID.String()),
672					zap.String("remote-peer-id", cr.peerID.String()),
673					zap.Error(errIncompatibleVersion),
674				)
675			} else {
676				plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
677			}
678			return nil, errIncompatibleVersion
679
680		case errClusterIDMismatch.Error():
681			if cr.lg != nil {
682				cr.lg.Warn(
683					"request sent was ignored by remote peer due to cluster ID mismatch",
684					zap.String("remote-peer-id", cr.peerID.String()),
685					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
686					zap.String("local-member-id", cr.tr.ID.String()),
687					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
688					zap.Error(errClusterIDMismatch),
689				)
690			} else {
691				plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
692					cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
693			}
694			return nil, errClusterIDMismatch
695
696		default:
697			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
698		}
699
700	default:
701		httputil.GracefulClose(resp)
702		cr.picker.unreachable(u)
703		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
704	}
705}
706
707func (cr *streamReader) close() {
708	if cr.closer != nil {
709		if err := cr.closer.Close(); err != nil {
710			if cr.lg != nil {
711				cr.lg.Warn(
712					"failed to close remote peer connection",
713					zap.String("local-member-id", cr.tr.ID.String()),
714					zap.String("remote-peer-id", cr.peerID.String()),
715					zap.Error(err),
716				)
717			} else {
718				plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
719			}
720		}
721	}
722	cr.closer = nil
723}
724
725func (cr *streamReader) pause() {
726	cr.mu.Lock()
727	defer cr.mu.Unlock()
728	cr.paused = true
729}
730
731func (cr *streamReader) resume() {
732	cr.mu.Lock()
733	defer cr.mu.Unlock()
734	cr.paused = false
735}
736
737// checkStreamSupport checks whether the stream type is supported in the
738// given version.
739func checkStreamSupport(v *semver.Version, t streamType) bool {
740	nv := &semver.Version{Major: v.Major, Minor: v.Minor}
741	for _, s := range supportedStream[nv.String()] {
742		if s == t {
743			return true
744		}
745	}
746	return false
747}
748