1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package snapshot 16 17import ( 18 "context" 19 "fmt" 20 "math/rand" 21 "net/url" 22 "os" 23 "path/filepath" 24 "strings" 25 "testing" 26 "time" 27 28 "go.etcd.io/etcd/clientv3" 29 "go.etcd.io/etcd/embed" 30 "go.etcd.io/etcd/pkg/fileutil" 31 "go.etcd.io/etcd/pkg/testutil" 32 33 "go.uber.org/zap" 34) 35 36// TestSnapshotV3RestoreSingle tests single node cluster restoring 37// from a snapshot file. 38func TestSnapshotV3RestoreSingle(t *testing.T) { 39 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 40 dbPath := createSnapshotFile(t, kvs) 41 defer os.RemoveAll(dbPath) 42 43 clusterN := 1 44 urls := newEmbedURLs(clusterN * 2) 45 cURLs, pURLs := urls[:clusterN], urls[clusterN:] 46 47 cfg := embed.NewConfig() 48 cfg.Logger = "zap" 49 cfg.LogOutputs = []string{"/dev/null"} 50 cfg.Debug = false 51 cfg.Name = "s1" 52 cfg.InitialClusterToken = testClusterTkn 53 cfg.ClusterState = "existing" 54 cfg.LCUrls, cfg.ACUrls = cURLs, cURLs 55 cfg.LPUrls, cfg.APUrls = pURLs, pURLs 56 cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) 57 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) 58 59 sp := NewV3(zap.NewExample()) 60 pss := make([]string, 0, len(pURLs)) 61 for _, p := range pURLs { 62 pss = append(pss, p.String()) 63 } 64 if err := sp.Restore(RestoreConfig{ 65 SnapshotPath: dbPath, 66 Name: cfg.Name, 67 OutputDataDir: cfg.Dir, 68 InitialCluster: cfg.InitialCluster, 69 InitialClusterToken: cfg.InitialClusterToken, 70 PeerURLs: pss, 71 }); err != nil { 72 t.Fatal(err) 73 } 74 75 srv, err := embed.StartEtcd(cfg) 76 if err != nil { 77 t.Fatal(err) 78 } 79 defer func() { 80 os.RemoveAll(cfg.Dir) 81 srv.Close() 82 }() 83 select { 84 case <-srv.Server.ReadyNotify(): 85 case <-time.After(3 * time.Second): 86 t.Fatalf("failed to start restored etcd member") 87 } 88 89 var cli *clientv3.Client 90 cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) 91 if err != nil { 92 t.Fatal(err) 93 } 94 defer cli.Close() 95 for i := range kvs { 96 var gresp *clientv3.GetResponse 97 gresp, err = cli.Get(context.Background(), kvs[i].k) 98 if err != nil { 99 t.Fatal(err) 100 } 101 if string(gresp.Kvs[0].Value) != kvs[i].v { 102 t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value)) 103 } 104 } 105} 106 107// TestSnapshotV3RestoreMulti ensures that multiple members 108// can boot into the same cluster after being restored from a same 109// snapshot file. 110func TestSnapshotV3RestoreMulti(t *testing.T) { 111 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 112 dbPath := createSnapshotFile(t, kvs) 113 defer os.RemoveAll(dbPath) 114 115 clusterN := 3 116 cURLs, _, srvs := restoreCluster(t, clusterN, dbPath) 117 defer func() { 118 for i := 0; i < clusterN; i++ { 119 os.RemoveAll(srvs[i].Config().Dir) 120 srvs[i].Close() 121 } 122 }() 123 124 // wait for leader election 125 time.Sleep(time.Second) 126 127 for i := 0; i < clusterN; i++ { 128 cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}}) 129 if err != nil { 130 t.Fatal(err) 131 } 132 defer cli.Close() 133 for i := range kvs { 134 var gresp *clientv3.GetResponse 135 gresp, err = cli.Get(context.Background(), kvs[i].k) 136 if err != nil { 137 t.Fatal(err) 138 } 139 if string(gresp.Kvs[0].Value) != kvs[i].v { 140 t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value)) 141 } 142 } 143 } 144} 145 146// TestSnapshotFilePermissions ensures that the snapshot is saved with 147// the correct file permissions. 148func TestSnapshotFilePermissions(t *testing.T) { 149 expectedFileMode := os.FileMode(fileutil.PrivateFileMode) 150 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 151 dbPath := createSnapshotFile(t, kvs) 152 defer os.RemoveAll(dbPath) 153 154 dbInfo, err := os.Stat(dbPath) 155 if err != nil { 156 t.Fatalf("failed to get test snapshot file status: %v", err) 157 } 158 actualFileMode := dbInfo.Mode() 159 160 if expectedFileMode != actualFileMode { 161 t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode) 162 } 163} 164 165// TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file. 166func TestCorruptedBackupFileCheck(t *testing.T) { 167 dbPath := "testdata/corrupted_backup.db" 168 if _, err := os.Stat(dbPath); err != nil { 169 t.Fatalf("test file [%s] does not exist: %v", dbPath, err) 170 } 171 172 sp := NewV3(zap.NewExample()) 173 _, err := sp.Status(dbPath) 174 expectedErrKeywords := "snapshot file integrity check failed" 175 /* example error message: 176 snapshot file integrity check failed. 2 errors found. 177 page 3: already freed 178 page 4: unreachable unfreed 179 */ 180 if err == nil { 181 t.Error("expected error due to corrupted snapshot file, got no error") 182 } 183 if !strings.Contains(err.Error(), expectedErrKeywords) { 184 t.Errorf("expected error message to contain the following keywords:\n%s\n"+ 185 "actual error message:\n%s", 186 expectedErrKeywords, err.Error()) 187 } 188} 189 190type kv struct { 191 k, v string 192} 193 194// creates a snapshot file and returns the file path. 195func createSnapshotFile(t *testing.T, kvs []kv) string { 196 clusterN := 1 197 urls := newEmbedURLs(clusterN * 2) 198 cURLs, pURLs := urls[:clusterN], urls[clusterN:] 199 200 cfg := embed.NewConfig() 201 cfg.Logger = "zap" 202 cfg.LogOutputs = []string{"/dev/null"} 203 cfg.Debug = false 204 cfg.Name = "default" 205 cfg.ClusterState = "new" 206 cfg.LCUrls, cfg.ACUrls = cURLs, cURLs 207 cfg.LPUrls, cfg.APUrls = pURLs, pURLs 208 cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) 209 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) 210 srv, err := embed.StartEtcd(cfg) 211 if err != nil { 212 t.Fatal(err) 213 } 214 defer func() { 215 os.RemoveAll(cfg.Dir) 216 srv.Close() 217 }() 218 select { 219 case <-srv.Server.ReadyNotify(): 220 case <-time.After(3 * time.Second): 221 t.Fatalf("failed to start embed.Etcd for creating snapshots") 222 } 223 224 ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} 225 cli, err := clientv3.New(ccfg) 226 if err != nil { 227 t.Fatal(err) 228 } 229 defer cli.Close() 230 for i := range kvs { 231 ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) 232 _, err = cli.Put(ctx, kvs[i].k, kvs[i].v) 233 cancel() 234 if err != nil { 235 t.Fatal(err) 236 } 237 } 238 239 sp := NewV3(zap.NewExample()) 240 dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) 241 if err = sp.Save(context.Background(), ccfg, dpPath); err != nil { 242 t.Fatal(err) 243 } 244 245 os.RemoveAll(cfg.Dir) 246 srv.Close() 247 return dpPath 248} 249 250const testClusterTkn = "tkn" 251 252func restoreCluster(t *testing.T, clusterN int, dbPath string) ( 253 cURLs []url.URL, 254 pURLs []url.URL, 255 srvs []*embed.Etcd) { 256 urls := newEmbedURLs(clusterN * 2) 257 cURLs, pURLs = urls[:clusterN], urls[clusterN:] 258 259 ics := "" 260 for i := 0; i < clusterN; i++ { 261 ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) 262 } 263 ics = ics[1:] 264 265 cfgs := make([]*embed.Config, clusterN) 266 for i := 0; i < clusterN; i++ { 267 cfg := embed.NewConfig() 268 cfg.Logger = "zap" 269 cfg.LogOutputs = []string{"/dev/null"} 270 cfg.Debug = false 271 cfg.Name = fmt.Sprintf("%d", i) 272 cfg.InitialClusterToken = testClusterTkn 273 cfg.ClusterState = "existing" 274 cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} 275 cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} 276 cfg.InitialCluster = ics 277 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i)) 278 279 sp := NewV3(zap.NewExample()) 280 if err := sp.Restore(RestoreConfig{ 281 SnapshotPath: dbPath, 282 Name: cfg.Name, 283 OutputDataDir: cfg.Dir, 284 PeerURLs: []string{pURLs[i].String()}, 285 InitialCluster: ics, 286 InitialClusterToken: cfg.InitialClusterToken, 287 }); err != nil { 288 t.Fatal(err) 289 } 290 cfgs[i] = cfg 291 } 292 293 sch := make(chan *embed.Etcd) 294 for i := range cfgs { 295 go func(idx int) { 296 srv, err := embed.StartEtcd(cfgs[idx]) 297 if err != nil { 298 t.Error(err) 299 } 300 301 <-srv.Server.ReadyNotify() 302 sch <- srv 303 }(i) 304 } 305 306 srvs = make([]*embed.Etcd, clusterN) 307 for i := 0; i < clusterN; i++ { 308 select { 309 case srv := <-sch: 310 srvs[i] = srv 311 case <-time.After(5 * time.Second): 312 t.Fatalf("#%d: failed to start embed.Etcd", i) 313 } 314 } 315 return cURLs, pURLs, srvs 316} 317 318// TODO: TLS 319func newEmbedURLs(n int) (urls []url.URL) { 320 urls = make([]url.URL, n) 321 for i := 0; i < n; i++ { 322 rand.Seed(int64(time.Now().Nanosecond())) 323 u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000))) 324 urls[i] = *u 325 } 326 return urls 327} 328