// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshot

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"strings"

	bolt "go.etcd.io/bbolt"
	"go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/client/pkg/v3/fileutil"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/snapshot"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.etcd.io/etcd/server/v3/verify"
	"go.etcd.io/etcd/server/v3/wal"
	"go.etcd.io/etcd/server/v3/wal/walpb"
	"go.uber.org/zap"
)

// Manager defines snapshot methods.
type Manager interface {
	// Save fetches a snapshot from a remote etcd server and saves the data
	// to the target path. If the context "ctx" is canceled or times out,
	// the snapshot save stream errors out (e.g. context.Canceled,
	// context.DeadlineExceeded). Make sure to specify only one endpoint
	// in the client configuration: the snapshot API must be requested from
	// a single selected node, and the saved snapshot is the point-in-time
	// state of that node.
	Save(ctx context.Context, cfg clientv3.Config, dbPath string) error

	// Status returns the snapshot file information.
	Status(dbPath string) (Status, error)

	// Restore restores a new etcd data directory from a given snapshot
	// file. It returns an error if the specified data directory already
	// exists, to prevent unintended data directory overwrites.
	Restore(cfg RestoreConfig) error
}
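
// exampleSnapshotWorkflow is an illustrative sketch, not part of the upstream
// package: it shows how a caller might drive Save, Status and Restore end to
// end. The endpoint, file paths and cluster settings below are hypothetical.
func exampleSnapshotWorkflow(ctx context.Context, lg *zap.Logger) error {
	sp := NewV3(lg)

	// Save streams a point-in-time snapshot from a single selected endpoint.
	cfg := clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}}
	if err := sp.Save(ctx, cfg, "/tmp/backup.db"); err != nil {
		return err
	}

	// Status reports the revision, key count, total size and hash of the file.
	st, err := sp.Status("/tmp/backup.db")
	if err != nil {
		return err
	}
	lg.Info("snapshot status",
		zap.Int64("revision", st.Revision),
		zap.Int("total-key", st.TotalKey),
		zap.Int64("total-size", st.TotalSize),
	)

	// Restore materializes a fresh data directory for a member named "m1".
	return sp.Restore(RestoreConfig{
		SnapshotPath:        "/tmp/backup.db",
		Name:                "m1",
		OutputDataDir:       "m1.etcd",
		PeerURLs:            []string{"http://127.0.0.1:2380"},
		InitialCluster:      "m1=http://127.0.0.1:2380",
		InitialClusterToken: "restored-cluster",
	})
}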

// NewV3 returns a new snapshot Manager for v3.x snapshot.
func NewV3(lg *zap.Logger) Manager {
	if lg == nil {
		lg = zap.NewExample()
	}
	return &v3Manager{lg: lg}
}

type v3Manager struct {
	lg *zap.Logger

	name      string
	srcDbPath string
	walDir    string
	snapDir   string
	cl        *membership.RaftCluster

	skipHashCheck bool
}

// hasChecksum returns "true" if the file size "n" indicates that a sha256
// hash digest has been appended to the db file: a raw bbolt file is a
// multiple of its page size (itself a multiple of 512), so a trailing
// 32-byte digest leaves a remainder of sha256.Size.
func hasChecksum(n int64) bool {
	// 512 is chosen because it is a minimum disk sector size,
	// smaller than (and evenly dividing) the OS page size on most systems.
	return (n % 512) == sha256.Size
}
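
// verifySnapshotChecksum is an illustrative, hypothetical helper (not part of
// the upstream package) sketching the layout hasChecksum relies on: the last
// sha256.Size bytes of a saved snapshot are the digest of everything before
// them.
func verifySnapshotChecksum(dbPath string) error {
	f, err := os.Open(dbPath)
	if err != nil {
		return err
	}
	defer f.Close()

	size, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	if !hasChecksum(size) {
		return fmt.Errorf("snapshot %q has no appended sha256 digest", dbPath)
	}

	// Hash everything except the trailing digest.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}
	h := sha256.New()
	if _, err := io.CopyN(h, f, size-sha256.Size); err != nil {
		return err
	}

	// Read the stored digest and compare.
	want := make([]byte, sha256.Size)
	if _, err := io.ReadFull(f, want); err != nil {
		return err
	}
	if got := h.Sum(nil); !reflect.DeepEqual(want, got) {
		return fmt.Errorf("snapshot sha256 mismatch: expected %x, got %x", want, got)
	}
	return nil
}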

// Save fetches a snapshot from a remote etcd server and saves the data to
// the target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
	return snapshot.Save(ctx, s.lg, cfg, dbPath)
}

// Status is the snapshot file status.
type Status struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}
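
// printStatusJSON is an illustrative, hypothetical helper (not part of the
// upstream package) showing how a Status value renders through the JSON tags
// above, e.g. {"hash":0,"revision":0,"totalKey":0,"totalSize":0}.
func printStatusJSON(ds Status) error {
	out, err := json.Marshal(ds)
	if err != nil {
		return err
	}
	fmt.Println(string(out))
	return nil
}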

// Status returns the snapshot file information.
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
	if _, err = os.Stat(dbPath); err != nil {
		return ds, err
	}

	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return ds, err
	}
	defer db.Close()

	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

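	// tx.Check below performs a bbolt consistency check; the reported hash is
	// crc32c (Castagnoli) over every bucket name and key/value pair in the db.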
	if err = db.View(func(tx *bolt.Tx) error {
		// check snapshot file integrity first
		var dbErrStrings []string
		for dbErr := range tx.Check() {
			dbErrStrings = append(dbErrStrings, dbErr.Error())
		}
		if len(dbErrStrings) > 0 {
			return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n%s", len(dbErrStrings), strings.Join(dbErrStrings, "\n"))
		}
		ds.TotalSize = tx.Size()
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get bucket %s", string(next))
			}
			if _, err := h.Write(next); err != nil {
				return fmt.Errorf("cannot write bucket %s: %v", string(next), err)
			}
			isKeyBucket := string(next) == "key"
			if err := b.ForEach(func(k, v []byte) error {
				if _, err := h.Write(k); err != nil {
					return fmt.Errorf("cannot write key to hash: %v", err)
				}
				if _, err := h.Write(v); err != nil {
					return fmt.Errorf("cannot write value to hash: %v", err)
				}
				if isKeyBucket {
					rev := bytesToRev(k)
					ds.Revision = rev.main
				}
				ds.TotalKey++
				return nil
			}); err != nil {
				return fmt.Errorf("cannot iterate bucket %s: %v", string(next), err)
			}
		}
		return nil
	}); err != nil {
		return ds, err
	}

	ds.Hash = h.Sum32()
	return ds, nil
}

// RestoreConfig configures a snapshot restore operation.
type RestoreConfig struct {
	// SnapshotPath is the path of the snapshot file to restore from.
	SnapshotPath string

	// Name is the human-readable name of this member.
	Name string

	// OutputDataDir is the target data directory to save restored data.
	// OutputDataDir should not conflict with an existing etcd data directory.
	// If OutputDataDir already exists, Restore returns an error to prevent
	// unintended data directory overwrites.
	// If empty, it defaults to "[Name].etcd".
	OutputDataDir string
	// OutputWALDir is the target WAL data directory.
	// If empty, it defaults to "[OutputDataDir]/member/wal".
	OutputWALDir string

	// PeerURLs is the list of this member's peer URLs to advertise to the
	// rest of the cluster.
	PeerURLs []string

	// InitialCluster is the initial cluster configuration for restore bootstrap.
	InitialCluster string
	// InitialClusterToken is the initial cluster token for the etcd cluster
	// during restore bootstrap.
	InitialClusterToken string

	// SkipHashCheck is "true" to ignore the snapshot integrity hash value
	// (required if the snapshot was copied from a data directory rather
	// than produced by Save).
	SkipHashCheck bool
}
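
// restoreConfigForMember is an illustrative, hypothetical sketch (not part of
// the upstream package): when seeding a multi-member cluster from one snapshot
// file, each member runs Restore with its own Name, PeerURLs and output
// directory but the same InitialCluster and InitialClusterToken. The URLs
// below are made up.
func restoreConfigForMember(name, peerURL, snapshotPath string) RestoreConfig {
	return RestoreConfig{
		SnapshotPath:        snapshotPath,
		Name:                name,
		OutputDataDir:       name + ".etcd",
		PeerURLs:            []string{peerURL},
		InitialCluster:      "m1=http://host1:2380,m2=http://host2:2380,m3=http://host3:2380",
		InitialClusterToken: "restored-cluster",
		// SkipHashCheck is needed only when the db file was copied out of a
		// live member's data directory instead of being produced by Save.
		SkipHashCheck: false,
	}
}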

// Restore restores a new etcd data directory from a given snapshot file.
func (s *v3Manager) Restore(cfg RestoreConfig) error {
	pURLs, err := types.NewURLs(cfg.PeerURLs)
	if err != nil {
		return err
	}
	var ics types.URLsMap
	ics, err = types.NewURLsMap(cfg.InitialCluster)
	if err != nil {
		return err
	}

	srv := config.ServerConfig{
		Logger:              s.lg,
		Name:                cfg.Name,
		PeerURLs:            pURLs,
		InitialPeerURLsMap:  ics,
		InitialClusterToken: cfg.InitialClusterToken,
	}
	if err = srv.VerifyBootstrap(); err != nil {
		return err
	}

	s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
	if err != nil {
		return err
	}

	dataDir := cfg.OutputDataDir
	if dataDir == "" {
		dataDir = cfg.Name + ".etcd"
	}
	if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) {
		return fmt.Errorf("data-dir %q not empty or could not be read", dataDir)
	}

	walDir := cfg.OutputWALDir
	if walDir == "" {
		walDir = filepath.Join(dataDir, "member", "wal")
	} else if fileutil.Exist(walDir) {
		return fmt.Errorf("wal-dir %q exists", walDir)
	}

	s.name = cfg.Name
	s.srcDbPath = cfg.SnapshotPath
	s.walDir = walDir
	s.snapDir = filepath.Join(dataDir, "member", "snap")
	s.skipHashCheck = cfg.SkipHashCheck

	s.lg.Info(
		"restoring snapshot",
		zap.String("path", s.srcDbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
		zap.Stack("stack"),
	)

	if err = s.saveDB(); err != nil {
		return err
	}
	hardState, err := s.saveWALAndSnap()
	if err != nil {
		return err
	}

	if err := s.updateCIndex(hardState.Commit, hardState.Term); err != nil {
		return err
	}

	s.lg.Info(
		"restored snapshot",
		zap.String("path", s.srcDbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)

	return verify.VerifyIfEnabled(verify.Config{
		ExactIndex: true,
		Logger:     s.lg,
		DataDir:    dataDir,
	})
}

func (s *v3Manager) outDbPath() string {
	return filepath.Join(s.snapDir, "db")
}

// saveDB copies the database snapshot to the snapshot directory and strips
// the source cluster's membership information from the copy.
func (s *v3Manager) saveDB() error {
	err := s.copyAndVerifyDB()
	if err != nil {
		return err
	}

	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()

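	// Strip the membership information copied over from the source member's
	// db; the restored cluster's membership is re-seeded from InitialCluster
	// in saveWALAndSnap.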
	err = membership.TrimMembershipFromBackend(s.lg, be)
	if err != nil {
		return err
	}

	return nil
}

func (s *v3Manager) copyAndVerifyDB() error {
	srcf, ferr := os.Open(s.srcDbPath)
	if ferr != nil {
		return ferr
	}
	defer srcf.Close()

	// get snapshot integrity hash
	if _, err := srcf.Seek(-sha256.Size, io.SeekEnd); err != nil {
		return err
	}
	sha := make([]byte, sha256.Size)
	if _, err := io.ReadFull(srcf, sha); err != nil {
		return err
	}
	if _, err := srcf.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
		return err
	}

	outDbPath := s.outDbPath()

	db, dberr := os.OpenFile(outDbPath, os.O_RDWR|os.O_CREATE, 0600)
	if dberr != nil {
		return dberr
	}
	dbClosed := false
	defer func() {
		if !dbClosed {
			db.Close()
			dbClosed = true
		}
	}()
	if _, err := io.Copy(db, srcf); err != nil {
		return err
	}

	// truncate away the appended integrity hash, if any.
	off, serr := db.Seek(0, io.SeekEnd)
	if serr != nil {
		return serr
	}
	hasHash := hasChecksum(off)
	if hasHash {
		if err := db.Truncate(off - sha256.Size); err != nil {
			return err
		}
	}

	if !hasHash && !s.skipHashCheck {
		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
	}

	if hasHash && !s.skipHashCheck {
		// check for match
		if _, err := db.Seek(0, io.SeekStart); err != nil {
			return err
		}
		h := sha256.New()
		if _, err := io.Copy(h, db); err != nil {
			return err
		}
		dbsha := h.Sum(nil)
		if !reflect.DeepEqual(sha, dbsha) {
			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
		}
	}

	// db hash is OK; the DB can now be modified so it can become part of a new cluster
	db.Close()
	return nil
}

// saveWALAndSnap creates a WAL and a matching raft snapshot for the initial
// cluster.
//
// TODO: This code ignores learners!
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
	if err := fileutil.CreateDirAll(s.walDir); err != nil {
		return nil, err
	}

	// Add the members again to persist them to the v2 store and backend we
	// create here.
	st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
	s.cl.SetStore(st)
	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()
	s.cl.SetBackend(be)
	for _, m := range s.cl.Members() {
		s.cl.AddMember(m, true)
	}

	m := s.cl.MemberByName(s.name)
	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
	metadata, merr := md.Marshal()
	if merr != nil {
		return nil, merr
	}
	w, walerr := wal.Create(s.lg, s.walDir, metadata)
	if walerr != nil {
		return nil, walerr
	}
	defer w.Close()

	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
	for i, id := range s.cl.MemberIDs() {
		ctx, err := json.Marshal(s.cl.Member(id))
		if err != nil {
			return nil, err
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}

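	// Synthesize the bootstrap raft log: one ConfChangeAddNode entry per
	// member at term 1, as if the cluster described by InitialCluster had
	// just been bootstrapped.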
	ents := make([]raftpb.Entry, len(peers))
	nodeIDs := make([]uint64, len(peers))
	for i, p := range peers {
		nodeIDs[i] = p.ID
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  p.ID,
			Context: p.Context,
		}
		d, err := cc.Marshal()
		if err != nil {
			return nil, err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1),
			Data:  d,
		}
	}

	commit, term := uint64(len(ents)), uint64(1)
	hardState := raftpb.HardState{
		Term:   term,
		Vote:   peers[0].ID,
		Commit: commit,
	}
	if err := w.Save(hardState, ents); err != nil {
		return nil, err
	}

	b, berr := st.Save()
	if berr != nil {
		return nil, berr
	}
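	// The raft snapshot's payload is the serialized v2 store (cluster
	// membership); its index/term must match the last WAL entry written
	// above so the WAL and the snapshot agree on restart.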
	confState := raftpb.ConfState{
		Voters: nodeIDs,
	}
	raftSnap := raftpb.Snapshot{
		Data: b,
		Metadata: raftpb.SnapshotMetadata{
			Index:     commit,
			Term:      term,
			ConfState: confState,
		},
	}
	sn := snap.New(s.lg, s.snapDir)
	if err := sn.SaveSnap(raftSnap); err != nil {
		return nil, err
	}
	walSnap := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
	return &hardState, w.SaveSnapshot(walSnap)
}

func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()

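	// Align the backend's consistent index with the commit index/term of the
	// synthesized WAL so entries at or below that index are not re-applied
	// to the backend on first start.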
	cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
	return nil
}