1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18	"fmt"
19	"io"
20	"io/ioutil"
21	"net/http"
22	"net/http/httptest"
23	"os"
24	"strings"
25	"testing"
26	"time"
27
28	"go.etcd.io/etcd/etcdserver/api/snap"
29	"go.etcd.io/etcd/pkg/types"
30	"go.etcd.io/etcd/raft/raftpb"
31
32	"go.uber.org/zap"
33)
34
// strReaderCloser gives a *strings.Reader a Close method so it can be used
// where an io.ReadCloser is required.
type strReaderCloser struct {
	*strings.Reader
}

// Close is a no-op and always returns nil.
func (strReaderCloser) Close() error {
	return nil
}
38
39func TestSnapshotSend(t *testing.T) {
40	tests := []struct {
41		m    raftpb.Message
42		rc   io.ReadCloser
43		size int64
44
45		wsent  bool
46		wfiles int
47	}{
48		// sent and receive with no errors
49		{
50			m:    raftpb.Message{Type: raftpb.MsgSnap, To: 1},
51			rc:   strReaderCloser{strings.NewReader("hello")},
52			size: 5,
53
54			wsent:  true,
55			wfiles: 1,
56		},
57		// error when reading snapshot for send
58		{
59			m:    raftpb.Message{Type: raftpb.MsgSnap, To: 1},
60			rc:   &errReadCloser{fmt.Errorf("snapshot error")},
61			size: 1,
62
63			wsent:  false,
64			wfiles: 0,
65		},
66		// sends less than the given snapshot length
67		{
68			m:    raftpb.Message{Type: raftpb.MsgSnap, To: 1},
69			rc:   strReaderCloser{strings.NewReader("hello")},
70			size: 10000,
71
72			wsent:  false,
73			wfiles: 0,
74		},
75		// sends less than actual snapshot length
76		{
77			m:    raftpb.Message{Type: raftpb.MsgSnap, To: 1},
78			rc:   strReaderCloser{strings.NewReader("hello")},
79			size: 1,
80
81			wsent:  false,
82			wfiles: 0,
83		},
84	}
85
86	for i, tt := range tests {
87		sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size))
88		if tt.wsent != sent {
89			t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
90		}
91		if tt.wfiles != len(files) {
92			t.Fatalf("#%d: expected %d files, got %d files", i, tt.wfiles, len(files))
93		}
94	}
95}
96
97func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
98	d, err := ioutil.TempDir(os.TempDir(), "snapdir")
99	if err != nil {
100		t.Fatal(err)
101	}
102	defer os.RemoveAll(d)
103
104	r := &fakeRaft{}
105	tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
106	ch := make(chan struct{}, 1)
107	h := &syncHandler{newSnapshotHandler(tr, r, snap.New(zap.NewExample(), d), types.ID(1)), ch}
108	srv := httptest.NewServer(h)
109	defer srv.Close()
110
111	picker := mustNewURLPicker(t, []string{srv.URL})
112	snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)))
113	defer snapsend.stop()
114
115	snapsend.send(*sm)
116
117	sent := false
118	select {
119	case <-time.After(time.Second):
120		t.Fatalf("timed out sending snapshot")
121	case sent = <-sm.CloseNotify():
122	}
123
124	// wait for handler to finish accepting snapshot
125	<-ch
126
127	files, rerr := ioutil.ReadDir(d)
128	if rerr != nil {
129		t.Fatal(rerr)
130	}
131	return sent, files
132}
133
// errReadCloser is a reader/closer whose Read and Close both fail with the
// error it was constructed with.
type errReadCloser struct {
	err error
}

// Read never fills the buffer; it reports zero bytes and the stored error.
func (e *errReadCloser) Read(_ []byte) (int, error) {
	return 0, e.err
}

// Close returns the stored error.
func (e *errReadCloser) Close() error {
	return e.err
}
138
// syncHandler wraps an http.Handler and sends one value on ch every time the
// wrapped handler has finished serving a request, so a test can wait for the
// server side of a request to complete.
type syncHandler struct {
	h  http.Handler    // handler that does the real work
	ch chan<- struct{} // receives one value per served request
}

// ServeHTTP delegates to the wrapped handler and then signals completion on
// ch. The signal is sent from a deferred call so that it is delivered even if
// the wrapped handler panics; otherwise a goroutine waiting on the channel
// would block forever, because net/http's server recovers handler panics
// instead of crashing the process. The panic itself still propagates.
func (sh *syncHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer func() { sh.ch <- struct{}{} }()
	sh.h.ServeHTTP(w, r)
}
148