1package raft
2
3import (
4	"bufio"
5	"bytes"
6	"encoding/json"
7	"fmt"
8	"hash"
9	"hash/crc64"
10	"io"
11	"io/ioutil"
12	"os"
13	"path/filepath"
14	"runtime"
15	"sort"
16	"strings"
17	"time"
18
19	hclog "github.com/hashicorp/go-hclog"
20)
21
// Names of the files and directories used by the file snapshot store:
//
//   - testPath is the scratch file that testPermissions creates and then
//     removes to check that the snapshot directory is writable.
//   - snapPath is the directory, relative to the base directory given to
//     the constructor, under which all snapshots are kept.
//   - metaFilePath is the JSON metadata file inside a snapshot directory.
//   - stateFilePath is the file inside a snapshot directory that holds the
//     snapshot contents.
//   - tmpSuffix marks a snapshot directory that is still being written; it
//     is trimmed from the directory name when the sink is closed.
const (
	testPath      = "permTest"
	snapPath      = "snapshots"
	metaFilePath  = "meta.json"
	stateFilePath = "state.bin"
	tmpSuffix     = ".tmp"
)
29
// FileSnapshotStore implements the SnapshotStore interface and allows
// snapshots to be made on the local disk. Every snapshot lives in its own
// sub-directory of path, containing a metadata file and a state file.
type FileSnapshotStore struct {
	// path is the snapshot directory (the "snapshots" folder beneath the
	// base directory handed to the constructor). retain is the number of
	// snapshots that List reports and that ReapSnapshots leaves on disk.
	// logger receives the store's diagnostics.
	path   string
	retain int
	logger hclog.Logger

	// noSync, if true, skips crash-safe file fsync api calls.
	// It's a private field, only used in testing
	noSync bool
}
41
// snapMetaSlice is a list of on-disk snapshot metadata that implements
// sort.Interface, ordering entries by Term, then Index, then ID.
type snapMetaSlice []*fileSnapshotMeta
43
// FileSnapshotSink implements SnapshotSink with a file. It is produced by
// FileSnapshotStore.Create and writes into a temporary snapshot directory
// until Close moves that directory into place or Cancel discards it.
type FileSnapshotSink struct {
	// store is the store that created this sink; Close uses it to reap old
	// snapshots. dir is the temporary (".tmp"-suffixed) snapshot directory
	// and parentDir is the store's snapshot directory that contains it.
	// meta is the metadata written to the meta file; its Size and CRC are
	// filled in when the sink is finalized.
	store     *FileSnapshotStore
	logger    hclog.Logger
	dir       string
	parentDir string
	meta      fileSnapshotMeta

	// noSync is copied from the store and disables the fsync calls.
	noSync bool

	// stateFile is the open state file. stateHash accumulates a CRC64 of
	// everything written. buffered is the writer used by Write; it feeds
	// both stateFile and stateHash.
	stateFile *os.File
	stateHash hash.Hash64
	buffered  *bufio.Writer

	// closed is set by the first call to Close or Cancel so that later
	// calls become no-ops.
	closed bool
}
60
// fileSnapshotMeta is stored on disk. We also put a CRC
// on disk so that we can verify the snapshot.
//
// CRC holds the CRC64 sum of the state file as computed while it was
// written; Open recomputes the sum and rejects the snapshot on mismatch.
// It is nil in the initial metadata written by Create and is filled in
// when the sink is finalized.
type fileSnapshotMeta struct {
	SnapshotMeta
	CRC []byte
}
67
// bufferedFile couples an open snapshot state file with a buffered reader
// layered on top of it. Open returns it so that callers get buffered reads
// while still being able to release the underlying file.
type bufferedFile struct {
	bh *bufio.Reader
	fh *os.File
}

// Read fills p from the buffered reader and reports how many bytes were
// read together with any error from that read.
func (b *bufferedFile) Read(p []byte) (int, error) {
	count, readErr := b.bh.Read(p)
	return count, readErr
}

