1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"io"
22	"net/http"
23	"net/http/httptest"
24	"reflect"
25	"sync"
26	"testing"
27	"time"
28
29	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
30	"go.etcd.io/etcd/pkg/testutil"
31	"go.etcd.io/etcd/pkg/types"
32	"go.etcd.io/etcd/raft/raftpb"
33	"go.etcd.io/etcd/version"
34
35	"github.com/coreos/go-semver/semver"
36	"go.uber.org/zap"
37	"golang.org/x/time/rate"
38)
39
40// TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
41// to streamWriter. After that, streamWriter can use it to send messages
42// continuously, and closes it when stopped.
43func TestStreamWriterAttachOutgoingConn(t *testing.T) {
44	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
45	// the expected initial state of streamWriter is not working
46	if _, ok := sw.writec(); ok {
47		t.Errorf("initial working status = %v, want false", ok)
48	}
49
50	// repeat tests to ensure streamWriter can use last attached connection
51	var wfc *fakeWriteFlushCloser
52	for i := 0; i < 3; i++ {
53		prevwfc := wfc
54		wfc = newFakeWriteFlushCloser(nil)
55		sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
56
57		// previous attached connection should be closed
58		if prevwfc != nil {
59			select {
60			case <-prevwfc.closed:
61			case <-time.After(time.Second):
62				t.Errorf("#%d: close of previous connection timed out", i)
63			}
64		}
65
66		// if prevwfc != nil, the new msgc is ready since prevwfc has closed
67		// if prevwfc == nil, the first connection may be pending, but the first
68		// msgc is already available since it's set on calling startStreamwriter
69		msgc, _ := sw.writec()
70		msgc <- raftpb.Message{}
71
72		select {
73		case <-wfc.writec:
74		case <-time.After(time.Second):
75			t.Errorf("#%d: failed to write to the underlying connection", i)
76		}
77		// write chan is still available
78		if _, ok := sw.writec(); !ok {
79			t.Errorf("#%d: working status = %v, want true", i, ok)
80		}
81	}
82
83	sw.stop()
84	// write chan is unavailable since the writer is stopped.
85	if _, ok := sw.writec(); ok {
86		t.Errorf("working status after stop = %v, want false", ok)
87	}
88	if !wfc.Closed() {
89		t.Errorf("failed to close the underlying connection")
90	}
91}
92
93// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
94// outgoingConn will close the outgoingConn and fall back to non-working status.
95func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
96	sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
97	defer sw.stop()
98	wfc := newFakeWriteFlushCloser(errors.New("blah"))
99	sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
100
101	sw.msgc <- raftpb.Message{}
102	select {
103	case <-wfc.closed:
104	case <-time.After(time.Second):
105		t.Errorf("failed to close the underlying connection in time")
106	}
107	// no longer working
108	if _, ok := sw.writec(); ok {
109		t.Errorf("working = %v, want false", ok)
110	}
111}
112
113func TestStreamReaderDialRequest(t *testing.T) {
114	for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
115		tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
116		sr := &streamReader{
117			peerID: types.ID(2),
118			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
119			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
120			ctx:    context.Background(),
121		}
122		sr.dial(tt)
123
124		act, err := tr.rec.Wait(1)
125		if err != nil {
126			t.Fatal(err)
127		}
128		req := act[0].Params[0].(*http.Request)
129
130		wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint(zap.NewExample()) + "/1")
131		if req.URL.String() != wurl {
132			t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
133		}
134		if w := "GET"; req.Method != w {
135			t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
136		}
137		if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
138			t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
139		}
140		if g := req.Header.Get("X-Raft-To"); g != "2" {
141			t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
142		}
143	}
144}
145
146// TestStreamReaderDialResult tests the result of the dial func call meets the
147// HTTP response received.
148func TestStreamReaderDialResult(t *testing.T) {
149	tests := []struct {
150		code  int
151		err   error
152		wok   bool
153		whalt bool
154	}{
155		{0, errors.New("blah"), false, false},
156		{http.StatusOK, nil, true, false},
157		{http.StatusMethodNotAllowed, nil, false, false},
158		{http.StatusNotFound, nil, false, false},
159		{http.StatusPreconditionFailed, nil, false, false},
160		{http.StatusGone, nil, false, true},
161	}
162	for i, tt := range tests {
163		h := http.Header{}
164		h.Add("X-Server-Version", version.Version)
165		tr := &respRoundTripper{
166			code:   tt.code,
167			header: h,
168			err:    tt.err,
169		}
170		sr := &streamReader{
171			peerID: types.ID(2),
172			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
173			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
174			errorc: make(chan error, 1),
175			ctx:    context.Background(),
176		}
177
178		_, err := sr.dial(streamTypeMessage)
179		if ok := err == nil; ok != tt.wok {
180			t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
181		}
182		if halt := len(sr.errorc) > 0; halt != tt.whalt {
183			t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
184		}
185	}
186}
187
188// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
189func TestStreamReaderStopOnDial(t *testing.T) {
190	defer testutil.AfterTest(t)
191	h := http.Header{}
192	h.Add("X-Server-Version", version.Version)
193	tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
194	sr := &streamReader{
195		peerID: types.ID(2),
196		tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
197		picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
198		errorc: make(chan error, 1),
199		typ:    streamTypeMessage,
200		status: newPeerStatus(zap.NewExample(), types.ID(1), types.ID(2)),
201		rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
202	}
203	tr.onResp = func() {
204		// stop() waits for the run() goroutine to exit, but that exit
205		// needs a response from RoundTrip() first; use goroutine
206		go sr.stop()
207		// wait so that stop() is blocked on run() exiting
208		time.Sleep(10 * time.Millisecond)
209		// sr.run() completes dialing then begins decoding while stopped
210	}
211	sr.start()
212	select {
213	case <-sr.done:
214	case <-time.After(time.Second):
215		t.Fatal("streamReader did not stop in time")
216	}
217}
218
219type respWaitRoundTripper struct {
220	rrt    *respRoundTripper
221	onResp func()
222}
223
224func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
225	resp, err := t.rrt.RoundTrip(req)
226	resp.Body = newWaitReadCloser()
227	t.onResp()
228	return resp, err
229}
230
231type waitReadCloser struct{ closec chan struct{} }
232
233func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
234func (wrc *waitReadCloser) Read(p []byte) (int, error) {
235	<-wrc.closec
236	return 0, io.EOF
237}
238func (wrc *waitReadCloser) Close() error {
239	close(wrc.closec)
240	return nil
241}
242
243// TestStreamReaderDialDetectUnsupport tests that dial func could find
244// out that the stream type is not supported by the remote.
245func TestStreamReaderDialDetectUnsupport(t *testing.T) {
246	for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
247		// the response from etcd 2.0
248		tr := &respRoundTripper{
249			code:   http.StatusNotFound,
250			header: http.Header{},
251		}
252		sr := &streamReader{
253			peerID: types.ID(2),
254			tr:     &Transport{streamRt: tr, ClusterID: types.ID(1)},
255			picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
256			ctx:    context.Background(),
257		}
258
259		_, err := sr.dial(typ)
260		if err != errUnsupportedStreamType {
261			t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
262		}
263	}
264}
265
266// TestStream tests that streamReader and streamWriter can build stream to
267// send messages between each other.
268func TestStream(t *testing.T) {
269	recvc := make(chan raftpb.Message, streamBufSize)
270	propc := make(chan raftpb.Message, streamBufSize)
271	msgapp := raftpb.Message{
272		Type:    raftpb.MsgApp,
273		From:    2,
274		To:      1,
275		Term:    1,
276		LogTerm: 1,
277		Index:   3,
278		Entries: []raftpb.Entry{{Term: 1, Index: 4}},
279	}
280
281	tests := []struct {
282		t  streamType
283		m  raftpb.Message
284		wc chan raftpb.Message
285	}{
286		{
287			streamTypeMessage,
288			raftpb.Message{Type: raftpb.MsgProp, To: 2},
289			propc,
290		},
291		{
292			streamTypeMessage,
293			msgapp,
294			recvc,
295		},
296		{
297			streamTypeMsgAppV2,
298			msgapp,
299			recvc,
300		},
301	}
302	for i, tt := range tests {
303		h := &fakeStreamHandler{t: tt.t}
304		srv := httptest.NewServer(h)
305		defer srv.Close()
306
307		sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
308		defer sw.stop()
309		h.sw = sw
310
311		picker := mustNewURLPicker(t, []string{srv.URL})
312		tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
313
314		sr := &streamReader{
315			peerID: types.ID(2),
316			typ:    tt.t,
317			tr:     tr,
318			picker: picker,
319			status: newPeerStatus(zap.NewExample(), types.ID(0), types.ID(2)),
320			recvc:  recvc,
321			propc:  propc,
322			rl:     rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
323		}
324		sr.start()
325
326		// wait for stream to work
327		var writec chan<- raftpb.Message
328		for {
329			var ok bool
330			if writec, ok = sw.writec(); ok {
331				break
332			}
333			time.Sleep(time.Millisecond)
334		}
335
336		writec <- tt.m
337		var m raftpb.Message
338		select {
339		case m = <-tt.wc:
340		case <-time.After(time.Second):
341			t.Fatalf("#%d: failed to receive message from the channel", i)
342		}
343		if !reflect.DeepEqual(m, tt.m) {
344			t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
345		}
346
347		sr.stop()
348	}
349}
350
351func TestCheckStreamSupport(t *testing.T) {
352	tests := []struct {
353		v *semver.Version
354		t streamType
355		w bool
356	}{
357		// support
358		{
359			semver.Must(semver.NewVersion("2.1.0")),
360			streamTypeMsgAppV2,
361			true,
362		},
363		// ignore patch
364		{
365			semver.Must(semver.NewVersion("2.1.9")),
366			streamTypeMsgAppV2,
367			true,
368		},
369		// ignore prerelease
370		{
371			semver.Must(semver.NewVersion("2.1.0-alpha")),
372			streamTypeMsgAppV2,
373			true,
374		},
375	}
376	for i, tt := range tests {
377		if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
378			t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
379		}
380	}
381}
382
383func TestStreamSupportCurrentVersion(t *testing.T) {
384	cv := version.Cluster(version.Version)
385	cv = cv + ".0"
386	if _, ok := supportedStream[cv]; !ok {
387		t.Errorf("Current version does not have stream support.")
388	}
389}
390
391type fakeWriteFlushCloser struct {
392	mu      sync.Mutex
393	err     error
394	written int
395	closed  chan struct{}
396	writec  chan struct{}
397}
398
399func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
400	return &fakeWriteFlushCloser{
401		err:    err,
402		closed: make(chan struct{}),
403		writec: make(chan struct{}, 1),
404	}
405}
406
407func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
408	wfc.mu.Lock()
409	defer wfc.mu.Unlock()
410	select {
411	case wfc.writec <- struct{}{}:
412	default:
413	}
414	wfc.written += len(p)
415	return len(p), wfc.err
416}
417
418func (wfc *fakeWriteFlushCloser) Flush() {}
419
420func (wfc *fakeWriteFlushCloser) Close() error {
421	close(wfc.closed)
422	return wfc.err
423}
424
425func (wfc *fakeWriteFlushCloser) Written() int {
426	wfc.mu.Lock()
427	defer wfc.mu.Unlock()
428	return wfc.written
429}
430
431func (wfc *fakeWriteFlushCloser) Closed() bool {
432	select {
433	case <-wfc.closed:
434		return true
435	default:
436		return false
437	}
438}
439
440type fakeStreamHandler struct {
441	t  streamType
442	sw *streamWriter
443}
444
445func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
446	w.Header().Add("X-Server-Version", version.Version)
447	w.(http.Flusher).Flush()
448	c := newCloseNotifier()
449	h.sw.attach(&outgoingConn{
450		t:       h.t,
451		Writer:  w,
452		Flusher: w.(http.Flusher),
453		Closer:  c,
454	})
455	<-c.closeNotify()
456}
457