// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshot

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"strings"

	bolt "go.etcd.io/bbolt"
	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/client/pkg/v3/fileutil"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/snapshot"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.etcd.io/etcd/server/v3/verify"
	"go.etcd.io/etcd/server/v3/wal"
	"go.etcd.io/etcd/server/v3/wal/walpb"
	"go.uber.org/zap"
)

// Manager defines snapshot methods.
type Manager interface {
	// Save fetches a snapshot from a remote etcd server and saves the data
	// to the target path. If the context "ctx" is canceled or times out,
	// the snapshot save stream errors out (e.g. context.Canceled,
	// context.DeadlineExceeded). Make sure to specify only one endpoint
	// in the client configuration: the snapshot request must go to a single
	// selected node, and the saved snapshot is the point-in-time state of
	// that node.
	Save(ctx context.Context, cfg clientv3.Config, dbPath string) error

	// Status returns the snapshot file information.
	Status(dbPath string) (Status, error)

	// Restore restores a new etcd data directory from a given snapshot
	// file. It returns an error if the specified data directory already
	// exists, to prevent unintended data directory overwrites.
	Restore(cfg RestoreConfig) error
}

// NewV3 returns a new snapshot Manager for v3.x snapshots.
func NewV3(lg *zap.Logger) Manager {
	if lg == nil {
		lg = zap.NewExample()
	}
	return &v3Manager{lg: lg}
}

type v3Manager struct {
	lg *zap.Logger

	name      string
	srcDbPath string
	walDir    string
	snapDir   string
	cl        *membership.RaftCluster

	skipHashCheck bool
}

// hasChecksum returns true if a file of size "n" carries an appended
// sha256 hash digest.
func hasChecksum(n int64) bool {
	// 512 is chosen because it is a minimal disk sector size, smaller than
	// (and a divisor of) the OS page size on most systems. A bolt database
	// file is sized in whole pages, so it is always a multiple of 512 bytes;
	// only an appended digest leaves a remainder of sha256.Size.
	return (n % 512) == sha256.Size
}

// Save fetches a snapshot from a remote etcd server and saves the data to the target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
	return snapshot.Save(ctx, s.lg, cfg, dbPath)
}
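
// exampleSave is an illustrative sketch of how a caller might drive
// Manager.Save; it is not part of the package API. Per the Save contract,
// the client config lists exactly one endpoint. The endpoint URL and target
// path below are placeholder values, not defaults of this package.
func exampleSave(ctx context.Context, lg *zap.Logger) error {
	sm := NewV3(lg)
	cfg := clientv3.Config{
		// A single endpoint only: the snapshot is the point-in-time
		// state of this one member.
		Endpoints: []string{"http://127.0.0.1:2379"},
	}
	return sm.Save(ctx, cfg, "snapshot.db")
}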

// Status is the snapshot file status.
type Status struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}

// Status returns the snapshot file information.
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
	if _, err = os.Stat(dbPath); err != nil {
		return ds, err
	}

	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return ds, err
	}
	defer db.Close()

	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	if err = db.View(func(tx *bolt.Tx) error {
		// check snapshot file integrity first
		var dbErrStrings []string
		for dbErr := range tx.Check() {
			dbErrStrings = append(dbErrStrings, dbErr.Error())
		}
		if len(dbErrStrings) > 0 {
			return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n%s", len(dbErrStrings), strings.Join(dbErrStrings, "\n"))
		}
		ds.TotalSize = tx.Size()
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %q", string(next))
			}
			if _, err := h.Write(next); err != nil {
				return fmt.Errorf("cannot write bucket %q: %v", string(next), err)
			}
			iskeyb := (string(next) == "key")
			if err := b.ForEach(func(k, v []byte) error {
				if _, err := h.Write(k); err != nil {
					return fmt.Errorf("cannot write key to hash: %v", err)
				}
				if _, err := h.Write(v); err != nil {
					return fmt.Errorf("cannot write value to hash: %v", err)
				}
				if iskeyb {
					rev := bytesToRev(k)
					ds.Revision = rev.main
				}
				ds.TotalKey++
				return nil
			}); err != nil {
				return fmt.Errorf("cannot iterate bucket %q: %v", string(next), err)
			}
		}
		return nil
	}); err != nil {
		return ds, err
	}

	ds.Hash = h.Sum32()
	return ds, nil
}

// RestoreConfig configures a snapshot restore operation.
type RestoreConfig struct {
	// SnapshotPath is the path of the snapshot file to restore from.
	SnapshotPath string

	// Name is the human-readable name of this member.
	Name string

	// OutputDataDir is the target data directory for the restored data.
	// OutputDataDir should not conflict with an existing etcd data directory.
	// If OutputDataDir already exists, Restore returns an error to prevent
	// unintended data directory overwrites.
	// If empty, it defaults to "[Name].etcd".
	OutputDataDir string
	// OutputWALDir is the target WAL data directory.
	// If empty, it defaults to "[OutputDataDir]/member/wal".
	OutputWALDir string

	// PeerURLs is a list of the member's peer URLs to advertise to the rest of the cluster.
	PeerURLs []string

	// InitialCluster is the initial cluster configuration for the restore bootstrap.
	InitialCluster string
	// InitialClusterToken is the initial cluster token for the etcd cluster during the restore bootstrap.
	InitialClusterToken string

	// SkipHashCheck is "true" to ignore the snapshot integrity hash value
	// (required if the snapshot was copied from a data directory rather
	// than fetched through the snapshot API).
	SkipHashCheck bool
}
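
// exampleRestore is an illustrative sketch, not part of the package API:
// it inspects a snapshot with Status and then restores it into a fresh data
// directory. All names, URLs, paths, and the cluster token are hypothetical
// placeholders chosen for this example.
func exampleRestore(lg *zap.Logger) error {
	sm := NewV3(lg)
	st, err := sm.Status("snapshot.db")
	if err != nil {
		return err
	}
	lg.Info("snapshot checked",
		zap.Uint32("hash", st.Hash),
		zap.Int64("revision", st.Revision),
		zap.Int("total-key", st.TotalKey),
	)
	return sm.Restore(RestoreConfig{
		SnapshotPath:        "snapshot.db",
		Name:                "m1",
		OutputDataDir:       "m1.etcd",
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "m1=http://127.0.0.1:2380",
		InitialClusterToken: "etcd-cluster",
		// SkipHashCheck stays false here: a file fetched through the
		// snapshot API carries a trailing sha256 to verify against.
	})
}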

// Restore restores a new etcd data directory from a given snapshot file.
func (s *v3Manager) Restore(cfg RestoreConfig) error {
	pURLs, err := types.NewURLs(cfg.PeerURLs)
	if err != nil {
		return err
	}
	var ics types.URLsMap
	ics, err = types.NewURLsMap(cfg.InitialCluster)
	if err != nil {
		return err
	}

	srv := config.ServerConfig{
		Logger:              s.lg,
		Name:                cfg.Name,
		PeerURLs:            pURLs,
		InitialPeerURLsMap:  ics,
		InitialClusterToken: cfg.InitialClusterToken,
	}
	if err = srv.VerifyBootstrap(); err != nil {
		return err
	}

	s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
	if err != nil {
		return err
	}

	dataDir := cfg.OutputDataDir
	if dataDir == "" {
		dataDir = cfg.Name + ".etcd"
	}
	if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) {
		return fmt.Errorf("data-dir %q not empty or could not be read", dataDir)
	}

	walDir := cfg.OutputWALDir
	if walDir == "" {
		walDir = filepath.Join(dataDir, "member", "wal")
	} else if fileutil.Exist(walDir) {
		return fmt.Errorf("wal-dir %q exists", walDir)
	}

	s.name = cfg.Name
	s.srcDbPath = cfg.SnapshotPath
	s.walDir = walDir
	s.snapDir = filepath.Join(dataDir, "member", "snap")
	s.skipHashCheck = cfg.SkipHashCheck

	s.lg.Info(
		"restoring snapshot",
		zap.String("path", s.srcDbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
		zap.Stack("stack"),
	)

	if err = s.saveDB(); err != nil {
		return err
	}
	hardstate, err := s.saveWALAndSnap()
	if err != nil {
		return err
	}

	if err := s.updateCIndex(hardstate.Commit, hardstate.Term); err != nil {
		return err
	}

	s.lg.Info(
		"restored snapshot",
		zap.String("path", s.srcDbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)

	return verify.VerifyIfEnabled(verify.Config{
		ExactIndex: true,
		Logger:     s.lg,
		DataDir:    dataDir,
	})
}

func (s *v3Manager) outDbPath() string {
	return filepath.Join(s.snapDir, "db")
}

// saveDB copies the database snapshot to the snapshot directory and strips
// the old cluster's membership information from the copy.
func (s *v3Manager) saveDB() error {
	if err := s.copyAndVerifyDB(); err != nil {
		return err
	}

	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()

	return membership.TrimMembershipFromBackend(s.lg, be)
}
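
// verifySnapshotSketch is an illustrative, standalone sketch (not used by
// this package) of the integrity convention copyAndVerifyDB relies on: the
// snapshot API appends a raw sha256 digest to the page-aligned bolt database,
// so the digest of everything except the last sha256.Size bytes must equal
// those last bytes. The function name and signature are hypothetical.
func verifySnapshotSketch(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	fi, err := f.Stat()
	if err != nil {
		return err
	}
	if !hasChecksum(fi.Size()) {
		return fmt.Errorf("no trailing sha256 in %q", path)
	}
	// hash everything before the trailing digest
	h := sha256.New()
	if _, err := io.Copy(h, io.LimitReader(f, fi.Size()-sha256.Size)); err != nil {
		return err
	}
	// the reader is now positioned at the digest itself
	want := make([]byte, sha256.Size)
	if _, err := io.ReadFull(f, want); err != nil {
		return err
	}
	if !reflect.DeepEqual(want, h.Sum(nil)) {
		return fmt.Errorf("sha256 mismatch in %q", path)
	}
	return nil
}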

func (s *v3Manager) copyAndVerifyDB() error {
	srcf, ferr := os.Open(s.srcDbPath)
	if ferr != nil {
		return ferr
	}
	defer srcf.Close()

	// get the snapshot integrity hash
	if _, err := srcf.Seek(-sha256.Size, io.SeekEnd); err != nil {
		return err
	}
	sha := make([]byte, sha256.Size)
	if _, err := io.ReadFull(srcf, sha); err != nil {
		return err
	}
	if _, err := srcf.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
		return err
	}

	outDbPath := s.outDbPath()

	db, dberr := os.OpenFile(outDbPath, os.O_RDWR|os.O_CREATE, 0600)
	if dberr != nil {
		return dberr
	}
	dbClosed := false
	defer func() {
		if !dbClosed {
			db.Close()
		}
	}()
	if _, err := io.Copy(db, srcf); err != nil {
		return err
	}

	// truncate away the integrity hash, if any
	off, serr := db.Seek(0, io.SeekEnd)
	if serr != nil {
		return serr
	}
	hasHash := hasChecksum(off)
	if hasHash {
		if err := db.Truncate(off - sha256.Size); err != nil {
			return err
		}
	}

	if !hasHash && !s.skipHashCheck {
		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
	}

	if hasHash && !s.skipHashCheck {
		// check for a match
		if _, err := db.Seek(0, io.SeekStart); err != nil {
			return err
		}
		h := sha256.New()
		if _, err := io.Copy(h, db); err != nil {
			return err
		}
		dbsha := h.Sum(nil)
		if !reflect.DeepEqual(sha, dbsha) {
			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
		}
	}

	// db hash is OK, can now modify the DB so it can be part of a new cluster
	db.Close()
	dbClosed = true
	return nil
}

// saveWALAndSnap creates a WAL for the initial cluster.
//
// TODO: This code ignores learners!
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
	if err := fileutil.CreateDirAll(s.walDir); err != nil {
		return nil, err
	}

	// add members again to persist them to the store we create
	st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
	s.cl.SetStore(st)
	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()
	s.cl.SetBackend(be)
	for _, m := range s.cl.Members() {
		s.cl.AddMember(m, true)
	}

	m := s.cl.MemberByName(s.name)
	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
	metadata, merr := md.Marshal()
	if merr != nil {
		return nil, merr
	}
	w, walerr := wal.Create(s.lg, s.walDir, metadata)
	if walerr != nil {
		return nil, walerr
	}
	defer w.Close()

	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
	for i, id := range s.cl.MemberIDs() {
		ctx, err := json.Marshal(s.cl.Member(id))
		if err != nil {
			return nil, err
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}

	ents := make([]raftpb.Entry, len(peers))
	nodeIDs := make([]uint64, len(peers))
	for i, p := range peers {
		nodeIDs[i] = p.ID
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  p.ID,
			Context: p.Context,
		}
		d, err := cc.Marshal()
		if err != nil {
			return nil, err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1),
			Data:  d,
		}
	}

	commit, term := uint64(len(ents)), uint64(1)
	hardState := raftpb.HardState{
		Term:   term,
		Vote:   peers[0].ID,
		Commit: commit,
	}
	if err := w.Save(hardState, ents); err != nil {
		return nil, err
	}

	b, berr := st.Save()
	if berr != nil {
		return nil, berr
	}
	confState := raftpb.ConfState{
		Voters: nodeIDs,
	}
	raftSnap := raftpb.Snapshot{
		Data: b,
		Metadata: raftpb.SnapshotMetadata{
			Index:     commit,
			Term:      term,
			ConfState: confState,
		},
	}
	sn := snap.New(s.lg, s.snapDir)
	if err := sn.SaveSnap(raftSnap); err != nil {
		return nil, err
	}
	snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
	return &hardState, w.SaveSnapshot(snapshot)
}

func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()

	cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
	return nil
}
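
// exampleBootstrapLog is an illustrative sketch, not used by Restore, of the
// raft bookkeeping that saveWALAndSnap sets up: one ConfChangeAddNode entry
// per member at term 1 with indexes 1..n, so the HardState commit index
// equals the member count and the snapshot metadata written by saveWALAndSnap
// points at that same (index, term) pair. The function name is hypothetical.
func exampleBootstrapLog(memberIDs []uint64) (raftpb.HardState, []raftpb.Entry, error) {
	ents := make([]raftpb.Entry, len(memberIDs))
	for i, id := range memberIDs {
		cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id}
		d, err := cc.Marshal()
		if err != nil {
			return raftpb.HardState{}, nil, err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1), // raft log indexes start at 1
			Data:  d,
		}
	}
	// Everything written above is treated as already committed, so the
	// commit index equals the member count.
	hs := raftpb.HardState{
		Term:   1,
		Vote:   memberIDs[0], // assumes at least one member, as Restore guarantees
		Commit: uint64(len(ents)),
	}
	return hs, ents, nil
}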