// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tester

import (
	"context"
	"fmt"
	"strings"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/functional/rpcpb"

	"go.uber.org/zap"
)

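// fetchSnapshotCaseQuorum injects disastrous machine failures into a quorum
// of members plus the old leader, after saving a snapshot from that leader,
// and then recovers the cluster from scratch using the saved snapshot.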
type fetchSnapshotCaseQuorum struct {
	desc        string
	rpcpbCase   rpcpb.Case
	injected    map[int]struct{} // member indexes destroyed by Inject
	snapshotted int              // member index of the old leader whose snapshot seeds Recover
}

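// Inject saves a snapshot on the current leader, then SIGQUITs a quorum of
// the other members and finally the leader itself, removing their data
// directories and leaving the cluster inoperable.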
func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
	// 1. Assume node C is the current leader with most up-to-date data.
	lead, err := clus.GetLeader()
	if err != nil {
		return err
	}
	c.snapshotted = lead

	// 2. Download snapshot from node C, before destroying node A and B.
	clus.lg.Info(
		"save snapshot on leader node START",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
	)
	var resp *rpcpb.Response
	resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
	if err != nil || resp == nil || !resp.Success {
		clus.lg.Info(
			"save snapshot on leader node FAIL",
			zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
			zap.Error(err),
		)
		if err == nil {
			// surface the failure; otherwise a failed snapshot save
			// would be reported as success
			err = fmt.Errorf("save snapshot failed on %q", clus.Members[lead].EtcdClientEndpoint)
		}
		return err
	}
	clus.lg.Info(
		"save snapshot on leader node SUCCESS",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
		zap.String("member-name", resp.SnapshotInfo.MemberName),
		zap.Strings("member-client-urls", resp.SnapshotInfo.MemberClientURLs),
		zap.String("snapshot-path", resp.SnapshotInfo.SnapshotPath),
		zap.String("snapshot-file-size", resp.SnapshotInfo.SnapshotFileSize),
		zap.String("snapshot-total-size", resp.SnapshotInfo.SnapshotTotalSize),
		zap.Int64("snapshot-total-key", resp.SnapshotInfo.SnapshotTotalKey),
		zap.Int64("snapshot-hash", resp.SnapshotInfo.SnapshotHash),
		zap.Int64("snapshot-revision", resp.SnapshotInfo.SnapshotRevision),
		zap.String("took", resp.SnapshotInfo.Took),
	)
	clus.Members[lead].SnapshotInfo = resp.SnapshotInfo

	leaderc, err := clus.Members[lead].CreateEtcdClient()
	if err != nil {
		return err
	}
	defer leaderc.Close()
	var mresp *clientv3.MemberListResponse
	mresp, err = leaderc.MemberList(context.Background())
	mss := []string{}
	if err == nil && mresp != nil {
		mss = describeMembers(mresp)
	}
	clus.lg.Info(
		"member list before disastrous machine failure",
		zap.String("request-to", clus.Members[lead].EtcdClientEndpoint),
		zap.Strings("members", mss),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	// simulate real life; machine failures may happen
	// some time after the last snapshot save
	time.Sleep(time.Second)

	// 3. Destroy node A and B, and make the whole cluster inoperable.
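	// keep picking a random quorum until it excludes the leader,
	// whose snapshot is needed for recovery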
	for {
		c.injected = pickQuorum(len(clus.Members))
		if _, ok := c.injected[lead]; !ok {
			break
		}
	}
	for idx := range c.injected {
		clus.lg.Info(
			"disastrous machine failure to quorum START",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
		)
		err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
		clus.lg.Info(
			"disastrous machine failure to quorum END",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
		if err != nil {
			return err
		}
	}

	// 4. Now node C cannot operate either.
	// 5. SIGQUIT node C and remove its data directories.
	clus.lg.Info(
		"disastrous machine failure to old leader START",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
	)
	err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
	clus.lg.Info(
		"disastrous machine failure to old leader END",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
		zap.Error(err),
	)
	return err
}

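// Recover restores the saved snapshot on the old leader as a single-node
// seed cluster, then re-adds the destroyed members one by one until the
// cluster regains its original size.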
func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
	// 6. Restore a new seed member from node C's latest snapshot file.
	oldlead := c.snapshotted

	// configuration on restart from the recovered snapshot is the same
	// as the seed member's previous one, except that the initial cluster
	// string now describes a single-node cluster
	clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd
	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing"
	name := clus.Members[oldlead].Etcd.Name
	initClus := []string{}
	for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs {
		initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
	}
	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")

	clus.lg.Info(
		"restore snapshot and restart from snapshot request START",
		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
		zap.Strings("initial-cluster", initClus),
	)
	err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
	clus.lg.Info(
		"restore snapshot and restart from snapshot request END",
		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
		zap.Strings("initial-cluster", initClus),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	leaderc, err := clus.Members[oldlead].CreateEtcdClient()
	if err != nil {
		return err
	}
	defer leaderc.Close()

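	// leaderc is connected to the restored seed member; use it to add the
	// destroyed members back, one at a time.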
	// 7. Add another member to establish 2-node cluster.
	// 8. Add another member to establish 3-node cluster.
	// 9. Add more if any.
	idxs := make([]int, 0, len(c.injected))
	for idx := range c.injected {
		idxs = append(idxs, idx)
	}
	clus.lg.Info("member add START", zap.Int("members-to-add", len(idxs)))
	for i, idx := range idxs {
		clus.lg.Info(
			"member add request SENT",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
		)
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		_, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs)
		cancel()
		clus.lg.Info(
			"member add request DONE",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
			zap.Error(err),
		)
		if err != nil {
			return err
		}

		// start the newly added member with fresh data
		clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
		clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
		name := clus.Members[idx].Etcd.Name
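		// initClus still carries the seed member's entry (and entries from
		// earlier iterations); appending here grows the initial-cluster
		// string to cover every member added so far, including this one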
		for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs {
			initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
		}
		clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
		clus.lg.Info(
			"restart from snapshot request SENT",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("initial-cluster", initClus),
		)
		err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
		clus.lg.Info(
			"restart from snapshot request DONE",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("initial-cluster", initClus),
			zap.Error(err),
		)
		if err != nil {
			return err
		}

		if i != len(c.injected)-1 {
			// wait until membership reconfiguration entry gets applied
			// TODO: test concurrent member add
			dur := 5 * clus.Members[idx].ElectionTimeout()
			clus.lg.Info(
				"waiting after restart from snapshot request",
				zap.Int("i", i),
				zap.Int("idx", idx),
				zap.Duration("sleep", dur),
			)
			time.Sleep(dur)
		} else {
			clus.lg.Info(
				"restart from snapshot request ALL END",
				zap.Int("i", i),
				zap.Int("idx", idx),
			)
		}
	}
	return nil
}

func (c *fetchSnapshotCaseQuorum) Desc() string {
	if c.desc != "" {
		return c.desc
	}
	return c.rpcpbCase.String()
}

func (c *fetchSnapshotCaseQuorum) TestCase() rpcpb.Case {
	return c.rpcpbCase
}

func new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus *Cluster) Case {
	c := &fetchSnapshotCaseQuorum{
		rpcpbCase:   rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH,
		injected:    make(map[int]struct{}),
		snapshotted: -1,
	}
	// simulate real life; machine replacements may happen
	// some time after the disaster
	return &caseDelay{
		Case:          c,
		delayDuration: clus.GetCaseDelayDuration(),
	}
}