1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "fmt" 19 "io" 20 "io/ioutil" 21 "net/http" 22 "net/http/httptest" 23 "os" 24 "strings" 25 "testing" 26 "time" 27 28 "go.etcd.io/etcd/etcdserver/api/snap" 29 "go.etcd.io/etcd/pkg/types" 30 "go.etcd.io/etcd/raft/raftpb" 31 32 "go.uber.org/zap" 33) 34 35type strReaderCloser struct{ *strings.Reader } 36 37func (s strReaderCloser) Close() error { return nil } 38 39func TestSnapshotSend(t *testing.T) { 40 tests := []struct { 41 m raftpb.Message 42 rc io.ReadCloser 43 size int64 44 45 wsent bool 46 wfiles int 47 }{ 48 // sent and receive with no errors 49 { 50 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, 51 rc: strReaderCloser{strings.NewReader("hello")}, 52 size: 5, 53 54 wsent: true, 55 wfiles: 1, 56 }, 57 // error when reading snapshot for send 58 { 59 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, 60 rc: &errReadCloser{fmt.Errorf("snapshot error")}, 61 size: 1, 62 63 wsent: false, 64 wfiles: 0, 65 }, 66 // sends less than the given snapshot length 67 { 68 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, 69 rc: strReaderCloser{strings.NewReader("hello")}, 70 size: 10000, 71 72 wsent: false, 73 wfiles: 0, 74 }, 75 // sends less than actual snapshot length 76 { 77 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1}, 78 rc: strReaderCloser{strings.NewReader("hello")}, 79 size: 1, 80 81 wsent: false, 82 wfiles: 0, 83 }, 84 } 85 86 for i, tt := range tests { 87 sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size)) 88 if tt.wsent != sent { 89 t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent) 90 } 91 if tt.wfiles != len(files) { 92 t.Fatalf("#%d: expected %d files, got %d files", i, tt.wfiles, len(files)) 93 } 94 } 95} 96 97func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) { 98 d, err := ioutil.TempDir(os.TempDir(), "snapdir") 99 if err != nil { 100 t.Fatal(err) 101 } 102 defer os.RemoveAll(d) 103 104 r := &fakeRaft{} 105 tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r} 106 ch := make(chan struct{}, 1) 107 h := &syncHandler{newSnapshotHandler(tr, r, snap.New(zap.NewExample(), d), types.ID(1)), ch} 108 srv := httptest.NewServer(h) 109 defer srv.Close() 110 111 picker := mustNewURLPicker(t, []string{srv.URL}) 112 snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1))) 113 defer snapsend.stop() 114 115 snapsend.send(*sm) 116 117 sent := false 118 select { 119 case <-time.After(time.Second): 120 t.Fatalf("timed out sending snapshot") 121 case sent = <-sm.CloseNotify(): 122 } 123 124 // wait for handler to finish accepting snapshot 125 <-ch 126 127 files, rerr := ioutil.ReadDir(d) 128 if rerr != nil { 129 t.Fatal(rerr) 130 } 131 return sent, files 132} 133 134type errReadCloser struct{ err error } 135 136func (s *errReadCloser) Read(p []byte) (int, error) { return 0, s.err } 137func (s *errReadCloser) Close() error { return s.err } 138 139type syncHandler struct { 140 h http.Handler 141 ch chan<- struct{} 142} 143 144func (sh *syncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { 145 sh.h.ServeHTTP(w, r) 146 sh.ch <- struct{}{} 147} 148