// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshot

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"io"
	"math"
	"os"
	"path/filepath"
	"reflect"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/pkg/fileutil"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"

	bolt "github.com/coreos/bbolt"
	"go.uber.org/zap"
)
// Manager defines snapshot methods.
type Manager interface {
	// Save fetches a snapshot from a remote etcd server and saves the data
	// to the target path. If the context "ctx" is canceled or times out,
	// the snapshot save stream errors out (e.g. context.Canceled,
	// context.DeadlineExceeded). Make sure to specify only one endpoint
	// in the client configuration; the snapshot API must be requested from
	// one selected node, and the saved snapshot is the point-in-time state
	// of that node.
	Save(ctx context.Context, cfg clientv3.Config, dbPath string) error

	// Status returns the snapshot file information.
	Status(dbPath string) (Status, error)

	// Restore restores a new etcd data directory from the given snapshot
	// file. It returns an error if the specified data directory already
	// exists, to prevent unintended data directory overwrites.
	Restore(cfg RestoreConfig) error
}

// NewV3 returns a new snapshot Manager for v3.x snapshot.
func NewV3(lg *zap.Logger) Manager {
	if lg == nil {
		lg = zap.NewExample()
	}
	return &v3Manager{lg: lg}
}
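
// A minimal usage sketch for saving a snapshot; the endpoint and file path
// below are illustrative examples, not defaults:
//
//	sm := snapshot.NewV3(zap.NewExample())
//	err := sm.Save(context.Background(),
//		clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}},
//		"backup.db")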

type v3Manager struct {
	lg *zap.Logger

	name    string
	dbPath  string
	walDir  string
	snapDir string
	cl      *membership.RaftCluster

	skipHashCheck bool
}

// Save fetches a snapshot from the remote etcd server and saves the data to the target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
	if len(cfg.Endpoints) != 1 {
		return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		return err
	}
	defer cli.Close()

	partpath := dbPath + ".part"
	defer os.RemoveAll(partpath)

	var f *os.File
	f, err = os.Create(partpath)
	if err != nil {
		return fmt.Errorf("could not create %s (%v)", partpath, err)
	}
	s.lg.Info(
		"created temporary db file",
		zap.String("path", partpath),
	)

	now := time.Now()
	var rd io.ReadCloser
	rd, err = cli.Snapshot(ctx)
	if err != nil {
		return err
	}
	s.lg.Info(
		"fetching snapshot",
		zap.String("endpoint", cfg.Endpoints[0]),
	)
	if _, err = io.Copy(f, rd); err != nil {
		return err
	}
	if err = fileutil.Fsync(f); err != nil {
		return err
	}
	if err = f.Close(); err != nil {
		return err
	}
	s.lg.Info(
		"fetched snapshot",
		zap.String("endpoint", cfg.Endpoints[0]),
		zap.Duration("took", time.Since(now)),
	)

	if err = os.Rename(partpath, dbPath); err != nil {
		return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
	}
	s.lg.Info("saved", zap.String("path", dbPath))
	return nil
}

// Status is the snapshot file status.
type Status struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}

// Status returns the snapshot file information.
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
	if _, err = os.Stat(dbPath); err != nil {
		return ds, err
	}

	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return ds, err
	}
	defer db.Close()

	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

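	// Hash every bucket name and key/value pair with CRC-32C; the latest
	// revision is taken from the last key of the "key" bucket, whose keys
	// are ordered by revision.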
	if err = db.View(func(tx *bolt.Tx) error {
		ds.TotalSize = tx.Size()
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			iskeyb := (string(next) == "key")
			if err := b.ForEach(func(k, v []byte) error {
				h.Write(k)
				h.Write(v)
				if iskeyb {
					rev := bytesToRev(k)
					ds.Revision = rev.main
				}
				ds.TotalKey++
				return nil
			}); err != nil {
				return fmt.Errorf("cannot iterate bucket %s (%v)", string(next), err)
			}
		}
		return nil
	}); err != nil {
		return ds, err
	}

	ds.Hash = h.Sum32()
	return ds, nil
}

// RestoreConfig configures snapshot restore operation.
type RestoreConfig struct {
	// SnapshotPath is the path of the snapshot file to restore from.
	SnapshotPath string

	// Name is the human-readable name of this member.
	Name string

	// OutputDataDir is the target data directory in which to save the restored data.
	// OutputDataDir should not conflict with an existing etcd data directory.
	// If OutputDataDir already exists, Restore returns an error to prevent
	// unintended data directory overwrites.
	// If empty, it defaults to "[Name].etcd".
	OutputDataDir string
	// OutputWALDir is the target WAL data directory.
	// If empty, it defaults to "[OutputDataDir]/member/wal".
	OutputWALDir string

	// PeerURLs is a list of this member's peer URLs to advertise to the rest of the cluster.
	PeerURLs []string

	// InitialCluster is the initial cluster configuration for restore bootstrap.
	InitialCluster string
	// InitialClusterToken is the initial cluster token for the etcd cluster during restore bootstrap.
	InitialClusterToken string

	// SkipHashCheck is true to ignore the snapshot integrity hash value
	// (required if the snapshot was copied from a data directory rather
	// than fetched through the snapshot API).
	SkipHashCheck bool
}
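
// An illustrative single-member restore using a Manager from NewV3; every
// value below is an example, not a default:
//
//	err := sm.Restore(snapshot.RestoreConfig{
//		SnapshotPath:        "backup.db",
//		Name:                "s1",
//		OutputDataDir:       "s1.etcd",
//		PeerURLs:            []string{"http://127.0.0.1:2380"},
//		InitialCluster:      "s1=http://127.0.0.1:2380",
//		InitialClusterToken: "etcd-cluster",
//	})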

// Restore restores a new etcd data directory from the given snapshot file.
func (s *v3Manager) Restore(cfg RestoreConfig) error {
	pURLs, err := types.NewURLs(cfg.PeerURLs)
	if err != nil {
		return err
	}
	var ics types.URLsMap
	ics, err = types.NewURLsMap(cfg.InitialCluster)
	if err != nil {
		return err
	}

	srv := etcdserver.ServerConfig{
		Name:                cfg.Name,
		PeerURLs:            pURLs,
		InitialPeerURLsMap:  ics,
		InitialClusterToken: cfg.InitialClusterToken,
	}
	if err = srv.VerifyBootstrap(); err != nil {
		return err
	}

	s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, ics)
	if err != nil {
		return err
	}

	dataDir := cfg.OutputDataDir
	if dataDir == "" {
		dataDir = cfg.Name + ".etcd"
	}
	if fileutil.Exist(dataDir) {
		return fmt.Errorf("data-dir %q exists", dataDir)
	}

	walDir := cfg.OutputWALDir
	if walDir == "" {
		walDir = filepath.Join(dataDir, "member", "wal")
	} else if fileutil.Exist(walDir) {
		return fmt.Errorf("wal-dir %q exists", walDir)
	}

	s.name = cfg.Name
	s.dbPath = cfg.SnapshotPath
	s.walDir = walDir
	s.snapDir = filepath.Join(dataDir, "member", "snap")
	s.skipHashCheck = cfg.SkipHashCheck

	s.lg.Info(
		"restoring snapshot",
		zap.String("path", s.dbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)
	if err = s.saveDB(); err != nil {
		return err
	}
	if err = s.saveWALAndSnap(); err != nil {
		return err
	}
	s.lg.Info(
		"restored snapshot",
		zap.String("path", s.dbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)

	return nil
}

// saveDB copies the database snapshot to the snapshot directory, stripping
// and (unless skipped) verifying the appended integrity hash.
func (s *v3Manager) saveDB() error {
	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
	if ferr != nil {
		return ferr
	}
	defer f.Close()

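	// A snapshot fetched through the v3 snapshot API has a sha256 hash of the
	// db contents appended as its last 32 bytes; a db file copied straight
	// out of a data directory does not.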
	// get snapshot integrity hash
	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
		return err
	}
	sha := make([]byte, sha256.Size)
	if _, err := f.Read(sha); err != nil {
		return err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
		return err
	}

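	// The restored backend is written to "<data-dir>/member/snap/db", which is
	// where etcd expects to find its bolt database on startup.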
	dbpath := filepath.Join(s.snapDir, "db")
	db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
	if dberr != nil {
		return dberr
	}
	if _, err := io.Copy(db, f); err != nil {
		return err
	}

	// truncate away integrity hash, if any.
	off, serr := db.Seek(0, io.SeekEnd)
	if serr != nil {
		return serr
	}
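	// bolt db files are sized in whole pages (a multiple of 512 bytes), so a
	// trailing remainder of exactly sha256.Size bytes indicates an appended hash.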
	hasHash := (off % 512) == sha256.Size
	if hasHash {
		if err := db.Truncate(off - sha256.Size); err != nil {
			return err
		}
	}

	if !hasHash && !s.skipHashCheck {
		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
	}

	if hasHash && !s.skipHashCheck {
		// check for match
		if _, err := db.Seek(0, io.SeekStart); err != nil {
			return err
		}
		h := sha256.New()
		if _, err := io.Copy(h, db); err != nil {
			return err
		}
		dbsha := h.Sum(nil)
		if !reflect.DeepEqual(sha, dbsha) {
			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
		}
	}

	// db hash is OK; the DB can now be modified so it can be part of a new cluster
	db.Close()

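	// saveWALAndSnap below writes one ConfChange entry per member, so the
	// bootstrap WAL ends at index len(members); the consistent index is
	// primed with that value.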
	commit := len(s.cl.Members())

	// update consistentIndex so applies go through on etcdserver despite
	// having a new raft instance
	be := backend.NewDefaultBackend(dbpath)

	// a lessor that never times out leases
	lessor := lease.NewLessor(be, math.MaxInt64)

	mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
	txn := mvs.Write()
	btx := be.BatchTx()
	del := func(k, v []byte) error {
		txn.DeleteRange(k, nil)
		return nil
	}

	// delete stored members from the old cluster since we are using new members
	btx.UnsafeForEach([]byte("members"), del)

	// TODO: add back new members when we start to deprecate the old snap file.
	btx.UnsafeForEach([]byte("members_removed"), del)

	// trigger write-out of new consistent index
	txn.End()

	mvs.Commit()
	mvs.Close()
	be.Close()

	return nil
}

// saveWALAndSnap creates a WAL for the initial cluster and persists a
// matching raft snapshot.
func (s *v3Manager) saveWALAndSnap() error {
	if err := fileutil.CreateDirAll(s.walDir); err != nil {
		return err
	}

	// add members again to persist them to the store we create.
	st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
	s.cl.SetStore(st)
	for _, m := range s.cl.Members() {
		s.cl.AddMember(m)
	}

	m := s.cl.MemberByName(s.name)
	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
	metadata, merr := md.Marshal()
	if merr != nil {
		return merr
	}
	w, walerr := wal.Create(s.walDir, metadata)
	if walerr != nil {
		return walerr
	}
	defer w.Close()

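	// Build one raft peer per member; each ConfChange context carries the
	// JSON-encoded membership.Member that etcdserver decodes when applying
	// the entry.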
	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
	for i, id := range s.cl.MemberIDs() {
		ctx, err := json.Marshal(s.cl.Member(id))
		if err != nil {
			return err
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}

	ents := make([]raftpb.Entry, len(peers))
	nodeIDs := make([]uint64, len(peers))
	for i, p := range peers {
		nodeIDs[i] = p.ID
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  p.ID,
			Context: p.Context,
		}
		d, err := cc.Marshal()
		if err != nil {
			return err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1),
			Data:  d,
		}
	}

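	// Mark every bootstrap ConfChange entry as committed at term 1 so the
	// restored member applies them when it replays the WAL.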
	commit, term := uint64(len(ents)), uint64(1)
	if err := w.Save(raftpb.HardState{
		Term:   term,
		Vote:   peers[0].ID,
		Commit: commit,
	}, ents); err != nil {
		return err
	}

	b, berr := st.Save()
	if berr != nil {
		return berr
	}
	raftSnap := raftpb.Snapshot{
		Data: b,
		Metadata: raftpb.SnapshotMetadata{
			Index: commit,
			Term:  term,
			ConfState: raftpb.ConfState{
				Nodes: nodeIDs,
			},
		},
	}
	sn := snap.New(s.snapDir)
	if err := sn.SaveSnap(raftSnap); err != nil {
		return err
	}

	return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
}