// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"reflect"
	"sync"
	"testing"
	"time"

	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
	"go.etcd.io/etcd/pkg/testutil"
	"go.etcd.io/etcd/pkg/types"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/version"

	"github.com/coreos/go-semver/semver"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

// TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
// to streamWriter. After that, streamWriter can use it to send messages
// continuously, and closes it when stopped.
43func TestStreamWriterAttachOutgoingConn(t *testing.T) { 44 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) 45 // the expected initial state of streamWriter is not working 46 if _, ok := sw.writec(); ok { 47 t.Errorf("initial working status = %v, want false", ok) 48 } 49 50 // repeat tests to ensure streamWriter can use last attached connection 51 var wfc *fakeWriteFlushCloser 52 for i := 0; i < 3; i++ { 53 prevwfc := wfc 54 wfc = newFakeWriteFlushCloser(nil) 55 sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) 56 57 // previous attached connection should be closed 58 if prevwfc != nil { 59 select { 60 case <-prevwfc.closed: 61 case <-time.After(time.Second): 62 t.Errorf("#%d: close of previous connection timed out", i) 63 } 64 } 65 66 // if prevwfc != nil, the new msgc is ready since prevwfc has closed 67 // if prevwfc == nil, the first connection may be pending, but the first 68 // msgc is already available since it's set on calling startStreamwriter 69 msgc, _ := sw.writec() 70 msgc <- raftpb.Message{} 71 72 select { 73 case <-wfc.writec: 74 case <-time.After(time.Second): 75 t.Errorf("#%d: failed to write to the underlying connection", i) 76 } 77 // write chan is still available 78 if _, ok := sw.writec(); !ok { 79 t.Errorf("#%d: working status = %v, want true", i, ok) 80 } 81 } 82 83 sw.stop() 84 // write chan is unavailable since the writer is stopped. 85 if _, ok := sw.writec(); ok { 86 t.Errorf("working status after stop = %v, want false", ok) 87 } 88 if !wfc.Closed() { 89 t.Errorf("failed to close the underlying connection") 90 } 91} 92 93// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad 94// outgoingConn will close the outgoingConn and fall back to non-working status. 
95func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { 96 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) 97 defer sw.stop() 98 wfc := newFakeWriteFlushCloser(errors.New("blah")) 99 sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) 100 101 sw.msgc <- raftpb.Message{} 102 select { 103 case <-wfc.closed: 104 case <-time.After(time.Second): 105 t.Errorf("failed to close the underlying connection in time") 106 } 107 // no longer working 108 if _, ok := sw.writec(); ok { 109 t.Errorf("working = %v, want false", ok) 110 } 111} 112 113func TestStreamReaderDialRequest(t *testing.T) { 114 for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { 115 tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}} 116 sr := &streamReader{ 117 peerID: types.ID(2), 118 tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, 119 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), 120 ctx: context.Background(), 121 } 122 sr.dial(tt) 123 124 act, err := tr.rec.Wait(1) 125 if err != nil { 126 t.Fatal(err) 127 } 128 req := act[0].Params[0].(*http.Request) 129 130 wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint(zap.NewExample()) + "/1") 131 if req.URL.String() != wurl { 132 t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl) 133 } 134 if w := "GET"; req.Method != w { 135 t.Errorf("#%d: method = %s, want %s", i, req.Method, w) 136 } 137 if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" { 138 t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g) 139 } 140 if g := req.Header.Get("X-Raft-To"); g != "2" { 141 t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g) 142 } 143 } 144} 145 146// TestStreamReaderDialResult tests the result of the dial func call meets the 147// HTTP response received. 
148func TestStreamReaderDialResult(t *testing.T) { 149 tests := []struct { 150 code int 151 err error 152 wok bool 153 whalt bool 154 }{ 155 {0, errors.New("blah"), false, false}, 156 {http.StatusOK, nil, true, false}, 157 {http.StatusMethodNotAllowed, nil, false, false}, 158 {http.StatusNotFound, nil, false, false}, 159 {http.StatusPreconditionFailed, nil, false, false}, 160 {http.StatusGone, nil, false, true}, 161 } 162 for i, tt := range tests { 163 h := http.Header{} 164 h.Add("X-Server-Version", version.Version) 165 tr := &respRoundTripper{ 166 code: tt.code, 167 header: h, 168 err: tt.err, 169 } 170 sr := &streamReader{ 171 peerID: types.ID(2), 172 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, 173 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), 174 errorc: make(chan error, 1), 175 ctx: context.Background(), 176 } 177 178 _, err := sr.dial(streamTypeMessage) 179 if ok := err == nil; ok != tt.wok { 180 t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) 181 } 182 if halt := len(sr.errorc) > 0; halt != tt.whalt { 183 t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt) 184 } 185 } 186} 187 188// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop. 
189func TestStreamReaderStopOnDial(t *testing.T) { 190 defer testutil.AfterTest(t) 191 h := http.Header{} 192 h.Add("X-Server-Version", version.Version) 193 tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}} 194 sr := &streamReader{ 195 peerID: types.ID(2), 196 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, 197 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), 198 errorc: make(chan error, 1), 199 typ: streamTypeMessage, 200 status: newPeerStatus(zap.NewExample(), types.ID(1), types.ID(2)), 201 rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), 202 } 203 tr.onResp = func() { 204 // stop() waits for the run() goroutine to exit, but that exit 205 // needs a response from RoundTrip() first; use goroutine 206 go sr.stop() 207 // wait so that stop() is blocked on run() exiting 208 time.Sleep(10 * time.Millisecond) 209 // sr.run() completes dialing then begins decoding while stopped 210 } 211 sr.start() 212 select { 213 case <-sr.done: 214 case <-time.After(time.Second): 215 t.Fatal("streamReader did not stop in time") 216 } 217} 218 219type respWaitRoundTripper struct { 220 rrt *respRoundTripper 221 onResp func() 222} 223 224func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 225 resp, err := t.rrt.RoundTrip(req) 226 resp.Body = newWaitReadCloser() 227 t.onResp() 228 return resp, err 229} 230 231type waitReadCloser struct{ closec chan struct{} } 232 233func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} } 234func (wrc *waitReadCloser) Read(p []byte) (int, error) { 235 <-wrc.closec 236 return 0, io.EOF 237} 238func (wrc *waitReadCloser) Close() error { 239 close(wrc.closec) 240 return nil 241} 242 243// TestStreamReaderDialDetectUnsupport tests that dial func could find 244// out that the stream type is not supported by the remote. 
245func TestStreamReaderDialDetectUnsupport(t *testing.T) { 246 for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} { 247 // the response from etcd 2.0 248 tr := &respRoundTripper{ 249 code: http.StatusNotFound, 250 header: http.Header{}, 251 } 252 sr := &streamReader{ 253 peerID: types.ID(2), 254 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, 255 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), 256 ctx: context.Background(), 257 } 258 259 _, err := sr.dial(typ) 260 if err != errUnsupportedStreamType { 261 t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType) 262 } 263 } 264} 265 266// TestStream tests that streamReader and streamWriter can build stream to 267// send messages between each other. 268func TestStream(t *testing.T) { 269 recvc := make(chan raftpb.Message, streamBufSize) 270 propc := make(chan raftpb.Message, streamBufSize) 271 msgapp := raftpb.Message{ 272 Type: raftpb.MsgApp, 273 From: 2, 274 To: 1, 275 Term: 1, 276 LogTerm: 1, 277 Index: 3, 278 Entries: []raftpb.Entry{{Term: 1, Index: 4}}, 279 } 280 281 tests := []struct { 282 t streamType 283 m raftpb.Message 284 wc chan raftpb.Message 285 }{ 286 { 287 streamTypeMessage, 288 raftpb.Message{Type: raftpb.MsgProp, To: 2}, 289 propc, 290 }, 291 { 292 streamTypeMessage, 293 msgapp, 294 recvc, 295 }, 296 { 297 streamTypeMsgAppV2, 298 msgapp, 299 recvc, 300 }, 301 } 302 for i, tt := range tests { 303 h := &fakeStreamHandler{t: tt.t} 304 srv := httptest.NewServer(h) 305 defer srv.Close() 306 307 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) 308 defer sw.stop() 309 h.sw = sw 310 311 picker := mustNewURLPicker(t, []string{srv.URL}) 312 tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)} 313 314 sr := &streamReader{ 315 peerID: types.ID(2), 316 typ: tt.t, 317 tr: tr, 318 picker: picker, 319 status: 
newPeerStatus(zap.NewExample(), types.ID(0), types.ID(2)), 320 recvc: recvc, 321 propc: propc, 322 rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), 323 } 324 sr.start() 325 326 // wait for stream to work 327 var writec chan<- raftpb.Message 328 for { 329 var ok bool 330 if writec, ok = sw.writec(); ok { 331 break 332 } 333 time.Sleep(time.Millisecond) 334 } 335 336 writec <- tt.m 337 var m raftpb.Message 338 select { 339 case m = <-tt.wc: 340 case <-time.After(time.Second): 341 t.Fatalf("#%d: failed to receive message from the channel", i) 342 } 343 if !reflect.DeepEqual(m, tt.m) { 344 t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m) 345 } 346 347 sr.stop() 348 } 349} 350 351func TestCheckStreamSupport(t *testing.T) { 352 tests := []struct { 353 v *semver.Version 354 t streamType 355 w bool 356 }{ 357 // support 358 { 359 semver.Must(semver.NewVersion("2.1.0")), 360 streamTypeMsgAppV2, 361 true, 362 }, 363 // ignore patch 364 { 365 semver.Must(semver.NewVersion("2.1.9")), 366 streamTypeMsgAppV2, 367 true, 368 }, 369 // ignore prerelease 370 { 371 semver.Must(semver.NewVersion("2.1.0-alpha")), 372 streamTypeMsgAppV2, 373 true, 374 }, 375 } 376 for i, tt := range tests { 377 if g := checkStreamSupport(tt.v, tt.t); g != tt.w { 378 t.Errorf("#%d: check = %v, want %v", i, g, tt.w) 379 } 380 } 381} 382 383func TestStreamSupportCurrentVersion(t *testing.T) { 384 cv := version.Cluster(version.Version) 385 cv = cv + ".0" 386 if _, ok := supportedStream[cv]; !ok { 387 t.Errorf("Current version does not have stream support.") 388 } 389} 390 391type fakeWriteFlushCloser struct { 392 mu sync.Mutex 393 err error 394 written int 395 closed chan struct{} 396 writec chan struct{} 397} 398 399func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser { 400 return &fakeWriteFlushCloser{ 401 err: err, 402 closed: make(chan struct{}), 403 writec: make(chan struct{}, 1), 404 } 405} 406 407func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) { 408 
wfc.mu.Lock() 409 defer wfc.mu.Unlock() 410 select { 411 case wfc.writec <- struct{}{}: 412 default: 413 } 414 wfc.written += len(p) 415 return len(p), wfc.err 416} 417 418func (wfc *fakeWriteFlushCloser) Flush() {} 419 420func (wfc *fakeWriteFlushCloser) Close() error { 421 close(wfc.closed) 422 return wfc.err 423} 424 425func (wfc *fakeWriteFlushCloser) Written() int { 426 wfc.mu.Lock() 427 defer wfc.mu.Unlock() 428 return wfc.written 429} 430 431func (wfc *fakeWriteFlushCloser) Closed() bool { 432 select { 433 case <-wfc.closed: 434 return true 435 default: 436 return false 437 } 438} 439 440type fakeStreamHandler struct { 441 t streamType 442 sw *streamWriter 443} 444 445func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 446 w.Header().Add("X-Server-Version", version.Version) 447 w.(http.Flusher).Flush() 448 c := newCloseNotifier() 449 h.sw.attach(&outgoingConn{ 450 t: h.t, 451 Writer: w, 452 Flusher: w.(http.Flusher), 453 Closer: c, 454 }) 455 <-c.closeNotify() 456} 457