// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tester

import (
	"context"
	"fmt"
	"strings"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/functional/rpcpb"

	"go.uber.org/zap"
)

// fetchSnapshotCaseQuorum saves a snapshot on the current leader,
// injects disastrous machine failures into a quorum of members plus
// the leader itself, then recovers the cluster from that snapshot.
type fetchSnapshotCaseQuorum struct {
	desc        string
	rpcpbCase   rpcpb.Case
	injected    map[int]struct{}
	snapshotted int
}

func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
	// 1. Assume node C is the current leader with most up-to-date data.
	lead, err := clus.GetLeader()
	if err != nil {
		return err
	}
	c.snapshotted = lead

	// 2. Download snapshot from node C, before destroying node A and B.
	clus.lg.Info(
		"save snapshot on leader node START",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
	)
	var resp *rpcpb.Response
	resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
	if err == nil && (resp == nil || !resp.Success) {
		// surface a non-nil error so an unsuccessful response
		// is not silently treated as success
		err = fmt.Errorf("save snapshot failed on %q (response: %+v)", clus.Members[lead].EtcdClientEndpoint, resp)
	}
	if err != nil {
		clus.lg.Info(
			"save snapshot on leader node FAIL",
			zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
			zap.Error(err),
		)
		return err
	}
	clus.lg.Info(
		"save snapshot on leader node SUCCESS",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
		zap.String("member-name", resp.SnapshotInfo.MemberName),
		zap.Strings("member-client-urls", resp.SnapshotInfo.MemberClientURLs),
		zap.String("snapshot-path", resp.SnapshotInfo.SnapshotPath),
		zap.String("snapshot-file-size", resp.SnapshotInfo.SnapshotFileSize),
		zap.String("snapshot-total-size", resp.SnapshotInfo.SnapshotTotalSize),
		zap.Int64("snapshot-total-key", resp.SnapshotInfo.SnapshotTotalKey),
		zap.Int64("snapshot-hash", resp.SnapshotInfo.SnapshotHash),
		zap.Int64("snapshot-revision", resp.SnapshotInfo.SnapshotRevision),
		zap.String("took", resp.SnapshotInfo.Took),
	)
	clus.Members[lead].SnapshotInfo = resp.SnapshotInfo

	leaderc, err := clus.Members[lead].CreateEtcdClient()
	if err != nil {
		return err
	}
	defer leaderc.Close()
	var mresp *clientv3.MemberListResponse
	mresp, err = leaderc.MemberList(context.Background())
	mss := []string{}
	if err == nil && mresp != nil {
		mss = describeMembers(mresp)
	}
	clus.lg.Info(
		"member list before disastrous machine failure",
		zap.String("request-to", clus.Members[lead].EtcdClientEndpoint),
		zap.Strings("members", mss),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	// simulate real life; machine failures may happen
	// after some time since last snapshot save
	time.Sleep(time.Second)

	// 3. Destroy node A and B, and make the whole cluster inoperable.
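	// keep re-picking a quorum of members to fail until the pick
	// excludes the snapshotted leader; the leader is destroyed
	// separately in step 5, after its snapshot has been saved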
	for {
		c.injected = pickQuorum(len(clus.Members))
		if _, ok := c.injected[lead]; !ok {
			break
		}
	}
	for idx := range c.injected {
		clus.lg.Info(
			"disastrous machine failure to quorum START",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
		)
		err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
		clus.lg.Info(
			"disastrous machine failure to quorum END",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Error(err),
		)
		if err != nil {
			return err
		}
	}

	// 4. Now node C cannot operate either.
	// 5. SIGTERM node C and remove its data directories.
	clus.lg.Info(
		"disastrous machine failure to old leader START",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
	)
	err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
	clus.lg.Info(
		"disastrous machine failure to old leader END",
		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
		zap.Error(err),
	)
	return err
}

func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
	// 6. Restore a new seed member from node C's latest snapshot file.
	oldlead := c.snapshotted

	// configuration on restart from recovered snapshot;
	// the seed member's configuration is the same as before,
	// except the initial cluster string is now a single-node cluster
	clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd
	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing"
	name := clus.Members[oldlead].Etcd.Name
	initClus := []string{}
	for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs {
		initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
	}
	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")

	clus.lg.Info(
		"restore snapshot and restart from snapshot request START",
		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
		zap.Strings("initial-cluster", initClus),
	)
	err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
	clus.lg.Info(
		"restore snapshot and restart from snapshot request END",
		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
		zap.Strings("initial-cluster", initClus),
		zap.Error(err),
	)
	if err != nil {
		return err
	}

	leaderc, err := clus.Members[oldlead].CreateEtcdClient()
	if err != nil {
		return err
	}
	defer leaderc.Close()

	// 7. Add another member to establish 2-node cluster.
	// 8. Add another member to establish 3-node cluster.
	// 9. Add more if any.
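	// re-add each destroyed member one at a time: send MemberAdd to
	// the seed cluster, then restart the new member with fresh data;
	// initClus keeps growing, so each member's initial cluster string
	// lists the seed plus every member added so far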
	idxs := make([]int, 0, len(c.injected))
	for idx := range c.injected {
		idxs = append(idxs, idx)
	}
	clus.lg.Info("member add START", zap.Int("members-to-add", len(idxs)))
	for i, idx := range idxs {
		clus.lg.Info(
			"member add request SENT",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
		)
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		_, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs)
		cancel()
		clus.lg.Info(
			"member add request DONE",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
			zap.Error(err),
		)
		if err != nil {
			return err
		}

		// start the added (new) member with fresh data
		clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
		clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
		name := clus.Members[idx].Etcd.Name
		for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs {
			initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
		}
		clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
		clus.lg.Info(
			"restart from snapshot request SENT",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("initial-cluster", initClus),
		)
		err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
		clus.lg.Info(
			"restart from snapshot request DONE",
			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
			zap.Strings("initial-cluster", initClus),
			zap.Error(err),
		)
		if err != nil {
			return err
		}

		if i != len(c.injected)-1 {
			// wait until membership reconfiguration entry gets applied
			// TODO: test concurrent member add
			dur := 5 * clus.Members[idx].ElectionTimeout()
			clus.lg.Info(
				"waiting after restart from snapshot request",
				zap.Int("i", i),
				zap.Int("idx", idx),
				zap.Duration("sleep", dur),
			)
			time.Sleep(dur)
		} else {
			clus.lg.Info(
				"restart from snapshot request ALL END",
				zap.Int("i", i),
				zap.Int("idx", idx),
			)
		}
	}
	return nil
}

func (c *fetchSnapshotCaseQuorum) Desc() string {
	if c.desc != "" {
		return c.desc
	}
	return c.rpcpbCase.String()
}

func (c *fetchSnapshotCaseQuorum) TestCase() rpcpb.Case {
	return c.rpcpbCase
}

func new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus *Cluster) Case {
	c := &fetchSnapshotCaseQuorum{
		rpcpbCase:   rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH,
		injected:    make(map[int]struct{}),
		snapshotted: -1,
	}
	// simulate real life; machine replacements may happen
	// after some time since disaster
	return &caseDelay{
		Case:          c,
		delayDuration: clus.GetCaseDelayDuration(),
	}
}
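// A sketch of how this constructor is typically wired into the tester;
// the actual registration site lives outside this file, and the exact
// switch shape below is an assumption based on the package layout:
//
//	case "SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH":
//		clus.cases = append(clus.cases,
//			new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus))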