1// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package e2e
16
17import (
18	"encoding/json"
19	"fmt"
20	"io"
21	"io/ioutil"
22	"os"
23	"path/filepath"
24	"strings"
25	"testing"
26	"time"
27
28	"github.com/coreos/etcd/pkg/expect"
29	"github.com/coreos/etcd/pkg/testutil"
30)
31
32func TestCtlV3Snapshot(t *testing.T) { testCtl(t, snapshotTest) }
33
34func snapshotTest(cx ctlCtx) {
35	maintenanceInitKeys(cx)
36
37	leaseID, err := ctlV3LeaseGrant(cx, 100)
38	if err != nil {
39		cx.t.Fatalf("snapshot: ctlV3LeaseGrant error (%v)", err)
40	}
41	if err = ctlV3Put(cx, "withlease", "withlease", leaseID); err != nil {
42		cx.t.Fatalf("snapshot: ctlV3Put error (%v)", err)
43	}
44
45	fpath := "test.snapshot"
46	defer os.RemoveAll(fpath)
47
48	if err = ctlV3SnapshotSave(cx, fpath); err != nil {
49		cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
50	}
51
52	st, err := getSnapshotStatus(cx, fpath)
53	if err != nil {
54		cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
55	}
56	if st.Revision != 5 {
57		cx.t.Fatalf("expected 4, got %d", st.Revision)
58	}
59	if st.TotalKey < 4 {
60		cx.t.Fatalf("expected at least 4, got %d", st.TotalKey)
61	}
62}
63
64func TestCtlV3SnapshotCorrupt(t *testing.T) { testCtl(t, snapshotCorruptTest) }
65
66func snapshotCorruptTest(cx ctlCtx) {
67	fpath := "test.snapshot"
68	defer os.RemoveAll(fpath)
69
70	if err := ctlV3SnapshotSave(cx, fpath); err != nil {
71		cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
72	}
73
74	// corrupt file
75	f, oerr := os.OpenFile(fpath, os.O_WRONLY, 0)
76	if oerr != nil {
77		cx.t.Fatal(oerr)
78	}
79	if _, err := f.Write(make([]byte, 512)); err != nil {
80		cx.t.Fatal(err)
81	}
82	f.Close()
83
84	defer os.RemoveAll("snap.etcd")
85	serr := spawnWithExpect(
86		append(cx.PrefixArgs(), "snapshot", "restore",
87			"--data-dir", "snap.etcd",
88			fpath),
89		"expected sha256")
90
91	if serr != nil {
92		cx.t.Fatal(serr)
93	}
94}
95
96// This test ensures that the snapshot status does not modify the snapshot file
97func TestCtlV3SnapshotStatusBeforeRestore(t *testing.T) { testCtl(t, snapshotStatusBeforeRestoreTest) }
98
99func snapshotStatusBeforeRestoreTest(cx ctlCtx) {
100	fpath := "test.snapshot"
101	defer os.RemoveAll(fpath)
102
103	if err := ctlV3SnapshotSave(cx, fpath); err != nil {
104		cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err)
105	}
106
107	// snapshot status on the fresh snapshot file
108	_, err := getSnapshotStatus(cx, fpath)
109	if err != nil {
110		cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err)
111	}
112
113	defer os.RemoveAll("snap.etcd")
114	serr := spawnWithExpect(
115		append(cx.PrefixArgs(), "snapshot", "restore",
116			"--data-dir", "snap.etcd",
117			fpath),
118		"added member")
119
120	if serr != nil {
121		cx.t.Fatal(serr)
122	}
123}
124
125func ctlV3SnapshotSave(cx ctlCtx, fpath string) error {
126	cmdArgs := append(cx.PrefixArgs(), "snapshot", "save", fpath)
127	return spawnWithExpect(cmdArgs, fmt.Sprintf("Snapshot saved at %s", fpath))
128}
129
// snapshotStatus receives the JSON object decoded by getSnapshotStatus from
// the output of "--write-out json snapshot status". Only the keys "hash",
// "revision", "totalKey" and "totalSize" are mapped.
// NOTE(review): the fields presumably carry the snapshot's hash, store
// revision, number of keys and size — confirm units and exact meaning against
// etcdctl's status output.
type snapshotStatus struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}
136
137func getSnapshotStatus(cx ctlCtx, fpath string) (snapshotStatus, error) {
138	cmdArgs := append(cx.PrefixArgs(), "--write-out", "json", "snapshot", "status", fpath)
139
140	proc, err := spawnCmd(cmdArgs)
141	if err != nil {
142		return snapshotStatus{}, err
143	}
144	var txt string
145	txt, err = proc.Expect("totalKey")
146	if err != nil {
147		return snapshotStatus{}, err
148	}
149	if err = proc.Close(); err != nil {
150		return snapshotStatus{}, err
151	}
152
153	resp := snapshotStatus{}
154	dec := json.NewDecoder(strings.NewReader(txt))
155	if err := dec.Decode(&resp); err == io.EOF {
156		return snapshotStatus{}, err
157	}
158	return resp, nil
159}
160
161// TestIssue6361 ensures new member that starts with snapshot correctly
162// syncs up with other members and serve correct data.
163func TestIssue6361(t *testing.T) {
164	defer testutil.AfterTest(t)
165	mustEtcdctl(t)
166	os.Setenv("ETCDCTL_API", "3")
167	defer os.Unsetenv("ETCDCTL_API")
168
169	epc, err := newEtcdProcessCluster(&etcdProcessClusterConfig{
170		clusterSize:  1,
171		initialToken: "new",
172		keepDataDir:  true,
173	})
174	if err != nil {
175		t.Fatalf("could not start etcd process cluster (%v)", err)
176	}
177	defer func() {
178		if errC := epc.Close(); errC != nil {
179			t.Fatalf("error closing etcd processes (%v)", errC)
180		}
181	}()
182
183	dialTimeout := 7 * time.Second
184	prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()}
185
186	// write some keys
187	kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}}
188	for i := range kvs {
189		if err = spawnWithExpect(append(prefixArgs, "put", kvs[i].key, kvs[i].val), "OK"); err != nil {
190			t.Fatal(err)
191		}
192	}
193
194	fpath := filepath.Join(os.TempDir(), "test.snapshot")
195	defer os.RemoveAll(fpath)
196
197	// etcdctl save snapshot
198	if err = spawnWithExpect(append(prefixArgs, "snapshot", "save", fpath), fmt.Sprintf("Snapshot saved at %s", fpath)); err != nil {
199		t.Fatal(err)
200	}
201
202	if err = epc.procs[0].Stop(); err != nil {
203		t.Fatal(err)
204	}
205
206	newDataDir := filepath.Join(os.TempDir(), "test.data")
207	defer os.RemoveAll(newDataDir)
208
209	// etcdctl restore the snapshot
210	err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "membership: added member")
211	if err != nil {
212		t.Fatal(err)
213	}
214
215	// start the etcd member using the restored snapshot
216	epc.procs[0].Config().dataDirPath = newDataDir
217	for i := range epc.procs[0].Config().args {
218		if epc.procs[0].Config().args[i] == "--data-dir" {
219			epc.procs[0].Config().args[i+1] = newDataDir
220		}
221	}
222	if err = epc.procs[0].Restart(); err != nil {
223		t.Fatal(err)
224	}
225
226	// ensure the restored member has the correct data
227	for i := range kvs {
228		if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
229			t.Fatal(err)
230		}
231	}
232
233	// add a new member into the cluster
234	clientURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+30)
235	peerURL := fmt.Sprintf("http://localhost:%d", etcdProcessBasePort+31)
236	err = spawnWithExpect(append(prefixArgs, "member", "add", "newmember", fmt.Sprintf("--peer-urls=%s", peerURL)), " added to cluster ")
237	if err != nil {
238		t.Fatal(err)
239	}
240
241	var newDataDir2 string
242	newDataDir2, err = ioutil.TempDir("", "newdata2")
243	if err != nil {
244		t.Fatal(err)
245	}
246	defer os.RemoveAll(newDataDir2)
247
248	name2 := "infra2"
249	initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL)
250
251	// start the new member
252	var nepc *expect.ExpectProcess
253	nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2,
254		"--listen-client-urls", clientURL, "--advertise-client-urls", clientURL,
255		"--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL,
256		"--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2})
257	if err != nil {
258		t.Fatal(err)
259	}
260	if _, err = nepc.Expect("enabled capabilities for version"); err != nil {
261		t.Fatal(err)
262	}
263
264	prefixArgs = []string{ctlBinPath, "--endpoints", clientURL, "--dial-timeout", dialTimeout.String()}
265
266	// ensure added member has data from incoming snapshot
267	for i := range kvs {
268		if err = spawnWithExpect(append(prefixArgs, "get", kvs[i].key), kvs[i].val); err != nil {
269			t.Fatal(err)
270		}
271	}
272
273	if err = nepc.Stop(); err != nil {
274		t.Fatal(err)
275	}
276}
277