// Close closes the underlying file handle; the buffered reader itself
// holds nothing that needs releasing.
func (b *bufferedFile) Close() error {
	file := b.fh
	return file.Close()
}
82
83// NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based
84// on a base directory. The `retain` parameter controls how many
85// snapshots are retained. Must be at least 1.
86func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) {
87	if retain < 1 {
88		return nil, fmt.Errorf("must retain at least one snapshot")
89	}
90	if logger == nil {
91		logger = hclog.New(&hclog.LoggerOptions{
92			Name:   "snapshot",
93			Output: hclog.DefaultOutput,
94			Level:  hclog.DefaultLevel,
95		})
96	}
97
98	// Ensure our path exists
99	path := filepath.Join(base, snapPath)
100	if err := os.MkdirAll(path, 0755); err != nil && !os.IsExist(err) {
101		return nil, fmt.Errorf("snapshot path not accessible: %v", err)
102	}
103
104	// Setup the store
105	store := &FileSnapshotStore{
106		path:   path,
107		retain: retain,
108		logger: logger,
109	}
110
111	// Do a permissions test
112	if err := store.testPermissions(); err != nil {
113		return nil, fmt.Errorf("permissions test failed: %v", err)
114	}
115	return store, nil
116}
117
118// NewFileSnapshotStore creates a new FileSnapshotStore based
119// on a base directory. The `retain` parameter controls how many
120// snapshots are retained. Must be at least 1.
121func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSnapshotStore, error) {
122	if logOutput == nil {
123		logOutput = os.Stderr
124	}
125	return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{
126		Name:   "snapshot",
127		Output: logOutput,
128		Level:  hclog.DefaultLevel,
129	}))
130}
131
132// testPermissions tries to touch a file in our path to see if it works.
133func (f *FileSnapshotStore) testPermissions() error {
134	path := filepath.Join(f.path, testPath)
135	fh, err := os.Create(path)
136	if err != nil {
137		return err
138	}
139
140	if err = fh.Close(); err != nil {
141		return err
142	}
143
144	if err = os.Remove(path); err != nil {
145		return err
146	}
147	return nil
148}
149
// snapshotName builds a snapshot name of the form "<term>-<index>-<msec>",
// where msec is the current wall-clock time in milliseconds since the
// Unix epoch.
func snapshotName(term, index uint64) string {
	millis := time.Now().UnixNano() / int64(time.Millisecond)
	return fmt.Sprintf("%d-%d-%d", term, index, millis)
}
156
157// Create is used to start a new snapshot
158func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
159	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
160	// We only support version 1 snapshots at this time.
161	if version != 1 {
162		return nil, fmt.Errorf("unsupported snapshot version %d", version)
163	}
164
165	// Create a new path
166	name := snapshotName(term, index)
167	path := filepath.Join(f.path, name+tmpSuffix)
168	f.logger.Info("creating new snapshot", "path", path)
169
170	// Make the directory
171	if err := os.MkdirAll(path, 0755); err != nil {
172		f.logger.Error("failed to make snapshot directly", "error", err)
173		return nil, err
174	}
175
176	// Create the sink
177	sink := &FileSnapshotSink{
178		store:     f,
179		logger:    f.logger,
180		dir:       path,
181		parentDir: f.path,
182		noSync:    f.noSync,
183		meta: fileSnapshotMeta{
184			SnapshotMeta: SnapshotMeta{
185				Version:            version,
186				ID:                 name,
187				Index:              index,
188				Term:               term,
189				Peers:              encodePeers(configuration, trans),
190				Configuration:      configuration,
191				ConfigurationIndex: configurationIndex,
192			},
193			CRC: nil,
194		},
195	}
196
197	// Write out the meta data
198	if err := sink.writeMeta(); err != nil {
199		f.logger.Error("failed to write metadata", "error", err)
200		return nil, err
201	}
202
203	// Open the state file
204	statePath := filepath.Join(path, stateFilePath)
205	fh, err := os.Create(statePath)
206	if err != nil {
207		f.logger.Error("failed to create state file", "error", err)
208		return nil, err
209	}
210	sink.stateFile = fh
211
212	// Create a CRC64 hash
213	sink.stateHash = crc64.New(crc64.MakeTable(crc64.ECMA))
214
215	// Wrap both the hash and file in a MultiWriter with buffering
216	multi := io.MultiWriter(sink.stateFile, sink.stateHash)
217	sink.buffered = bufio.NewWriter(multi)
218
219	// Done
220	return sink, nil
221}
222
223// List returns available snapshots in the store.
224func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) {
225	// Get the eligible snapshots
226	snapshots, err := f.getSnapshots()
227	if err != nil {
228		f.logger.Error("failed to get snapshots", "error", err)
229		return nil, err
230	}
231
232	var snapMeta []*SnapshotMeta
233	for _, meta := range snapshots {
234		snapMeta = append(snapMeta, &meta.SnapshotMeta)
235		if len(snapMeta) == f.retain {
236			break
237		}
238	}
239	return snapMeta, nil
240}
241
242// getSnapshots returns all the known snapshots.
243func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) {
244	// Get the eligible snapshots
245	snapshots, err := ioutil.ReadDir(f.path)
246	if err != nil {
247		f.logger.Error("failed to scan snapshot directory", "error", err)
248		return nil, err
249	}
250
251	// Populate the metadata
252	var snapMeta []*fileSnapshotMeta
253	for _, snap := range snapshots {
254		// Ignore any files
255		if !snap.IsDir() {
256			continue
257		}
258
259		// Ignore any temporary snapshots
260		dirName := snap.Name()
261		if strings.HasSuffix(dirName, tmpSuffix) {
262			f.logger.Warn("found temporary snapshot", "name", dirName)
263			continue
264		}
265
266		// Try to read the meta data
267		meta, err := f.readMeta(dirName)
268		if err != nil {
269			f.logger.Warn("failed to read metadata", "name", dirName, "error", err)
270			continue
271		}
272
273		// Make sure we can understand this version.
274		if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax {
275			f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version)
276			continue
277		}
278
279		// Append, but only return up to the retain count
280		snapMeta = append(snapMeta, meta)
281	}
282
283	// Sort the snapshot, reverse so we get new -> old
284	sort.Sort(sort.Reverse(snapMetaSlice(snapMeta)))
285
286	return snapMeta, nil
287}
288
289// readMeta is used to read the meta data for a given named backup
290func (f *FileSnapshotStore) readMeta(name string) (*fileSnapshotMeta, error) {
291	// Open the meta file
292	metaPath := filepath.Join(f.path, name, metaFilePath)
293	fh, err := os.Open(metaPath)
294	if err != nil {
295		return nil, err
296	}
297	defer fh.Close()
298
299	// Buffer the file IO
300	buffered := bufio.NewReader(fh)
301
302	// Read in the JSON
303	meta := &fileSnapshotMeta{}
304	dec := json.NewDecoder(buffered)
305	if err := dec.Decode(meta); err != nil {
306		return nil, err
307	}
308	return meta, nil
309}
310
311// Open takes a snapshot ID and returns a ReadCloser for that snapshot.
312func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
313	// Get the metadata
314	meta, err := f.readMeta(id)
315	if err != nil {
316		f.logger.Error("failed to get meta data to open snapshot", "error", err)
317		return nil, nil, err
318	}
319
320	// Open the state file
321	statePath := filepath.Join(f.path, id, stateFilePath)
322	fh, err := os.Open(statePath)
323	if err != nil {
324		f.logger.Error("failed to open state file", "error", err)
325		return nil, nil, err
326	}
327
328	// Create a CRC64 hash
329	stateHash := crc64.New(crc64.MakeTable(crc64.ECMA))
330
331	// Compute the hash
332	_, err = io.Copy(stateHash, fh)
333	if err != nil {
334		f.logger.Error("failed to read state file", "error", err)
335		fh.Close()
336		return nil, nil, err
337	}
338
339	// Verify the hash
340	computed := stateHash.Sum(nil)
341	if bytes.Compare(meta.CRC, computed) != 0 {
342		f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed)
343		fh.Close()
344		return nil, nil, fmt.Errorf("CRC mismatch")
345	}
346
347	// Seek to the start
348	if _, err := fh.Seek(0, 0); err != nil {
349		f.logger.Error("state file seek failed", "error", err)
350		fh.Close()
351		return nil, nil, err
352	}
353
354	// Return a buffered file
355	buffered := &bufferedFile{
356		bh: bufio.NewReader(fh),
357		fh: fh,
358	}
359
360	return &meta.SnapshotMeta, buffered, nil
361}
362
363// ReapSnapshots reaps any snapshots beyond the retain count.
364func (f *FileSnapshotStore) ReapSnapshots() error {
365	snapshots, err := f.getSnapshots()
366	if err != nil {
367		f.logger.Error("failed to get snapshots", "error", err)
368		return err
369	}
370
371	for i := f.retain; i < len(snapshots); i++ {
372		path := filepath.Join(f.path, snapshots[i].ID)
373		f.logger.Info("reaping snapshot", "path", path)
374		if err := os.RemoveAll(path); err != nil {
375			f.logger.Error("failed to reap snapshot", "path", path, "error", err)
376			return err
377		}
378	}
379	return nil
380}
381
382// ID returns the ID of the snapshot, can be used with Open()
383// after the snapshot is finalized.
384func (s *FileSnapshotSink) ID() string {
385	return s.meta.ID
386}
387
388// Write is used to append to the state file. We write to the
389// buffered IO object to reduce the amount of context switches.
390func (s *FileSnapshotSink) Write(b []byte) (int, error) {
391	return s.buffered.Write(b)
392}
393
394// Close is used to indicate a successful end.
395func (s *FileSnapshotSink) Close() error {
396	// Make sure close is idempotent
397	if s.closed {
398		return nil
399	}
400	s.closed = true
401
402	// Close the open handles
403	if err := s.finalize(); err != nil {
404		s.logger.Error("failed to finalize snapshot", "error", err)
405		if delErr := os.RemoveAll(s.dir); delErr != nil {
406			s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr)
407			return delErr
408		}
409		return err
410	}
411
412	// Write out the meta data
413	if err := s.writeMeta(); err != nil {
414		s.logger.Error("failed to write metadata", "error", err)
415		return err
416	}
417
418	// Move the directory into place
419	newPath := strings.TrimSuffix(s.dir, tmpSuffix)
420	if err := os.Rename(s.dir, newPath); err != nil {
421		s.logger.Error("failed to move snapshot into place", "error", err)
422		return err
423	}
424
425	if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems
426		parentFH, err := os.Open(s.parentDir)
427		defer parentFH.Close()
428		if err != nil {
429			s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err)
430			return err
431		}
432
433		if err = parentFH.Sync(); err != nil {
434			s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err)
435			return err
436		}
437	}
438
439	// Reap any old snapshots
440	if err := s.store.ReapSnapshots(); err != nil {
441		return err
442	}
443
444	return nil
445}
446
447// Cancel is used to indicate an unsuccessful end.
448func (s *FileSnapshotSink) Cancel() error {
449	// Make sure close is idempotent
450	if s.closed {
451		return nil
452	}
453	s.closed = true
454
455	// Close the open handles
456	if err := s.finalize(); err != nil {
457		s.logger.Error("failed to finalize snapshot", "error", err)
458		return err
459	}
460
461	// Attempt to remove all artifacts
462	return os.RemoveAll(s.dir)
463}
464
465// finalize is used to close all of our resources.
466func (s *FileSnapshotSink) finalize() error {
467	// Flush any remaining data
468	if err := s.buffered.Flush(); err != nil {
469		return err
470	}
471
472	// Sync to force fsync to disk
473	if !s.noSync {
474		if err := s.stateFile.Sync(); err != nil {
475			return err
476		}
477	}
478
479	// Get the file size
480	stat, statErr := s.stateFile.Stat()
481
482	// Close the file
483	if err := s.stateFile.Close(); err != nil {
484		return err
485	}
486
487	// Set the file size, check after we close
488	if statErr != nil {
489		return statErr
490	}
491	s.meta.Size = stat.Size()
492
493	// Set the CRC
494	s.meta.CRC = s.stateHash.Sum(nil)
495	return nil
496}
497
498// writeMeta is used to write out the metadata we have.
499func (s *FileSnapshotSink) writeMeta() error {
500	var err error
501	// Open the meta file
502	metaPath := filepath.Join(s.dir, metaFilePath)
503	var fh *os.File
504	fh, err = os.Create(metaPath)
505	if err != nil {
506		return err
507	}
508	defer fh.Close()
509
510	// Buffer the file IO
511	buffered := bufio.NewWriter(fh)
512
513	// Write out as JSON
514	enc := json.NewEncoder(buffered)
515	if err = enc.Encode(&s.meta); err != nil {
516		return err
517	}
518
519	if err = buffered.Flush(); err != nil {
520		return err
521	}
522
523	if !s.noSync {
524		if err = fh.Sync(); err != nil {
525			return err
526		}
527	}
528
529	return nil
530}
531
532// Implement the sort interface for []*fileSnapshotMeta.
533func (s snapMetaSlice) Len() int {
534	return len(s)
535}
536
537func (s snapMetaSlice) Less(i, j int) bool {
538	if s[i].Term != s[j].Term {
539		return s[i].Term < s[j].Term
540	}
541	if s[i].Index != s[j].Index {
542		return s[i].Index < s[j].Index
543	}
544	return s[i].ID < s[j].ID
545}
546
547func (s snapMetaSlice) Swap(i, j int) {
548	s[i], s[j] = s[j], s[i]
549}
550