1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"bytes"
19	"errors"
20	"fmt"
21	"io"
22	"net/http"
23	"net/http/httptest"
24	"net/url"
25	"strings"
26	"testing"
27	"time"
28
29	"go.etcd.io/etcd/etcdserver/api/snap"
30	"go.etcd.io/etcd/pkg/pbutil"
31	"go.etcd.io/etcd/pkg/types"
32	"go.etcd.io/etcd/raft/raftpb"
33	"go.etcd.io/etcd/version"
34
35	"go.uber.org/zap"
36)
37
38func TestServeRaftPrefix(t *testing.T) {
39	testCases := []struct {
40		method    string
41		body      io.Reader
42		p         Raft
43		clusterID string
44
45		wcode int
46	}{
47		{
48			// bad method
49			"GET",
50			bytes.NewReader(
51				pbutil.MustMarshal(&raftpb.Message{}),
52			),
53			&fakeRaft{},
54			"0",
55			http.StatusMethodNotAllowed,
56		},
57		{
58			// bad method
59			"PUT",
60			bytes.NewReader(
61				pbutil.MustMarshal(&raftpb.Message{}),
62			),
63			&fakeRaft{},
64			"0",
65			http.StatusMethodNotAllowed,
66		},
67		{
68			// bad method
69			"DELETE",
70			bytes.NewReader(
71				pbutil.MustMarshal(&raftpb.Message{}),
72			),
73			&fakeRaft{},
74			"0",
75			http.StatusMethodNotAllowed,
76		},
77		{
78			// bad request body
79			"POST",
80			&errReader{},
81			&fakeRaft{},
82			"0",
83			http.StatusBadRequest,
84		},
85		{
86			// bad request protobuf
87			"POST",
88			strings.NewReader("malformed garbage"),
89			&fakeRaft{},
90			"0",
91			http.StatusBadRequest,
92		},
93		{
94			// good request, wrong cluster ID
95			"POST",
96			bytes.NewReader(
97				pbutil.MustMarshal(&raftpb.Message{}),
98			),
99			&fakeRaft{},
100			"1",
101			http.StatusPreconditionFailed,
102		},
103		{
104			// good request, Processor failure
105			"POST",
106			bytes.NewReader(
107				pbutil.MustMarshal(&raftpb.Message{}),
108			),
109			&fakeRaft{
110				err: &resWriterToError{code: http.StatusForbidden},
111			},
112			"0",
113			http.StatusForbidden,
114		},
115		{
116			// good request, Processor failure
117			"POST",
118			bytes.NewReader(
119				pbutil.MustMarshal(&raftpb.Message{}),
120			),
121			&fakeRaft{
122				err: &resWriterToError{code: http.StatusInternalServerError},
123			},
124			"0",
125			http.StatusInternalServerError,
126		},
127		{
128			// good request, Processor failure
129			"POST",
130			bytes.NewReader(
131				pbutil.MustMarshal(&raftpb.Message{}),
132			),
133			&fakeRaft{err: errors.New("blah")},
134			"0",
135			http.StatusInternalServerError,
136		},
137		{
138			// good request
139			"POST",
140			bytes.NewReader(
141				pbutil.MustMarshal(&raftpb.Message{}),
142			),
143			&fakeRaft{},
144			"0",
145			http.StatusNoContent,
146		},
147	}
148	for i, tt := range testCases {
149		req, err := http.NewRequest(tt.method, "foo", tt.body)
150		if err != nil {
151			t.Fatalf("#%d: could not create request: %#v", i, err)
152		}
153		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
154		req.Header.Set("X-Server-Version", version.Version)
155		rw := httptest.NewRecorder()
156		h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0))
157
158		// goroutine because the handler panics to disconnect on raft error
159		donec := make(chan struct{})
160		go func() {
161			defer func() {
162				recover()
163				close(donec)
164			}()
165			h.ServeHTTP(rw, req)
166		}()
167		<-donec
168
169		if rw.Code != tt.wcode {
170			t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
171		}
172	}
173}
174
175func TestServeRaftStreamPrefix(t *testing.T) {
176	tests := []struct {
177		path  string
178		wtype streamType
179	}{
180		{
181			RaftStreamPrefix + "/message/1",
182			streamTypeMessage,
183		},
184		{
185			RaftStreamPrefix + "/msgapp/1",
186			streamTypeMsgAppV2,
187		},
188	}
189	for i, tt := range tests {
190		req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
191		if err != nil {
192			t.Fatalf("#%d: could not create request: %#v", i, err)
193		}
194		req.Header.Set("X-Etcd-Cluster-ID", "1")
195		req.Header.Set("X-Server-Version", version.Version)
196		req.Header.Set("X-Raft-To", "2")
197
198		peer := newFakePeer()
199		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
200		tr := &Transport{}
201		h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
202
203		rw := httptest.NewRecorder()
204		go h.ServeHTTP(rw, req)
205
206		var conn *outgoingConn
207		select {
208		case conn = <-peer.connc:
209		case <-time.After(time.Second):
210			t.Fatalf("#%d: failed to attach outgoingConn", i)
211		}
212		if g := rw.Header().Get("X-Server-Version"); g != version.Version {
213			t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version)
214		}
215		if conn.t != tt.wtype {
216			t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
217		}
218		conn.Close()
219	}
220}
221
222func TestServeRaftStreamPrefixBad(t *testing.T) {
223	removedID := uint64(5)
224	tests := []struct {
225		method    string
226		path      string
227		clusterID string
228		remote    string
229
230		wcode int
231	}{
232		// bad method
233		{
234			"PUT",
235			RaftStreamPrefix + "/message/1",
236			"1",
237			"1",
238			http.StatusMethodNotAllowed,
239		},
240		// bad method
241		{
242			"POST",
243			RaftStreamPrefix + "/message/1",
244			"1",
245			"1",
246			http.StatusMethodNotAllowed,
247		},
248		// bad method
249		{
250			"DELETE",
251			RaftStreamPrefix + "/message/1",
252			"1",
253			"1",
254			http.StatusMethodNotAllowed,
255		},
256		// bad path
257		{
258			"GET",
259			RaftStreamPrefix + "/strange/1",
260			"1",
261			"1",
262			http.StatusNotFound,
263		},
264		// bad path
265		{
266			"GET",
267			RaftStreamPrefix + "/strange",
268			"1",
269			"1",
270			http.StatusNotFound,
271		},
272		// non-existent peer
273		{
274			"GET",
275			RaftStreamPrefix + "/message/2",
276			"1",
277			"1",
278			http.StatusNotFound,
279		},
280		// removed peer
281		{
282			"GET",
283			RaftStreamPrefix + "/message/" + fmt.Sprint(removedID),
284			"1",
285			"1",
286			http.StatusGone,
287		},
288		// wrong cluster ID
289		{
290			"GET",
291			RaftStreamPrefix + "/message/1",
292			"2",
293			"1",
294			http.StatusPreconditionFailed,
295		},
296		// wrong remote id
297		{
298			"GET",
299			RaftStreamPrefix + "/message/1",
300			"1",
301			"2",
302			http.StatusPreconditionFailed,
303		},
304	}
305	for i, tt := range tests {
306		req, err := http.NewRequest(tt.method, "http://localhost:2380"+tt.path, nil)
307		if err != nil {
308			t.Fatalf("#%d: could not create request: %#v", i, err)
309		}
310		req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
311		req.Header.Set("X-Server-Version", version.Version)
312		req.Header.Set("X-Raft-To", tt.remote)
313		rw := httptest.NewRecorder()
314		tr := &Transport{}
315		peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
316		r := &fakeRaft{removedID: removedID}
317		h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1))
318		h.ServeHTTP(rw, req)
319
320		if rw.Code != tt.wcode {
321			t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
322		}
323	}
324}
325
326func TestCloseNotifier(t *testing.T) {
327	c := newCloseNotifier()
328	select {
329	case <-c.closeNotify():
330		t.Fatalf("received unexpected close notification")
331	default:
332	}
333	c.Close()
334	select {
335	case <-c.closeNotify():
336	default:
337		t.Fatalf("failed to get close notification")
338	}
339}
340
// errReader is an io.Reader whose Read always fails; tests use it as a
// request body to simulate a broken request.
type errReader struct{}

