1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"fmt"
19	"io"
20	"io/ioutil"
21	"net"
22	"net/http"
23	"path"
24	"strings"
25	"sync"
26	"time"
27
28	"github.com/coreos/etcd/etcdserver/stats"
29	"github.com/coreos/etcd/pkg/httputil"
30	"github.com/coreos/etcd/pkg/types"
31	"github.com/coreos/etcd/raft/raftpb"
32	"github.com/coreos/etcd/version"
33	"github.com/coreos/go-semver/semver"
34)
35
const (
	// streamTypeMessage and streamTypeMsgAppV2 are the two stream types
	// handled in this file; each has its own endpoint and encoder/decoder.
	streamTypeMessage  streamType = "message"
	streamTypeMsgAppV2 streamType = "msgappv2"

	// streamBufSize is the capacity of a streamWriter's message channel.
	// The writer also forces a flush once more than half this many
	// messages have been batched without flushing.
	streamBufSize = 4096
)
42
var (
	// errUnsupportedStreamType is returned by streamReader.dial when the
	// remote server is older than the local one and does not support the
	// requested stream type.
	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")

	// supportedStream lists the stream types available for each server
	// version; it is consulted by checkStreamSupport, which builds its
	// lookup key from the major and minor version only.
	// the key is in string format "major.minor.patch"
	supportedStream = map[string][]streamType{
		"2.0.0": {},
		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
	}
)
56
// streamType names a kind of stream; it determines the endpoint path used
// to dial and the encoder/decoder applied to the connection.
type streamType string
58
59func (t streamType) endpoint() string {
60	switch t {
61	case streamTypeMsgAppV2:
62		return path.Join(RaftStreamPrefix, "msgapp")
63	case streamTypeMessage:
64		return path.Join(RaftStreamPrefix, "message")
65	default:
66		plog.Panicf("unhandled stream type %v", t)
67		return ""
68	}
69}
70
71func (t streamType) String() string {
72	switch t {
73	case streamTypeMsgAppV2:
74		return "stream MsgApp v2"
75	case streamTypeMessage:
76		return "stream Message"
77	default:
78		return "unknown stream"
79	}
80}
81
var (
	// linkHeartbeatMessage is a special message used as the heartbeat
	// message in the link layer. It never conflicts with messages from raft
	// because raft doesn't send out messages without From and To fields.
	// isLinkHeartbeatMessage recognizes it on the receiving side.
	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
)
88
89func isLinkHeartbeatMessage(m *raftpb.Message) bool {
90	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
91}
92
// outgoingConn bundles what a streamWriter needs from an outgoing stream
// connection: its stream type plus the means to write to, flush and close it.
type outgoingConn struct {
	t streamType // stream type; selects the encoder in streamWriter.run
	io.Writer
	http.Flusher
	io.Closer
}
99
// streamWriter writes messages to the attached outgoingConn.
type streamWriter struct {
	peerID types.ID             // ID of the remote peer; used in logs, metrics and unreachable reports
	status *peerStatus          // activated on attach, deactivated on write/heartbeat failure
	fs     *stats.FollowerStats // passed to the MsgApp v2 encoder
	r      Raft                 // notified via ReportUnreachable when messages cannot be delivered

	// mu guards closer and working, and also the msgc field itself, which
	// closeUnlocked replaces with a fresh channel when a connection is closed.
	mu      sync.Mutex
	closer  io.Closer // closes the currently attached connection
	working bool      // true while a connection is attached

	msgc  chan raftpb.Message // buffered queue of messages to send (see writec)
	connc chan *outgoingConn  // hands new connections to run (see attach)
	stopc chan struct{}       // closed by stop to ask run to exit
	done  chan struct{}       // closed by run once it has exited
}
116
117// startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
118// messages and writes to the attached outgoing connection.
119func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
120	w := &streamWriter{
121		peerID: id,
122		status: status,
123		fs:     fs,
124		r:      r,
125		msgc:   make(chan raftpb.Message, streamBufSize),
126		connc:  make(chan *outgoingConn),
127		stopc:  make(chan struct{}),
128		done:   make(chan struct{}),
129	}
130	go w.run()
131	return w
132}
133
134func (cw *streamWriter) run() {
135	var (
136		msgc       chan raftpb.Message
137		heartbeatc <-chan time.Time
138		t          streamType
139		enc        encoder
140		flusher    http.Flusher
141		batched    int
142	)
143	tickc := time.Tick(ConnReadTimeout / 3)
144	unflushed := 0
145
146	plog.Infof("started streaming with peer %s (writer)", cw.peerID)
147
148	for {
149		select {
150		case <-heartbeatc:
151			err := enc.encode(&linkHeartbeatMessage)
152			unflushed += linkHeartbeatMessage.Size()
153			if err == nil {
154				flusher.Flush()
155				batched = 0
156				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
157				unflushed = 0
158				continue
159			}
160
161			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
162
163			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
164			cw.close()
165			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
166			heartbeatc, msgc = nil, nil
167
168		case m := <-msgc:
169			err := enc.encode(&m)
170			if err == nil {
171				unflushed += m.Size()
172
173				if len(msgc) == 0 || batched > streamBufSize/2 {
174					flusher.Flush()
175					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
176					unflushed = 0
177					batched = 0
178				} else {
179					batched++
180				}
181
182				continue
183			}
184
185			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
186			cw.close()
187			plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
188			heartbeatc, msgc = nil, nil
189			cw.r.ReportUnreachable(m.To)
190			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
191
192		case conn := <-cw.connc:
193			cw.mu.Lock()
194			closed := cw.closeUnlocked()
195			t = conn.t
196			switch conn.t {
197			case streamTypeMsgAppV2:
198				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
199			case streamTypeMessage:
200				enc = &messageEncoder{w: conn.Writer}
201			default:
202				plog.Panicf("unhandled stream type %s", conn.t)
203			}
204			flusher = conn.Flusher
205			unflushed = 0
206			cw.status.activate()
207			cw.closer = conn.Closer
208			cw.working = true
209			cw.mu.Unlock()
210
211			if closed {
212				plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
213			}
214			plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
215			heartbeatc, msgc = tickc, cw.msgc
216		case <-cw.stopc:
217			if cw.close() {
218				plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
219			}
220			plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
221			close(cw.done)
222			return
223		}
224	}
225}
226
227func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
228	cw.mu.Lock()
229	defer cw.mu.Unlock()
230	return cw.msgc, cw.working
231}
232
233func (cw *streamWriter) close() bool {
234	cw.mu.Lock()
235	defer cw.mu.Unlock()
236	return cw.closeUnlocked()
237}
238
239func (cw *streamWriter) closeUnlocked() bool {
240	if !cw.working {
241		return false
242	}
243	cw.closer.Close()
244	if len(cw.msgc) > 0 {
245		cw.r.ReportUnreachable(uint64(cw.peerID))
246	}
247	cw.msgc = make(chan raftpb.Message, streamBufSize)
248	cw.working = false
249	return true
250}
251
252func (cw *streamWriter) attach(conn *outgoingConn) bool {
253	select {
254	case cw.connc <- conn:
255		return true
256	case <-cw.done:
257		return false
258	}
259}
260
// stop asks the run loop to exit and blocks until it has done so. It closes
// stopc, so it must not be called more than once.
func (cw *streamWriter) stop() {
	close(cw.stopc)
	// run closes done after closing any attached connection.
	<-cw.done
}
265
// streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned.
type streamReader struct {
	peerID types.ID   // ID of the remote peer being read from
	typ    streamType // stream type to dial and decode

	tr     *Transport            // supplies local ID, cluster ID, URLs and the stream round tripper
	picker *urlPicker            // chooses the peer URL to dial and tracks unreachable ones
	status *peerStatus           // activated on successful dial, deactivated on dial/read failure
	recvc  chan<- raftpb.Message // receives decoded messages other than MsgProp
	propc  chan<- raftpb.Message // receives decoded MsgProp messages

	// errorc receives critical errors such as member removal; start
	// defaults it to tr.ErrorC when it is nil.
	errorc chan<- error

	mu     sync.Mutex // guards paused, cancel and closer
	paused bool       // while true, decoded messages are discarded
	cancel func()     // request canceler installed by dial; invoked by stop
	closer io.Closer  // response body currently being decoded, if any

	stopc chan struct{} // closed by stop to ask run to exit
	done  chan struct{} // closed by run once it has exited
}
288
289func (r *streamReader) start() {
290	r.stopc = make(chan struct{})
291	r.done = make(chan struct{})
292	if r.errorc == nil {
293		r.errorc = r.tr.ErrorC
294	}
295
296	go r.run()
297}
298
299func (cr *streamReader) run() {
300	t := cr.typ
301	plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
302	for {
303		rc, err := cr.dial(t)
304		if err != nil {
305			if err != errUnsupportedStreamType {
306				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
307			}
308		} else {
309			cr.status.activate()
310			plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
311			err := cr.decodeLoop(rc, t)
312			plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
313			switch {
314			// all data is read out
315			case err == io.EOF:
316			// connection is closed by the remote
317			case isClosedConnectionError(err):
318			default:
319				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
320			}
321		}
322		select {
323		// Wait 100ms to create a new stream, so it doesn't bring too much
324		// overhead when retry.
325		case <-time.After(100 * time.Millisecond):
326		case <-cr.stopc:
327			plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
328			close(cr.done)
329			return
330		}
331	}
332}
333
// decodeLoop decodes messages of stream type t from rc and forwards them to
// cr.recvc (cr.propc for MsgProp) until decoding fails; it then closes the
// connection and returns the decode error. If the reader has already been
// stopped, it closes rc without decoding anything and returns io.EOF (or
// the error from Close).
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		plog.Panicf("unhandled stream type %s", t)
	}
	// Check stopc while holding mu: stop closes stopc before taking mu to
	// close cr.closer, so either we see the stop here or stop sees the
	// closer registered below.
	select {
	case <-cr.stopc:
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()

	for {
		m, err := dec.decode()
		if err != nil {
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}

		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()

		// While paused, messages are still read (and counted) but discarded.
		if paused {
			continue
		}

		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}

		// Proposals go to their own channel.
		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}

		// Non-blocking send: drop the message rather than stall the stream
		// when the receiver is not keeping up.
		select {
		case recvc <- m:
		default:
			if cr.status.isActive() {
				plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
			}
			plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
	}
}
399
// stop shuts the reader down: it closes stopc, invokes the request canceler
// installed by dial (if any), closes the connection currently being decoded,
// and then blocks until run has exited. It closes stopc, so it must not be
// called more than once.
func (cr *streamReader) stop() {
	close(cr.stopc)
	cr.mu.Lock()
	if cr.cancel != nil {
		cr.cancel()
	}
	cr.close()
	cr.mu.Unlock()
	<-cr.done
}
410
411func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
412	u := cr.picker.pick()
413	uu := u
414	uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
415
416	req, err := http.NewRequest("GET", uu.String(), nil)
417	if err != nil {
418		cr.picker.unreachable(u)
419		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
420	}
421	req.Header.Set("X-Server-From", cr.tr.ID.String())
422	req.Header.Set("X-Server-Version", version.Version)
423	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
424	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
425	req.Header.Set("X-Raft-To", cr.peerID.String())
426
427	setPeerURLsHeader(req, cr.tr.URLs)
428
429	cr.mu.Lock()
430	select {
431	case <-cr.stopc:
432		cr.mu.Unlock()
433		return nil, fmt.Errorf("stream reader is stopped")
434	default:
435	}
436	cr.cancel = httputil.RequestCanceler(req)
437	cr.mu.Unlock()
438
439	resp, err := cr.tr.streamRt.RoundTrip(req)
440	if err != nil {
441		cr.picker.unreachable(u)
442		return nil, err
443	}
444
445	rv := serverVersion(resp.Header)
446	lv := semver.Must(semver.NewVersion(version.Version))
447	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
448		httputil.GracefulClose(resp)
449		cr.picker.unreachable(u)
450		return nil, errUnsupportedStreamType
451	}
452
453	switch resp.StatusCode {
454	case http.StatusGone:
455		httputil.GracefulClose(resp)
456		cr.picker.unreachable(u)
457		reportCriticalError(errMemberRemoved, cr.errorc)
458		return nil, errMemberRemoved
459	case http.StatusOK:
460		return resp.Body, nil
461	case http.StatusNotFound:
462		httputil.GracefulClose(resp)
463		cr.picker.unreachable(u)
464		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
465	case http.StatusPreconditionFailed:
466		b, err := ioutil.ReadAll(resp.Body)
467		if err != nil {
468			cr.picker.unreachable(u)
469			return nil, err
470		}
471		httputil.GracefulClose(resp)
472		cr.picker.unreachable(u)
473
474		switch strings.TrimSuffix(string(b), "\n") {
475		case errIncompatibleVersion.Error():
476			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
477			return nil, errIncompatibleVersion
478		case errClusterIDMismatch.Error():
479			plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
480				cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
481			return nil, errClusterIDMismatch
482		default:
483			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
484		}
485	default:
486		httputil.GracefulClose(resp)
487		cr.picker.unreachable(u)
488		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
489	}
490}
491
492func (cr *streamReader) close() {
493	if cr.closer != nil {
494		cr.closer.Close()
495	}
496	cr.closer = nil
497}
498
499func (cr *streamReader) pause() {
500	cr.mu.Lock()
501	defer cr.mu.Unlock()
502	cr.paused = true
503}
504
505func (cr *streamReader) resume() {
506	cr.mu.Lock()
507	defer cr.mu.Unlock()
508	cr.paused = false
509}
510
// isClosedConnectionError reports whether err is a *net.OpError whose
// underlying error is "use of closed network connection". It returns false
// for nil, for other error types, and for an OpError that carries no
// underlying error (instead of panicking on the nil Err).
func isClosedConnectionError(err error) bool {
	operr, ok := err.(*net.OpError)
	if !ok || operr == nil || operr.Err == nil {
		return false
	}
	// The net package exposes this condition only through the error text.
	return operr.Err.Error() == "use of closed network connection"
}
515
516// checkStreamSupport checks whether the stream type is supported in the
517// given version.
518func checkStreamSupport(v *semver.Version, t streamType) bool {
519	nv := &semver.Version{Major: v.Major, Minor: v.Minor}
520	for _, s := range supportedStream[nv.String()] {
521		if s == t {
522			return true
523		}
524	}
525	return false
526}
527