1// Copyright 2018 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package snapshot 16 17import ( 18 "context" 19 "fmt" 20 "math/rand" 21 "net/url" 22 "os" 23 "path/filepath" 24 "strings" 25 "testing" 26 "time" 27 28 "go.etcd.io/etcd/clientv3" 29 "go.etcd.io/etcd/embed" 30 "go.etcd.io/etcd/pkg/fileutil" 31 "go.etcd.io/etcd/pkg/testutil" 32 33 "go.uber.org/zap" 34) 35 36// TestSnapshotV3RestoreSingle tests single node cluster restoring 37// from a snapshot file. 38func TestSnapshotV3RestoreSingle(t *testing.T) { 39 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 40 dbPath := createSnapshotFile(t, kvs) 41 defer os.RemoveAll(dbPath) 42 43 clusterN := 1 44 urls := newEmbedURLs(clusterN * 2) 45 cURLs, pURLs := urls[:clusterN], urls[clusterN:] 46 47 cfg := embed.NewConfig() 48 cfg.Logger = "zap" 49 cfg.LogOutputs = []string{"/dev/null"} 50 cfg.Name = "s1" 51 cfg.InitialClusterToken = testClusterTkn 52 cfg.ClusterState = "existing" 53 cfg.LCUrls, cfg.ACUrls = cURLs, cURLs 54 cfg.LPUrls, cfg.APUrls = pURLs, pURLs 55 cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) 56 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) 57 58 sp := NewV3(zap.NewExample()) 59 pss := make([]string, 0, len(pURLs)) 60 for _, p := range pURLs { 61 pss = append(pss, p.String()) 62 } 63 if err := sp.Restore(RestoreConfig{ 64 SnapshotPath: dbPath, 65 Name: cfg.Name, 66 OutputDataDir: cfg.Dir, 67 InitialCluster: cfg.InitialCluster, 68 InitialClusterToken: cfg.InitialClusterToken, 69 PeerURLs: pss, 70 }); err != nil { 71 t.Fatal(err) 72 } 73 74 srv, err := embed.StartEtcd(cfg) 75 if err != nil { 76 t.Fatal(err) 77 } 78 defer func() { 79 os.RemoveAll(cfg.Dir) 80 srv.Close() 81 }() 82 select { 83 case <-srv.Server.ReadyNotify(): 84 case <-time.After(3 * time.Second): 85 t.Fatalf("failed to start restored etcd member") 86 } 87 88 var cli *clientv3.Client 89 cli, err = clientv3.New(clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}}) 90 if err != nil { 91 t.Fatal(err) 92 } 93 defer cli.Close() 94 for i := range kvs { 95 var gresp *clientv3.GetResponse 96 gresp, err = cli.Get(context.Background(), kvs[i].k) 97 if err != nil { 98 t.Fatal(err) 99 } 100 if string(gresp.Kvs[0].Value) != kvs[i].v { 101 t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value)) 102 } 103 } 104} 105 106// TestSnapshotV3RestoreMulti ensures that multiple members 107// can boot into the same cluster after being restored from a same 108// snapshot file. 109func TestSnapshotV3RestoreMulti(t *testing.T) { 110 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 111 dbPath := createSnapshotFile(t, kvs) 112 defer os.RemoveAll(dbPath) 113 114 clusterN := 3 115 cURLs, _, srvs := restoreCluster(t, clusterN, dbPath) 116 defer func() { 117 for i := 0; i < clusterN; i++ { 118 os.RemoveAll(srvs[i].Config().Dir) 119 srvs[i].Close() 120 } 121 }() 122 123 // wait for leader election 124 time.Sleep(time.Second) 125 126 for i := 0; i < clusterN; i++ { 127 cli, err := clientv3.New(clientv3.Config{Endpoints: []string{cURLs[i].String()}}) 128 if err != nil { 129 t.Fatal(err) 130 } 131 defer cli.Close() 132 for i := range kvs { 133 var gresp *clientv3.GetResponse 134 gresp, err = cli.Get(context.Background(), kvs[i].k) 135 if err != nil { 136 t.Fatal(err) 137 } 138 if string(gresp.Kvs[0].Value) != kvs[i].v { 139 t.Fatalf("#%d: value expected %s, got %s", i, kvs[i].v, string(gresp.Kvs[0].Value)) 140 } 141 } 142 } 143} 144 145// TestSnapshotFilePermissions ensures that the snapshot is saved with 146// the correct file permissions. 147func TestSnapshotFilePermissions(t *testing.T) { 148 expectedFileMode := os.FileMode(fileutil.PrivateFileMode) 149 kvs := []kv{{"foo1", "bar1"}, {"foo2", "bar2"}, {"foo3", "bar3"}} 150 dbPath := createSnapshotFile(t, kvs) 151 defer os.RemoveAll(dbPath) 152 153 dbInfo, err := os.Stat(dbPath) 154 if err != nil { 155 t.Fatalf("failed to get test snapshot file status: %v", err) 156 } 157 actualFileMode := dbInfo.Mode() 158 159 if expectedFileMode != actualFileMode { 160 t.Fatalf("expected test snapshot file mode %s, got %s:", expectedFileMode, actualFileMode) 161 } 162} 163 164// TestCorruptedBackupFileCheck tests if we can correctly identify a corrupted backup file. 165func TestCorruptedBackupFileCheck(t *testing.T) { 166 dbPath := "testdata/corrupted_backup.db" 167 if _, err := os.Stat(dbPath); err != nil { 168 t.Fatalf("test file [%s] does not exist: %v", dbPath, err) 169 } 170 171 sp := NewV3(zap.NewExample()) 172 _, err := sp.Status(dbPath) 173 expectedErrKeywords := "snapshot file integrity check failed" 174 /* example error message: 175 snapshot file integrity check failed. 2 errors found. 176 page 3: already freed 177 page 4: unreachable unfreed 178 */ 179 if err == nil { 180 t.Error("expected error due to corrupted snapshot file, got no error") 181 } 182 if !strings.Contains(err.Error(), expectedErrKeywords) { 183 t.Errorf("expected error message to contain the following keywords:\n%s\n"+ 184 "actual error message:\n%s", 185 expectedErrKeywords, err.Error()) 186 } 187} 188 189type kv struct { 190 k, v string 191} 192 193// creates a snapshot file and returns the file path. 194func createSnapshotFile(t *testing.T, kvs []kv) string { 195 clusterN := 1 196 urls := newEmbedURLs(clusterN * 2) 197 cURLs, pURLs := urls[:clusterN], urls[clusterN:] 198 199 cfg := embed.NewConfig() 200 cfg.Logger = "zap" 201 cfg.LogOutputs = []string{"/dev/null"} 202 cfg.Name = "default" 203 cfg.ClusterState = "new" 204 cfg.LCUrls, cfg.ACUrls = cURLs, cURLs 205 cfg.LPUrls, cfg.APUrls = pURLs, pURLs 206 cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, pURLs[0].String()) 207 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond())) 208 srv, err := embed.StartEtcd(cfg) 209 if err != nil { 210 t.Fatal(err) 211 } 212 defer func() { 213 os.RemoveAll(cfg.Dir) 214 srv.Close() 215 }() 216 select { 217 case <-srv.Server.ReadyNotify(): 218 case <-time.After(3 * time.Second): 219 t.Fatalf("failed to start embed.Etcd for creating snapshots") 220 } 221 222 ccfg := clientv3.Config{Endpoints: []string{cfg.ACUrls[0].String()}} 223 cli, err := clientv3.New(ccfg) 224 if err != nil { 225 t.Fatal(err) 226 } 227 defer cli.Close() 228 for i := range kvs { 229 ctx, cancel := context.WithTimeout(context.Background(), testutil.RequestTimeout) 230 _, err = cli.Put(ctx, kvs[i].k, kvs[i].v) 231 cancel() 232 if err != nil { 233 t.Fatal(err) 234 } 235 } 236 237 sp := NewV3(zap.NewExample()) 238 dpPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot%d.db", time.Now().Nanosecond())) 239 if err = sp.Save(context.Background(), ccfg, dpPath); err != nil { 240 t.Fatal(err) 241 } 242 243 os.RemoveAll(cfg.Dir) 244 srv.Close() 245 return dpPath 246} 247 248const testClusterTkn = "tkn" 249 250func restoreCluster(t *testing.T, clusterN int, dbPath string) ( 251 cURLs []url.URL, 252 pURLs []url.URL, 253 srvs []*embed.Etcd) { 254 urls := newEmbedURLs(clusterN * 2) 255 cURLs, pURLs = urls[:clusterN], urls[clusterN:] 256 257 ics := "" 258 for i := 0; i < clusterN; i++ { 259 ics += fmt.Sprintf(",%d=%s", i, pURLs[i].String()) 260 } 261 ics = ics[1:] 262 263 cfgs := make([]*embed.Config, clusterN) 264 for i := 0; i < clusterN; i++ { 265 cfg := embed.NewConfig() 266 cfg.Logger = "zap" 267 cfg.LogOutputs = []string{"/dev/null"} 268 cfg.Name = fmt.Sprintf("%d", i) 269 cfg.InitialClusterToken = testClusterTkn 270 cfg.ClusterState = "existing" 271 cfg.LCUrls, cfg.ACUrls = []url.URL{cURLs[i]}, []url.URL{cURLs[i]} 272 cfg.LPUrls, cfg.APUrls = []url.URL{pURLs[i]}, []url.URL{pURLs[i]} 273 cfg.InitialCluster = ics 274 cfg.Dir = filepath.Join(os.TempDir(), fmt.Sprint(time.Now().Nanosecond()+i)) 275 276 sp := NewV3(zap.NewExample()) 277 if err := sp.Restore(RestoreConfig{ 278 SnapshotPath: dbPath, 279 Name: cfg.Name, 280 OutputDataDir: cfg.Dir, 281 PeerURLs: []string{pURLs[i].String()}, 282 InitialCluster: ics, 283 InitialClusterToken: cfg.InitialClusterToken, 284 }); err != nil { 285 t.Fatal(err) 286 } 287 cfgs[i] = cfg 288 } 289 290 sch := make(chan *embed.Etcd) 291 for i := range cfgs { 292 go func(idx int) { 293 srv, err := embed.StartEtcd(cfgs[idx]) 294 if err != nil { 295 t.Error(err) 296 } 297 298 <-srv.Server.ReadyNotify() 299 sch <- srv 300 }(i) 301 } 302 303 srvs = make([]*embed.Etcd, clusterN) 304 for i := 0; i < clusterN; i++ { 305 select { 306 case srv := <-sch: 307 srvs[i] = srv 308 case <-time.After(5 * time.Second): 309 t.Fatalf("#%d: failed to start embed.Etcd", i) 310 } 311 } 312 return cURLs, pURLs, srvs 313} 314 315// TODO: TLS 316func newEmbedURLs(n int) (urls []url.URL) { 317 urls = make([]url.URL, n) 318 for i := 0; i < n; i++ { 319 rand.Seed(int64(time.Now().Nanosecond())) 320 u, _ := url.Parse(fmt.Sprintf("unix://localhost:%d", rand.Intn(45000))) 321 urls[i] = *u 322 } 323 return urls 324} 325