1// Copyright 2018 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package snapshot
16
17import (
18	"context"
19	"fmt"
20	"math/rand"
21	"net/url"
22	"os"
23	"path/filepath"
24	"strings"
25	"testing"
26	"time"
27
28	"go.etcd.io/etcd/clientv3"
29	"go.etcd.io/etcd/embed"
30	"go.etcd.io/etcd/pkg/fileutil"
31	"go.etcd.io/etcd/pkg/testutil"
32
33	"go.uber.org/zap"
34)
35
36// TestSnapshotV3RestoreSingle tests single node cluster restoring
37// from a snapshot file.
38func TestSnapshotV3RestoreSingle(t *testing.T) {
39	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
40	dbPath := createSnapshotFile(t, kvs)
41	defer os.RemoveAll(dbPath)
42
43	clusterN := 1
44	urls := newEmbedURLs(clusterN * 2)
45	cURLs, pURLs := urls[:clusterN], urls[clusterN:]
46
47	cfg := embed.NewConfig()
48	cfg.Logger = "zap"
49	cfg.LogOutputs = []string{"/dev/null"}
50	cfg.Name = "s1"
51	cfg.InitialClusterToken = testClusterTkn
52	cfg.ClusterState = "existing"
53	cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
54	cfg.LPUrls, cfg.APUrls = pURLs, pURLs
55	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
56	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
57
58	sp := NewV3(zap.NewExample())
59	pss := make([]string, 0, len(pURLs))
60	for _, p := range pURLs {
61		pss = append(pss, p.String())
62	}
63	if err := sp.Restore(RestoreConfig{
64		SnapshotPath:        dbPath,
65		Name:                cfg.Name,
66		OutputDataDir:       cfg.Dir,
67		InitialCluster:      cfg.InitialCluster,
68		InitialClusterToken: cfg.InitialClusterToken,
69		PeerURLs:            pss,
70	}); err != nil {
71		t.Fatal(err)
72	}
73
74	srv, err := embed.StartEtcd(cfg)
75	if err != nil {
76		t.Fatal(err)
77	}
78	defer func() {
79		os.RemoveAll(cfg.Dir)
80		srv.Close()
81	}()
82	select {
83	case <-srv.Server.ReadyNotify():
84	case <-time.After(3 * time.Second):
85		t.Fatalf("failed to start restored etcd member")
86	}
87
88	var cli *clientv3.Client
89	cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}})
90	if err != nil {
91		t.Fatal(err)
92	}
93	defer cli.Close()
94	for i := range kvs {
95		var gresp *clientv3.GetResponse
96		gresp, err = cli.Get(context.Background(), kvs[i].k)
97		if err != nil {
98			t.Fatal(err)
99		}
100		if string(gresp.Kvs[0].Value) != kvs[i].v {
101			t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
102		}
103	}
104}
105
106// TestSnapshotV3RestoreMulti ensures that multiple members
107// can boot into the same cluster after being restored from a same
108// snapshot file.
109func TestSnapshotV3RestoreMulti(t *testing.T) {
110	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
111	dbPath := createSnapshotFile(t, kvs)
112	defer os.RemoveAll(dbPath)
113
114	clusterN := 3
115	cURLs, _, srvs := restoreCluster(t, clusterN, dbPath)
116	defer func() {
117		for i := 0; i < clusterN; i++ {
118			os.RemoveAll(srvs[i].Config().Dir)
119			srvs[i].Close()
120		}
121	}()
122
123	// wait for leader election
124	time.Sleep(time.Second)
125
126	for i := 0; i < clusterN; i++ {
127		cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}})
128		if err != nil {
129			t.Fatal(err)
130		}
131		defer cli.Close()
132		for i := range kvs {
133			var gresp *clientv3.GetResponse
134			gresp, err = cli.Get(context.Background(), kvs[i].k)
135			if err != nil {
136				t.Fatal(err)
137			}
138			if string(gresp.Kvs[0].Value) != kvs[i].v {
139				t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value))
140			}
141		}
142	}
143}
144
145// TestSnapshotFilePermissions ensures that the snapshot is saved with
146// the correct file permissions.
147func TestSnapshotFilePermissions(t *testing.T) {
148	expectedFileMode := os.FileMode(fileutil.PrivateFileMode)
149	kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}}
150	dbPath := createSnapshotFile(t, kvs)
151	defer os.RemoveAll(dbPath)
152
153	dbInfo, err := os.Stat(dbPath)
154	if err != nil {
155		t.Fatalf("failed to get test snapshot file status: %v", err)
156	}
157	actualFileMode := dbInfo.Mode()
158
159	if expectedFileMode != actualFileMode {
160		t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode)
161	}
162}
163
164// TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file.
165func TestCorruptedBackupFileCheck(t *testing.T) {
166	dbPath := "testdata/corrupted_backup.db"
167	if _, err := os.Stat(dbPath); err != nil {
168		t.Fatalf("test file [%s] does not exist: %v", dbPath, err)
169	}
170
171	sp := NewV3(zap.NewExample())
172	_, err := sp.Status(dbPath)
173	expectedErrKeywords := "snapshot file integrity check failed"
174	/* example error message:
175	snapshot file integrity check failed. 2 errors found.
176	page 3: already freed
177	page 4: unreachable unfreed
178	*/
179	if err == nil {
180		t.Error("expected error due to corrupted snapshot file, got no error")
181	}
182	if !strings.Contains(err.Error(), expectedErrKeywords) {
183		t.Errorf("expected error message to contain the following keywords:\n%s\n"+
184			"actual error message:\n%s",
185			expectedErrKeywords, err.Error())
186	}
187}
188
// kv is a key/value pair that the tests write to etcd and later read back.
// Field order matters: the tests build values with unkeyed literals such as
// kv{"foo1", "bar1"}.
type kv struct {
	k string // key
	v string // value stored under k
}
192
193// creates a snapshot file and returns the file path.
194func createSnapshotFile(t *testing.T, kvs []kv) string {
195	clusterN := 1
196	urls := newEmbedURLs(clusterN * 2)
197	cURLs, pURLs := urls[:clusterN], urls[clusterN:]
198
199	cfg := embed.NewConfig()
200	cfg.Logger = "zap"
201	cfg.LogOutputs = []string{"/dev/null"}
202	cfg.Name = "default"
203	cfg.ClusterState = "new"
204	cfg.LCUrls, cfg.ACUrls = cURLs, cURLs
205	cfg.LPUrls, cfg.APUrls = pURLs, pURLs
206	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String())
207	cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()))
208	srv, err := embed.StartEtcd(cfg)
209	if err != nil {
210		t.Fatal(err)
211	}
212	defer func() {
213		os.RemoveAll(cfg.Dir)
214		srv.Close()
215	}()
216	select {
217	case <-srv.Server.ReadyNotify():
218	case <-time.After(3 * time.Second):
219		t.Fatalf("failed to start embed.Etcd for creating snapshots")
220	}
221
222	ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}
223	cli, err := clientv3.New(ccfg)
224	if err != nil {
225		t.Fatal(err)
226	}
227	defer cli.Close()
228	for i := range kvs {
229		ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout)
230		_, err = cli.Put(ctx, kvs[i].k, kvs[i].v)
231		cancel()
232		if err != nil {
233			t.Fatal(err)
234		}
235	}
236
237	sp := NewV3(zap.NewExample())
238	dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond()))
239	if err = sp.Save(context.Background(), ccfg, dpPath); err != nil {
240		t.Fatal(err)
241	}
242
243	os.RemoveAll(cfg.Dir)
244	srv.Close()
245	return dpPath
246}
247
// testClusterTkn is the initial cluster token used for the members that the
// tests restore from a snapshot.
const (
	testClusterTkn = "tkn"
)
249
250func restoreCluster(t *testing.T, clusterN int, dbPath string) (
251	cURLs []url.URL,
252	pURLs []url.URL,
253	srvs []*embed.Etcd) {
254	urls := newEmbedURLs(clusterN * 2)
255	cURLs, pURLs = urls[:clusterN], urls[clusterN:]
256
257	ics := ""
258	for i := 0; i < clusterN; i++ {
259		ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String())
260	}
261	ics = ics[1:]
262
263	cfgs := make([]*embed.Config, clusterN)
264	for i := 0; i < clusterN; i++ {
265		cfg := embed.NewConfig()
266		cfg.Logger = "zap"
267		cfg.LogOutputs = []string{"/dev/null"}
268		cfg.Name = fmt.Sprintf("%d", i)
269		cfg.InitialClusterToken = testClusterTkn
270		cfg.ClusterState = "existing"
271		cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]}
272		cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]}
273		cfg.InitialCluster = ics
274		cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i))
275
276		sp := NewV3(zap.NewExample())
277		if err := sp.Restore(RestoreConfig{
278			SnapshotPath:        dbPath,
279			Name:                cfg.Name,
280			OutputDataDir:       cfg.Dir,
281			PeerURLs:            []string{pURLs[i].String()},
282			InitialCluster:      ics,
283			InitialClusterToken: cfg.InitialClusterToken,
284		}); err != nil {
285			t.Fatal(err)
286		}
287		cfgs[i] = cfg
288	}
289
290	sch := make(chan *embed.Etcd)
291	for i := range cfgs {
292		go func(idx int) {
293			srv, err := embed.StartEtcd(cfgs[idx])
294			if err != nil {
295				t.Error(err)
296			}
297
298			<-srv.Server.ReadyNotify()
299			sch <- srv
300		}(i)
301	}
302
303	srvs = make([]*embed.Etcd, clusterN)
304	for i := 0; i < clusterN; i++ {
305		select {
306		case srv := <-sch:
307			srvs[i] = srv
308		case <-time.After(5 * time.Second):
309			t.Fatalf("#%d: failed to start embed.Etcd", i)
310		}
311	}
312	return cURLs, pURLs, srvs
313}
314
// newEmbedURLs returns n URLs of the form "unix://localhost:<port>", where
// each port is chosen pseudo-randomly from [0, 45000).
//
// Ports within one call are distinct as long as n does not exceed the size of
// the port range; beyond that, repeats are unavoidable and are allowed so the
// function always terminates.
//
// TODO: TLS
func newEmbedURLs(n int) (urls []url.URL) {
	const portRange = 45000

	// Seed one private generator per call. Reseeding inside the loop from the
	// clock could reuse a seed on a coarse clock and repeat a port.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	used := make(map[int]bool, n)
	urls = make([]url.URL, n)
	for i := range urls {
		port := rng.Intn(portRange)
		// Redraw on collision, but only while unused ports remain.
		for used[port] && len(used) < portRange {
			port = rng.Intn(portRange)
		}
		used[port] = true
		// Built directly rather than parsed, so there is no error to ignore.
		urls[i] = url.URL{Scheme: "unix", Host: fmt.Sprintf("localhost:%d", port)}
	}
	return urls
}
325