1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"io/ioutil"
22	"net/http"
23	"path"
24	"strings"
25	"sync"
26	"time"
27
28	"go.etcd.io/etcd/api/v3/version"
29	"go.etcd.io/etcd/client/pkg/v3/transport"
30	"go.etcd.io/etcd/client/pkg/v3/types"
31	"go.etcd.io/etcd/pkg/v3/httputil"
32	"go.etcd.io/etcd/raft/v3/raftpb"
33	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
34
35	"github.com/coreos/go-semver/semver"
36	"go.uber.org/zap"
37	"golang.org/x/time/rate"
38)
39
const (
	// streamTypeMessage is the stream that carries general raft messages;
	// streamWriter encodes it with messageEncoder and streamReader decodes it
	// with messageDecoder.
	streamTypeMessage  streamType = "message"
	// streamTypeMsgAppV2 is the stream dedicated to MsgApp traffic, using the
	// msgAppV2 encoder/decoder pair.
	streamTypeMsgAppV2 streamType = "msgappv2"

	// streamBufSize is the capacity of a streamWriter's outgoing message
	// channel. streamWriter.run also uses it to bound batching: it forces a
	// flush once more than streamBufSize/2 messages have been encoded without
	// one.
	streamBufSize = 4096
)

var (
	// errUnsupportedStreamType is returned by streamReader.dial when the remote
	// peer's version does not serve the requested stream type. streamReader.run
	// does not treat it as a dial failure (the peer status is not deactivated).
	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")

	// supportedStream maps a release to the stream types a peer running that
	// release serves. It is consulted through checkStreamSupport, which only
	// looks at the major and minor version.
	//
	// the key is in string format "major.minor.patch"; the patch is always 0.
	supportedStream = map[string][]streamType{
		"2.0.0": {},
		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.4.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.5.0": {streamTypeMsgAppV2, streamTypeMessage},
	}
)
64
65type streamType string
66
67func (t streamType) endpoint(lg *zap.Logger) string {
68	switch t {
69	case streamTypeMsgAppV2:
70		return path.Join(RaftStreamPrefix, "msgapp")
71	case streamTypeMessage:
72		return path.Join(RaftStreamPrefix, "message")
73	default:
74		if lg != nil {
75			lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
76		}
77		return ""
78	}
79}
80
81func (t streamType) String() string {
82	switch t {
83	case streamTypeMsgAppV2:
84		return "stream MsgApp v2"
85	case streamTypeMessage:
86		return "stream Message"
87	default:
88		return "unknown stream"
89	}
90}
91
92var (
93	// linkHeartbeatMessage is a special message used as heartbeat message in
94	// link layer. It never conflicts with messages from raft because raft
95	// doesn't send out messages without From and To fields.
96	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
97)
98
99func isLinkHeartbeatMessage(m *raftpb.Message) bool {
100	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
101}
102
// outgoingConn is a connection handed to streamWriter.attach. streamWriter.run
// writes encoded messages to the embedded Writer, flushes them through the
// embedded Flusher, and closes the connection through the embedded Closer
// when the stream breaks or is replaced.
type outgoingConn struct {
	// t selects the encoder streamWriter.run uses for this connection.
	t streamType
	io.Writer
	http.Flusher
	io.Closer

	// localID and peerID identify the two ends of the link; streamWriter.run
	// only uses them for logging.
	localID types.ID
	peerID  types.ID
}
112
// streamWriter writes messages to the attached outgoingConn.
//
// A single goroutine (run) owns the connection. Callers obtain the channel to
// send on via writec, attach new connections via attach, and shut the writer
// down via stop.
type streamWriter struct {
	lg *zap.Logger

	localID types.ID
	peerID  types.ID

	// status is activated when a connection is attached and deactivated when
	// a write or heartbeat fails.
	status *peerStatus
	// fs is passed to the msgAppV2 encoder.
	fs *stats.FollowerStats
	// r is notified (ReportUnreachable) when messages could not be sent.
	r Raft

	// mu guards working and closer. It is also held when msgc is replaced
	// (closeUnlocked) and when writec reads it.
	mu      sync.Mutex // guard field working and closer
	closer  io.Closer
	working bool

	// msgc buffers outgoing messages (capacity streamBufSize). It is swapped
	// for a fresh channel each time a connection is closed.
	msgc chan raftpb.Message
	// connc delivers newly attached connections to run.
	connc chan *outgoingConn
	// stopc is closed by stop to ask run to exit.
	stopc chan struct{}
	// done is closed by run when it has exited.
	done chan struct{}
}
133
134// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
135// messages and writes to the attached outgoing connection.
136func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
137	w := &streamWriter{
138		lg: lg,
139
140		localID: local,
141		peerID:  id,
142
143		status: status,
144		fs:     fs,
145		r:      r,
146		msgc:   make(chan raftpb.Message, streamBufSize),
147		connc:  make(chan *outgoingConn),
148		stopc:  make(chan struct{}),
149		done:   make(chan struct{}),
150	}
151	go w.run()
152	return w
153}
154
// run is the streamWriter's event loop, started by startStreamWriter.
//
// While no connection is attached, msgc and heartbeatc are nil, so only the
// connc and stopc cases can fire. Once a connection arrives on connc, run:
//   - encodes messages from cw.msgc onto it, flushing when the channel is
//     drained or more than streamBufSize/2 messages were batched, and
//   - writes linkHeartbeatMessage every ConnReadTimeout/3 and flushes it
//     immediately.
//
// Any encode error deactivates cw.status, closes the connection, counts a
// send failure, and returns the loop to the idle state until the next
// attach. When stopc is closed, run closes the connection, closes cw.done,
// and returns.
func (cw *streamWriter) run() {
	// msgc and heartbeatc are nil (disabled) while no connection is attached.
	// t, enc and flusher describe the current connection. batched counts
	// messages encoded since the last flush.
	var (
		msgc       chan raftpb.Message
		heartbeatc <-chan time.Time
		t          streamType
		enc        encoder
		flusher    http.Flusher
		batched    int
	)
	tickc := time.NewTicker(ConnReadTimeout / 3)
	defer tickc.Stop()
	// unflushed accumulates encoded bytes not yet reported to sentBytes.
	unflushed := 0

	if cw.lg != nil {
		cw.lg.Info(
			"started stream writer with remote peer",
			zap.String("local-member-id", cw.localID.String()),
			zap.String("remote-peer-id", cw.peerID.String()),
		)
	}

	for {
		select {
		case <-heartbeatc:
			err := enc.encode(&linkHeartbeatMessage)
			unflushed += linkHeartbeatMessage.Size()
			if err == nil {
				// Heartbeats are flushed right away, taking any batched
				// messages along with them.
				flusher.Flush()
				batched = 0
				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
				unflushed = 0
				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())

			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Go idle until a new connection is attached.
			heartbeatc, msgc = nil, nil

		case m := <-msgc:
			err := enc.encode(&m)
			if err == nil {
				unflushed += m.Size()

				// Flush when nothing else is queued, or when the batch has
				// grown past half the buffer size.
				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
					unflushed = 0
					batched = 0
				} else {
					batched++
				}

				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Go idle until a new connection is attached, and tell raft the
			// failed message's destination is unreachable.
			heartbeatc, msgc = nil, nil
			cw.r.ReportUnreachable(m.To)
			sentFailures.WithLabelValues(cw.peerID.String()).Inc()

		case conn := <-cw.connc:
			// Replace any existing connection under cw.mu so that writec
			// observes the switch atomically.
			cw.mu.Lock()
			closed := cw.closeUnlocked()
			t = conn.t
			switch conn.t {
			case streamTypeMsgAppV2:
				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
			case streamTypeMessage:
				enc = &messageEncoder{w: conn.Writer}
			default:
				if cw.lg != nil {
					cw.lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"set message encoder",
					zap.String("from", conn.localID.String()),
					zap.String("to", conn.peerID.String()),
					zap.String("stream-type", t.String()),
				)
			}
			flusher = conn.Flusher
			unflushed = 0
			cw.status.activate()
			cw.closer = conn.Closer
			cw.working = true
			cw.mu.Unlock()

			if closed {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("local-member-id", cw.localID.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"established TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Enable heartbeats and message delivery. cw.msgc may have been
			// replaced by closeUnlocked above, so re-read it.
			heartbeatc, msgc = tickc.C, cw.msgc

		case <-cw.stopc:
			if cw.close() {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"stopped TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Unblocks stop and any attach still waiting on connc.
			close(cw.done)
			return
		}
	}
}
305
306func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
307	cw.mu.Lock()
308	defer cw.mu.Unlock()
309	return cw.msgc, cw.working
310}
311
312func (cw *streamWriter) close() bool {
313	cw.mu.Lock()
314	defer cw.mu.Unlock()
315	return cw.closeUnlocked()
316}
317
318func (cw *streamWriter) closeUnlocked() bool {
319	if !cw.working {
320		return false
321	}
322	if err := cw.closer.Close(); err != nil {
323		if cw.lg != nil {
324			cw.lg.Warn(
325				"failed to close connection with remote peer",
326				zap.String("remote-peer-id", cw.peerID.String()),
327				zap.Error(err),
328			)
329		}
330	}
331	if len(cw.msgc) > 0 {
332		cw.r.ReportUnreachable(uint64(cw.peerID))
333	}
334	cw.msgc = make(chan raftpb.Message, streamBufSize)
335	cw.working = false
336	return true
337}
338
339func (cw *streamWriter) attach(conn *outgoingConn) bool {
340	select {
341	case cw.connc <- conn:
342		return true
343	case <-cw.done:
344		return false
345	}
346}
347
348func (cw *streamWriter) stop() {
349	close(cw.stopc)
350	<-cw.done
351}
352
// streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned.
//
// Call start to launch it and stop to shut it down. pause and resume toggle
// whether decoded messages are delivered.
type streamReader struct {
	lg *zap.Logger

	// peerID is the remote peer being dialed; typ selects the stream endpoint
	// and the decoder.
	peerID types.ID
	typ    streamType

	// tr supplies the local ID, cluster ID, advertised URLs, the streaming
	// round tripper and the default error channel.
	tr *Transport
	// picker chooses which of the peer's URLs to dial and is told when one
	// fails.
	picker *urlPicker
	// status is activated on a successful dial and deactivated on failures.
	status *peerStatus
	// recvc receives decoded messages; MsgProp messages go to propc instead.
	recvc chan<- raftpb.Message
	propc chan<- raftpb.Message

	rl *rate.Limiter // alters the frequency of dial retrial attempts

	// errorc receives critical errors (e.g. errMemberRemoved); start defaults
	// it to tr.ErrorC when nil.
	errorc chan<- error

	// mu guards paused and closer, and orders ctx checks in dial/decodeLoop
	// against stop.
	mu     sync.Mutex
	paused bool
	// closer is the body of the response currently being decoded, if any.
	closer io.Closer

	// ctx is canceled by stop; start creates it (with cancel) when nil.
	ctx    context.Context
	cancel context.CancelFunc
	// done is closed when run exits.
	done chan struct{}
}
379
380func (cr *streamReader) start() {
381	cr.done = make(chan struct{})
382	if cr.errorc == nil {
383		cr.errorc = cr.tr.ErrorC
384	}
385	if cr.ctx == nil {
386		cr.ctx, cr.cancel = context.WithCancel(context.Background())
387	}
388	go cr.run()
389}
390
391func (cr *streamReader) run() {
392	t := cr.typ
393
394	if cr.lg != nil {
395		cr.lg.Info(
396			"started stream reader with remote peer",
397			zap.String("stream-reader-type", t.String()),
398			zap.String("local-member-id", cr.tr.ID.String()),
399			zap.String("remote-peer-id", cr.peerID.String()),
400		)
401	}
402
403	for {
404		rc, err := cr.dial(t)
405		if err != nil {
406			if err != errUnsupportedStreamType {
407				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
408			}
409		} else {
410			cr.status.activate()
411			if cr.lg != nil {
412				cr.lg.Info(
413					"established TCP streaming connection with remote peer",
414					zap.String("stream-reader-type", cr.typ.String()),
415					zap.String("local-member-id", cr.tr.ID.String()),
416					zap.String("remote-peer-id", cr.peerID.String()),
417				)
418			}
419			err = cr.decodeLoop(rc, t)
420			if cr.lg != nil {
421				cr.lg.Warn(
422					"lost TCP streaming connection with remote peer",
423					zap.String("stream-reader-type", cr.typ.String()),
424					zap.String("local-member-id", cr.tr.ID.String()),
425					zap.String("remote-peer-id", cr.peerID.String()),
426					zap.Error(err),
427				)
428			}
429			switch {
430			// all data is read out
431			case err == io.EOF:
432			// connection is closed by the remote
433			case transport.IsClosedConnError(err):
434			default:
435				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
436			}
437		}
438		// Wait for a while before new dial attempt
439		err = cr.rl.Wait(cr.ctx)
440		if cr.ctx.Err() != nil {
441			if cr.lg != nil {
442				cr.lg.Info(
443					"stopped stream reader with remote peer",
444					zap.String("stream-reader-type", t.String()),
445					zap.String("local-member-id", cr.tr.ID.String()),
446					zap.String("remote-peer-id", cr.peerID.String()),
447				)
448			}
449			close(cr.done)
450			return
451		}
452		if err != nil {
453			if cr.lg != nil {
454				cr.lg.Warn(
455					"rate limit on stream reader with remote peer",
456					zap.String("stream-reader-type", t.String()),
457					zap.String("local-member-id", cr.tr.ID.String()),
458					zap.String("remote-peer-id", cr.peerID.String()),
459					zap.Error(err),
460				)
461			}
462		}
463	}
464}
465
466func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
467	var dec decoder
468	cr.mu.Lock()
469	switch t {
470	case streamTypeMsgAppV2:
471		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
472	case streamTypeMessage:
473		dec = &messageDecoder{r: rc}
474	default:
475		if cr.lg != nil {
476			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
477		}
478	}
479	select {
480	case <-cr.ctx.Done():
481		cr.mu.Unlock()
482		if err := rc.Close(); err != nil {
483			return err
484		}
485		return io.EOF
486	default:
487		cr.closer = rc
488	}
489	cr.mu.Unlock()
490
491	// gofail: labelRaftDropHeartbeat:
492	for {
493		m, err := dec.decode()
494		if err != nil {
495			cr.mu.Lock()
496			cr.close()
497			cr.mu.Unlock()
498			return err
499		}
500
501		// gofail-go: var raftDropHeartbeat struct{}
502		// continue labelRaftDropHeartbeat
503		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
504
505		cr.mu.Lock()
506		paused := cr.paused
507		cr.mu.Unlock()
508
509		if paused {
510			continue
511		}
512
513		if isLinkHeartbeatMessage(&m) {
514			// raft is not interested in link layer
515			// heartbeat message, so we should ignore
516			// it.
517			continue
518		}
519
520		recvc := cr.recvc
521		if m.Type == raftpb.MsgProp {
522			recvc = cr.propc
523		}
524
525		select {
526		case recvc <- m:
527		default:
528			if cr.status.isActive() {
529				if cr.lg != nil {
530					cr.lg.Warn(
531						"dropped internal Raft message since receiving buffer is full (overloaded network)",
532						zap.String("message-type", m.Type.String()),
533						zap.String("local-member-id", cr.tr.ID.String()),
534						zap.String("from", types.ID(m.From).String()),
535						zap.String("remote-peer-id", types.ID(m.To).String()),
536						zap.Bool("remote-peer-active", cr.status.isActive()),
537					)
538				}
539			} else {
540				if cr.lg != nil {
541					cr.lg.Warn(
542						"dropped Raft message since receiving buffer is full (overloaded network)",
543						zap.String("message-type", m.Type.String()),
544						zap.String("local-member-id", cr.tr.ID.String()),
545						zap.String("from", types.ID(m.From).String()),
546						zap.String("remote-peer-id", types.ID(m.To).String()),
547						zap.Bool("remote-peer-active", cr.status.isActive()),
548					)
549				}
550			}
551			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
552		}
553	}
554}
555
556func (cr *streamReader) stop() {
557	cr.mu.Lock()
558	cr.cancel()
559	cr.close()
560	cr.mu.Unlock()
561	<-cr.done
562}
563
564func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
565	u := cr.picker.pick()
566	uu := u
567	uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
568
569	if cr.lg != nil {
570		cr.lg.Debug(
571			"dial stream reader",
572			zap.String("from", cr.tr.ID.String()),
573			zap.String("to", cr.peerID.String()),
574			zap.String("address", uu.String()),
575		)
576	}
577	req, err := http.NewRequest("GET", uu.String(), nil)
578	if err != nil {
579		cr.picker.unreachable(u)
580		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
581	}
582	req.Header.Set("X-Server-From", cr.tr.ID.String())
583	req.Header.Set("X-Server-Version", version.Version)
584	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
585	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
586	req.Header.Set("X-Raft-To", cr.peerID.String())
587
588	setPeerURLsHeader(req, cr.tr.URLs)
589
590	req = req.WithContext(cr.ctx)
591
592	cr.mu.Lock()
593	select {
594	case <-cr.ctx.Done():
595		cr.mu.Unlock()
596		return nil, fmt.Errorf("stream reader is stopped")
597	default:
598	}
599	cr.mu.Unlock()
600
601	resp, err := cr.tr.streamRt.RoundTrip(req)
602	if err != nil {
603		cr.picker.unreachable(u)
604		return nil, err
605	}
606
607	rv := serverVersion(resp.Header)
608	lv := semver.Must(semver.NewVersion(version.Version))
609	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
610		httputil.GracefulClose(resp)
611		cr.picker.unreachable(u)
612		return nil, errUnsupportedStreamType
613	}
614
615	switch resp.StatusCode {
616	case http.StatusGone:
617		httputil.GracefulClose(resp)
618		cr.picker.unreachable(u)
619		reportCriticalError(errMemberRemoved, cr.errorc)
620		return nil, errMemberRemoved
621
622	case http.StatusOK:
623		return resp.Body, nil
624
625	case http.StatusNotFound:
626		httputil.GracefulClose(resp)
627		cr.picker.unreachable(u)
628		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
629
630	case http.StatusPreconditionFailed:
631		b, err := ioutil.ReadAll(resp.Body)
632		if err != nil {
633			cr.picker.unreachable(u)
634			return nil, err
635		}
636		httputil.GracefulClose(resp)
637		cr.picker.unreachable(u)
638
639		switch strings.TrimSuffix(string(b), "\n") {
640		case errIncompatibleVersion.Error():
641			if cr.lg != nil {
642				cr.lg.Warn(
643					"request sent was ignored by remote peer due to server version incompatibility",
644					zap.String("local-member-id", cr.tr.ID.String()),
645					zap.String("remote-peer-id", cr.peerID.String()),
646					zap.Error(errIncompatibleVersion),
647				)
648			}
649			return nil, errIncompatibleVersion
650
651		case errClusterIDMismatch.Error():
652			if cr.lg != nil {
653				cr.lg.Warn(
654					"request sent was ignored by remote peer due to cluster ID mismatch",
655					zap.String("remote-peer-id", cr.peerID.String()),
656					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
657					zap.String("local-member-id", cr.tr.ID.String()),
658					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
659					zap.Error(errClusterIDMismatch),
660				)
661			}
662			return nil, errClusterIDMismatch
663
664		default:
665			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
666		}
667
668	default:
669		httputil.GracefulClose(resp)
670		cr.picker.unreachable(u)
671		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
672	}
673}
674
675func (cr *streamReader) close() {
676	if cr.closer != nil {
677		if err := cr.closer.Close(); err != nil {
678			if cr.lg != nil {
679				cr.lg.Warn(
680					"failed to close remote peer connection",
681					zap.String("local-member-id", cr.tr.ID.String()),
682					zap.String("remote-peer-id", cr.peerID.String()),
683					zap.Error(err),
684				)
685			}
686		}
687	}
688	cr.closer = nil
689}
690
691func (cr *streamReader) pause() {
692	cr.mu.Lock()
693	defer cr.mu.Unlock()
694	cr.paused = true
695}
696
697func (cr *streamReader) resume() {
698	cr.mu.Lock()
699	defer cr.mu.Unlock()
700	cr.paused = false
701}
702
703// checkStreamSupport checks whether the stream type is supported in the
704// given version.
705func checkStreamSupport(v *semver.Version, t streamType) bool {
706	nv := &semver.Version{Major: v.Major, Minor: v.Minor}
707	for _, s := range supportedStream[nv.String()] {
708		if s == t {
709			return true
710		}
711	}
712	return false
713}
714