1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package snapshot
16
17import (
18	"context"
19	"fmt"
20	"math/rand"
21	"net/url"
22	"os"
23	"path/filepath"
24	"strings"
25	"testing"
26	"time"
27
28	"go.etcd.io/etcd/clientv3"
29	"go.etcd.io/etcd/embed"
30	"go.etcd.io/etcd/pkg/fileutil"
31	"go.etcd.io/etcd/pkg/testutil"
32
33	"go.uber.org/zap"
34)
35
36// TestSnapshotV3RestoreSingle tests single node cluster restoring
37// from a snapshot file.
38func TestSnapshotV3RestoreSingle(t *testing.T) {
39	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
40	dbPath := createSnapshotFile(t, kvs)
41	defer os.RemoveAll(dbPath)
42
43	clusterN := 1
44	urls := newEmbedURLs(clusterN * 2)
45	cURLs, pURLs := urls[:clusterN], urls[clusterN:]
46
47	cfg := embed.NewConfig()
48	cfg.Logger = "zap"
49	cfg.LogOutputs = []string{"/dev/null"}
50	cfg.Debug = false
51	cfg.Name = "s1"
52	cfg.InitialClusterToken = testClusterTkn
53	cfg.ClusterState = "existing"
54	cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
55	cfg.LPUrls, cfg.APUrls = pURLs, pURLs
56	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
57	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
58
59	sp := NewV3(zap.NewExample())
60	pss := make([]string, 0, len(pURLs))
61	for _, p := range pURLs {
62		pss = append(pss, p.String())
63	}
64	if err := sp.Restore(RestoreConfig{
65		SnapshotPath:        dbPath,
66		Name:                cfg.Name,
67		OutputDataDir:       cfg.Dir,
68		InitialCluster:      cfg.InitialCluster,
69		InitialClusterToken: cfg.InitialClusterToken,
70		PeerURLs:            pss,
71	}); err != nil {
72		t.Fatal(err)
73	}
74
75	srv, err := embed.StartEtcd(cfg)
76	if err != nil {
77		t.Fatal(err)
78	}
79	defer func() {
80		os.RemoveAll(cfg.Dir)
81		srv.Close()
82	}()
83	select {
84	case <-srv.Server.ReadyNotify():
85	case <-time.After(3 * time.Second):
86		t.Fatalf("failed to start restored etcd member")
87	}
88
89	var cli *clientv3.Client
90	cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
91	if err != nil {
92		t.Fatal(err)
93	}
94	defer cli.Close()
95	for i := range kvs {
96		var gresp *clientv3.GetResponse
97		gresp, err = cli.Get(context.Background(), kvs[i].k)
98		if err != nil {
99			t.Fatal(err)
100		}
101		if string(gresp.Kvs[0].Value) != kvs[i].v {
102			t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
103		}
104	}
105}
106
107// TestSnapshotV3RestoreMulti ensures that multiple members
108// can boot into the same cluster after being restored from a same
109// snapshot file.
110func TestSnapshotV3RestoreMulti(t *testing.T) {
111	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
112	dbPath := createSnapshotFile(t, kvs)
113	defer os.RemoveAll(dbPath)
114
115	clusterN := 3
116	cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
117	defer func() {
118		for i := 0; i < clusterN; i++ {
119			os.RemoveAll(srvs[i].Config().Dir)
120			srvs[i].Close()
121		}
122	}()
123
124	// wait for leader election
125	time.Sleep(time.Second)
126
127	for i := 0; i < clusterN; i++ {
128		cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
129		if err != nil {
130			t.Fatal(err)
131		}
132		defer cli.Close()
133		for i := range kvs {
134			var gresp *clientv3.GetResponse
135			gresp, err = cli.Get(context.Background(), kvs[i].k)
136			if err != nil {
137				t.Fatal(err)
138			}
139			if string(gresp.Kvs[0].Value) != kvs[i].v {
140				t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
141			}
142		}
143	}
144}
145
146// TestSnapshotFilePermissions ensures that the snapshot is saved with
147// the correct file permissions.
148func TestSnapshotFilePermissions(t *testing.T) {
149	expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
150	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
151	dbPath := createSnapshotFile(t, kvs)
152	defer os.RemoveAll(dbPath)
153
154	dbInfo, err := os.Stat(dbPath)
155	if err != nil {
156		t.Fatalf("failed to get test snapshot file status: %v", err)
157	}
158	actualFileMode := dbInfo.Mode()
159
160	if expectedFileMode != actualFileMode {
161		t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode)
162	}
163}
164
165// TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file.
166func TestCorruptedBackupFileCheck(t *testing.T) {
167	dbPath := "testdata/corrupted_backup.db"
168	if _, err := os.Stat(dbPath); err != nil {
169		t.Fatalf("test file [%s] does not exist: %v", dbPath, err)
170	}
171
172	sp := NewV3(zap.NewExample())
173	_, err := sp.Status(dbPath)
174	expectedErrKeywords := "snapshot file integrity check failed"
175	/* example error message:
176	snapshot file integrity check failed. 2 errors found.
177	page 3: already freed
178	page 4: unreachable unfreed
179	*/
180	if err == nil {
181		t.Error("expected error due to corrupted snapshot file, got no error")
182	}
183	if !strings.Contains(err.Error(), expectedErrKeywords) {
184		t.Errorf("expected error message to contain the following keywords:\n%s\n"+
185			"actual error message:\n%s",
186			expectedErrKeywords, err.Error())
187	}
188}
189
// kv is a key-value pair that the tests write to etcd and later expect to
// read back. The field order (k, then v) matters because the tests build
// values with positional composite literals.
type kv struct {
	k string // key
	v string // value
}
193
194// creates a snapshot file and returns the file path.
195func createSnapshotFile(t *testing.T, kvs []kv) string {
196	clusterN := 1
197	urls := newEmbedURLs(clusterN * 2)
198	cURLs, pURLs := urls[:clusterN], urls[clusterN:]
199
200	cfg := embed.NewConfig()
201	cfg.Logger = "zap"
202	cfg.LogOutputs = []string{"/dev/null"}
203	cfg.Debug = false
204	cfg.Name = "default"
205	cfg.ClusterState = "new"
206	cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
207	cfg.LPUrls, cfg.APUrls = pURLs, pURLs
208	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
209	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
210	srv, err := embed.StartEtcd(cfg)
211	if err != nil {
212		t.Fatal(err)
213	}
214	defer func() {
215		os.RemoveAll(cfg.Dir)
216		srv.Close()
217	}()
218	select {
219	case <-srv.Server.ReadyNotify():
220	case <-time.After(3 * time.Second):
221		t.Fatalf("failed to start embed.Etcd for creating snapshots")
222	}
223
224	ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
225	cli, err := clientv3.New(ccfg)
226	if err != nil {
227		t.Fatal(err)
228	}
229	defer cli.Close()
230	for i := range kvs {
231		ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
232		_, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
233		cancel()
234		if err != nil {
235			t.Fatal(err)
236		}
237	}
238
239	sp := NewV3(zap.NewExample())
240	dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
241	if err = sp.Save(context.Background(), ccfg, dpPath); err != nil {
242		t.Fatal(err)
243	}
244
245	os.RemoveAll(cfg.Dir)
246	srv.Close()
247	return dpPath
248}
249
// testClusterTkn is the initial cluster token used by the restore tests; the
// same value is set on the embedded member config and passed to Restore.
const testClusterTkn = "tkn"
251
252func restoreCluster(t *testing.T, clusterN int, dbPath string) (
253	cURLs []url.URL,
254	pURLs []url.URL,
255	srvs []*embed.Etcd) {
256	urls := newEmbedURLs(clusterN * 2)
257	cURLs, pURLs = urls[:clusterN], urls[clusterN:]
258
259	ics := ""
260	for i := 0; i < clusterN; i++ {
261		ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
262	}
263	ics = ics[1:]
264
265	cfgs := make([]*embed.Config, clusterN)
266	for i := 0; i < clusterN; i++ {
267		cfg := embed.NewConfig()
268		cfg.Logger = "zap"
269		cfg.LogOutputs = []string{"/dev/null"}
270		cfg.Debug = false
271		cfg.Name = fmt.Sprintf("%d", i)
272		cfg.InitialClusterToken = testClusterTkn
273		cfg.ClusterState = "existing"
274		cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
275		cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
276		cfg.InitialCluster = ics
277		cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
278
279		sp := NewV3(zap.NewExample())
280		if err := sp.Restore(RestoreConfig{
281			SnapshotPath:        dbPath,
282			Name:                cfg.Name,
283			OutputDataDir:       cfg.Dir,
284			PeerURLs:            []string{pURLs[i].String()},
285			InitialCluster:      ics,
286			InitialClusterToken: cfg.InitialClusterToken,
287		}); err != nil {
288			t.Fatal(err)
289		}
290		cfgs[i] = cfg
291	}
292
293	sch := make(chan *embed.Etcd)
294	for i := range cfgs {
295		go func(idx int) {
296			srv, err := embed.StartEtcd(cfgs[idx])
297			if err != nil {
298				t.Error(err)
299			}
300
301			<-srv.Server.ReadyNotify()
302			sch <- srv
303		}(i)
304	}
305
306	srvs = make([]*embed.Etcd, clusterN)
307	for i := 0; i < clusterN; i++ {
308		select {
309		case srv := <-sch:
310			srvs[i] = srv
311		case <-time.After(5 * time.Second):
312			t.Fatalf("#%d: failed to start embed.Etcd", i)
313		}
314	}
315	return cURLs, pURLs, srvs
316}
317
// newEmbedURLs returns n URLs of the form "unix://localhost:<N>", where N is
// a random number in [0, 45000).
//
// The URLs returned by a single call are distinct as long as n does not
// exceed 45000; beyond that, duplicates are unavoidable and allowed.
// Uniqueness across separate calls is not guaranteed.
//
// TODO: TLS
func newEmbedURLs(n int) (urls []url.URL) {
	const portRange = 45000

	// Seed one private generator per call instead of reseeding the global
	// one on every iteration.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	urls = make([]url.URL, n)
	used := make(map[int]struct{}, n)
	for i := range urls {
		port := rng.Intn(portRange)
		// Redraw on collision while unused values remain.
		for len(used) < portRange {
			if _, taken := used[port]; !taken {
				break
			}
			port = rng.Intn(portRange)
		}
		used[port] = struct{}{}

		// Built directly rather than parsed, so there is no error to ignore.
		urls[i] = url.URL{Scheme: "unix", Host: fmt.Sprintf("localhost:%d", port)}
	}
	return urls
}
328