// Read ignores the supplied buffer and reports zero bytes read together with
// a fixed error.
func (er *errReader) Read(_ []byte) (int, error) {
	return 0, errors.New("some error")
}
345
// resWriterToError is an error that can also write itself to an HTTP
// response: WriteTo sends code as the response status.
type resWriterToError struct {
	code int
}

// Error returns an empty message.
func (e *resWriterToError) Error() string {
	return ""
}

// WriteTo writes the configured status code as the header of w.
func (e *resWriterToError) WriteTo(w http.ResponseWriter) {
	w.WriteHeader(e.code)
}
352
353type fakePeerGetter struct {
354	peers map[types.ID]Peer
355}
356
357func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
358
359type fakePeer struct {
360	msgs     []raftpb.Message
361	snapMsgs []snap.Message
362	peerURLs types.URLs
363	connc    chan *outgoingConn
364	paused   bool
365}
366
367func newFakePeer() *fakePeer {
368	fakeURL, _ := url.Parse("http://localhost")
369	return &fakePeer{
370		connc:    make(chan *outgoingConn, 1),
371		peerURLs: types.URLs{*fakeURL},
372	}
373}
374
375func (pr *fakePeer) send(m raftpb.Message) {
376	if pr.paused {
377		return
378	}
379	pr.msgs = append(pr.msgs, m)
380}
381
382func (pr *fakePeer) sendSnap(m snap.Message) {
383	if pr.paused {
384		return
385	}
386	pr.snapMsgs = append(pr.snapMsgs, m)
387}
388
389func (pr *fakePeer) update(urls types.URLs)                { pr.peerURLs = urls }
390func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
391func (pr *fakePeer) activeSince() time.Time                { return time.Time{} }
392func (pr *fakePeer) stop()                                 {}
393func (pr *fakePeer) Pause()                                { pr.paused = true }
394func (pr *fakePeer) Resume()                               { pr.paused = false }
395