1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "bytes" 19 "errors" 20 "fmt" 21 "io" 22 "net/http" 23 "net/http/httptest" 24 "net/url" 25 "strings" 26 "testing" 27 "time" 28 29 "go.etcd.io/etcd/etcdserver/api/snap" 30 "go.etcd.io/etcd/pkg/pbutil" 31 "go.etcd.io/etcd/pkg/types" 32 "go.etcd.io/etcd/raft/raftpb" 33 "go.etcd.io/etcd/version" 34 35 "go.uber.org/zap" 36) 37 38func TestServeRaftPrefix(t *testing.T) { 39 testCases := []struct { 40 method string 41 body io.Reader 42 p Raft 43 clusterID string 44 45 wcode int 46 }{ 47 { 48 // bad method 49 "GET", 50 bytes.NewReader( 51 pbutil.MustMarshal(&raftpb.Message{}), 52 ), 53 &fakeRaft{}, 54 "0", 55 http.StatusMethodNotAllowed, 56 }, 57 { 58 // bad method 59 "PUT", 60 bytes.NewReader( 61 pbutil.MustMarshal(&raftpb.Message{}), 62 ), 63 &fakeRaft{}, 64 "0", 65 http.StatusMethodNotAllowed, 66 }, 67 { 68 // bad method 69 "DELETE", 70 bytes.NewReader( 71 pbutil.MustMarshal(&raftpb.Message{}), 72 ), 73 &fakeRaft{}, 74 "0", 75 http.StatusMethodNotAllowed, 76 }, 77 { 78 // bad request body 79 "POST", 80 &errReader{}, 81 &fakeRaft{}, 82 "0", 83 http.StatusBadRequest, 84 }, 85 { 86 // bad request protobuf 87 "POST", 88 strings.NewReader("malformed garbage"), 89 &fakeRaft{}, 90 "0", 91 http.StatusBadRequest, 92 }, 93 { 94 // good request, wrong cluster ID 95 "POST", 96 bytes.NewReader( 97 pbutil.MustMarshal(&raftpb.Message{}), 98 ), 99 &fakeRaft{}, 100 "1", 101 http.StatusPreconditionFailed, 102 }, 103 { 104 // good request, Processor failure 105 "POST", 106 bytes.NewReader( 107 pbutil.MustMarshal(&raftpb.Message{}), 108 ), 109 &fakeRaft{ 110 err: &resWriterToError{code: http.StatusForbidden}, 111 }, 112 "0", 113 http.StatusForbidden, 114 }, 115 { 116 // good request, Processor failure 117 "POST", 118 bytes.NewReader( 119 pbutil.MustMarshal(&raftpb.Message{}), 120 ), 121 &fakeRaft{ 122 err: &resWriterToError{code: http.StatusInternalServerError}, 123 }, 124 "0", 125 http.StatusInternalServerError, 126 }, 127 { 128 // good request, Processor failure 129 "POST", 130 bytes.NewReader( 131 pbutil.MustMarshal(&raftpb.Message{}), 132 ), 133 &fakeRaft{err: errors.New("blah")}, 134 "0", 135 http.StatusInternalServerError, 136 }, 137 { 138 // good request 139 "POST", 140 bytes.NewReader( 141 pbutil.MustMarshal(&raftpb.Message{}), 142 ), 143 &fakeRaft{}, 144 "0", 145 http.StatusNoContent, 146 }, 147 } 148 for i, tt := range testCases { 149 req, err := http.NewRequest(tt.method, "foo", tt.body) 150 if err != nil { 151 t.Fatalf("#%d: could not create request: %#v", i, err) 152 } 153 req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) 154 req.Header.Set("X-Server-Version", version.Version) 155 rw := httptest.NewRecorder() 156 h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0)) 157 158 // goroutine because the handler panics to disconnect on raft error 159 donec := make(chan struct{}) 160 go func() { 161 defer func() { 162 recover() 163 close(donec) 164 }() 165 h.ServeHTTP(rw, req) 166 }() 167 <-donec 168 169 if rw.Code != tt.wcode { 170 t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode) 171 } 172 } 173} 174 175func TestServeRaftStreamPrefix(t *testing.T) { 176 tests := []struct { 177 path string 178 wtype streamType 179 }{ 180 { 181 RaftStreamPrefix + "/message/1", 182 streamTypeMessage, 183 }, 184 { 185 RaftStreamPrefix + "/msgapp/1", 186 streamTypeMsgAppV2, 187 }, 188 } 189 for i, tt := range tests { 190 req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil) 191 if err != nil { 192 t.Fatalf("#%d: could not create request: %#v", i, err) 193 } 194 req.Header.Set("X-Etcd-Cluster-ID", "1") 195 req.Header.Set("X-Server-Version", version.Version) 196 req.Header.Set("X-Raft-To", "2") 197 198 peer := newFakePeer() 199 peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}} 200 tr := &Transport{} 201 h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1)) 202 203 rw := httptest.NewRecorder() 204 go h.ServeHTTP(rw, req) 205 206 var conn *outgoingConn 207 select { 208 case conn = <-peer.connc: 209 case <-time.After(time.Second): 210 t.Fatalf("#%d: failed to attach outgoingConn", i) 211 } 212 if g := rw.Header().Get("X-Server-Version"); g != version.Version { 213 t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version) 214 } 215 if conn.t != tt.wtype { 216 t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype) 217 } 218 conn.Close() 219 } 220} 221 222func TestServeRaftStreamPrefixBad(t *testing.T) { 223 removedID := uint64(5) 224 tests := []struct { 225 method string 226 path string 227 clusterID string 228 remote string 229 230 wcode int 231 }{ 232 // bad method 233 { 234 "PUT", 235 RaftStreamPrefix + "/message/1", 236 "1", 237 "1", 238 http.StatusMethodNotAllowed, 239 }, 240 // bad method 241 { 242 "POST", 243 RaftStreamPrefix + "/message/1", 244 "1", 245 "1", 246 http.StatusMethodNotAllowed, 247 }, 248 // bad method 249 { 250 "DELETE", 251 RaftStreamPrefix + "/message/1", 252 "1", 253 "1", 254 http.StatusMethodNotAllowed, 255 }, 256 // bad path 257 { 258 "GET", 259 RaftStreamPrefix + "/strange/1", 260 "1", 261 "1", 262 http.StatusNotFound, 263 }, 264 // bad path 265 { 266 "GET", 267 RaftStreamPrefix + "/strange", 268 "1", 269 "1", 270 http.StatusNotFound, 271 }, 272 // non-existent peer 273 { 274 "GET", 275 RaftStreamPrefix + "/message/2", 276 "1", 277 "1", 278 http.StatusNotFound, 279 }, 280 // removed peer 281 { 282 "GET", 283 RaftStreamPrefix + "/message/" + fmt.Sprint(removedID), 284 "1", 285 "1", 286 http.StatusGone, 287 }, 288 // wrong cluster ID 289 { 290 "GET", 291 RaftStreamPrefix + "/message/1", 292 "2", 293 "1", 294 http.StatusPreconditionFailed, 295 }, 296 // wrong remote id 297 { 298 "GET", 299 RaftStreamPrefix + "/message/1", 300 "1", 301 "2", 302 http.StatusPreconditionFailed, 303 }, 304 } 305 for i, tt := range tests { 306 req, err := http.NewRequest(tt.method, "http://localhost:2380"+tt.path, nil) 307 if err != nil { 308 t.Fatalf("#%d: could not create request: %#v", i, err) 309 } 310 req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) 311 req.Header.Set("X-Server-Version", version.Version) 312 req.Header.Set("X-Raft-To", tt.remote) 313 rw := httptest.NewRecorder() 314 tr := &Transport{} 315 peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}} 316 r := &fakeRaft{removedID: removedID} 317 h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1)) 318 h.ServeHTTP(rw, req) 319 320 if rw.Code != tt.wcode { 321 t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode) 322 } 323 } 324} 325 326func TestCloseNotifier(t *testing.T) { 327 c := newCloseNotifier() 328 select { 329 case <-c.closeNotify(): 330 t.Fatalf("received unexpected close notification") 331 default: 332 } 333 c.Close() 334 select { 335 case <-c.closeNotify(): 336 default: 337 t.Fatalf("failed to get close notification") 338 } 339} 340 341// errReader implements io.Reader to facilitate a broken request. 342type errReader struct{} 343 344func (er *errReader) Read(_ []byte) (int, error) { return 0, errors.New("some error") } 345 346type resWriterToError struct { 347 code int 348} 349 350func (e *resWriterToError) Error() string { return "" } 351func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) } 352 353type fakePeerGetter struct { 354 peers map[types.ID]Peer 355} 356 357func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } 358 359type fakePeer struct { 360 msgs []raftpb.Message 361 snapMsgs []snap.Message 362 peerURLs types.URLs 363 connc chan *outgoingConn 364 paused bool 365} 366 367func newFakePeer() *fakePeer { 368 fakeURL, _ := url.Parse("http://localhost") 369 return &fakePeer{ 370 connc: make(chan *outgoingConn, 1), 371 peerURLs: types.URLs{*fakeURL}, 372 } 373} 374 375func (pr *fakePeer) send(m raftpb.Message) { 376 if pr.paused { 377 return 378 } 379 pr.msgs = append(pr.msgs, m) 380} 381 382func (pr *fakePeer) sendSnap(m snap.Message) { 383 if pr.paused { 384 return 385 } 386 pr.snapMsgs = append(pr.snapMsgs, m) 387} 388 389func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls } 390func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } 391func (pr *fakePeer) activeSince() time.Time { return time.Time{} } 392func (pr *fakePeer) stop() {} 393func (pr *fakePeer) Pause() { pr.paused = true } 394func (pr *fakePeer) Resume() { pr.paused = false } 395