1package raft
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"io"
8	"io/ioutil"
9	"sync"
10
11	log "github.com/hashicorp/go-hclog"
12
13	"github.com/hashicorp/raft"
14)
15
// boltSnapshotID is the fixed identifier shared by every boltDB snapshot.
// Because the ID never changes, there is only ever a single bolt snapshot in
// the system.
const boltSnapshotID = "bolt-snapshot"
21
22// BoltSnapshotStore implements the SnapshotStore interface and allows
23// snapshots to be made on the local disk. The main difference between this
24// store and the file store is we make the distinction between snapshots that
25// have been written by the FSM and by internal Raft operations. The former are
26// treated as noop snapshots on Persist and are read in full from the FSM on
27// Open. The latter are treated like normal file snapshots and are able to be
28// opened and applied as usual.
29type BoltSnapshotStore struct {
30	// path is the directory in which to store file based snapshots
31	path string
32	// retain is the number of file based snapshots to keep
33	retain int
34
35	// We hold a copy of the FSM so we can stream snapshots straight out of the
36	// database.
37	fsm *FSM
38
39	// fileSnapStore is used to fall back to file snapshots when the data is
40	// being written from the raft library. This currently only happens on a
41	// follower during a snapshot install RPC.
42	fileSnapStore *raft.FileSnapshotStore
43	logger        log.Logger
44}
45
46// BoltSnapshotSink implements SnapshotSink optionally choosing to write to a
47// file.
48type BoltSnapshotSink struct {
49	store  *BoltSnapshotStore
50	logger log.Logger
51	meta   raft.SnapshotMeta
52	trans  raft.Transport
53
54	fileSink raft.SnapshotSink
55	l        sync.Mutex
56	closed   bool
57}
58
59// NewBoltSnapshotStore creates a new BoltSnapshotStore based
60// on a base directory. The `retain` parameter controls how many
61// snapshots are retained. Must be at least 1.
62func NewBoltSnapshotStore(base string, retain int, logger log.Logger, fsm *FSM) (*BoltSnapshotStore, error) {
63	if retain < 1 {
64		return nil, fmt.Errorf("must retain at least one snapshot")
65	}
66	if logger == nil {
67		return nil, fmt.Errorf("no logger provided")
68	}
69
70	fileStore, err := raft.NewFileSnapshotStore(base, retain, nil)
71	if err != nil {
72		return nil, err
73	}
74
75	// Setup the store
76	store := &BoltSnapshotStore{
77		logger:        logger,
78		fsm:           fsm,
79		fileSnapStore: fileStore,
80	}
81
82	{
83		// TODO: I think this needs to be done before every NewRaft and
84		// RecoverCluster call. Not just on Factory method.
85
86		// Here we delete all the existing file based snapshots. This is necessary
87		// because we do not issue a restore on NewRaft. If a previous file snapshot
88		// had failed to apply we will be incorrectly setting the indexes. It's
89		// safer to simply delete all file snapshots on startup and rely on Raft to
90		// reconcile the FSM state.
91		if err := store.ReapSnapshots(); err != nil {
92			return nil, err
93		}
94	}
95
96	return store, nil
97}
98
99// Create is used to start a new snapshot
100func (f *BoltSnapshotStore) Create(version raft.SnapshotVersion, index, term uint64,
101	configuration raft.Configuration, configurationIndex uint64, trans raft.Transport) (raft.SnapshotSink, error) {
102	// We only support version 1 snapshots at this time.
103	if version != 1 {
104		return nil, fmt.Errorf("unsupported snapshot version %d", version)
105	}
106
107	// We are processing a snapshot, fastforward the index, term, and
108	// configuration to the latest seen by the raft system. This could include
109	// log indexes for operation types that are never sent to the FSM.
110	if err := f.fsm.witnessSnapshot(index, term, configurationIndex, configuration); err != nil {
111		return nil, err
112	}
113
114	// Create the sink
115	sink := &BoltSnapshotSink{
116		store:  f,
117		logger: f.logger,
118		meta: raft.SnapshotMeta{
119			Version:            version,
120			ID:                 boltSnapshotID,
121			Index:              index,
122			Term:               term,
123			Configuration:      configuration,
124			ConfigurationIndex: configurationIndex,
125		},
126		trans: trans,
127	}
128
129	// Done
130	return sink, nil
131}
132
133// List returns available snapshots in the store. It only returns bolt
134// snapshots. No snapshot will be returned if there are no indexes in the
135// FSM.
136func (f *BoltSnapshotStore) List() ([]*raft.SnapshotMeta, error) {
137	meta, err := f.getBoltSnapshotMeta()
138	if err != nil {
139		return nil, err
140	}
141
142	// If we haven't seen any data yet do not return a snapshot
143	if meta.Index == 0 {
144		return nil, nil
145	}
146
147	return []*raft.SnapshotMeta{meta}, nil
148}
149
150// getBoltSnapshotMeta returns the fsm's latest state and configuration.
151func (f *BoltSnapshotStore) getBoltSnapshotMeta() (*raft.SnapshotMeta, error) {
152	latestIndex, latestConfig := f.fsm.LatestState()
153	meta := &raft.SnapshotMeta{
154		Version: 1,
155		ID:      boltSnapshotID,
156		Index:   latestIndex.Index,
157		Term:    latestIndex.Term,
158	}
159
160	if latestConfig != nil {
161		index, configuration := protoConfigurationToRaftConfiguration(latestConfig)
162		meta.Configuration = configuration
163		meta.ConfigurationIndex = index
164	}
165
166	return meta, nil
167}
168
169// Open takes a snapshot ID and returns a ReadCloser for that snapshot.
170func (f *BoltSnapshotStore) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
171	var readCloser io.ReadCloser
172	var meta *raft.SnapshotMeta
173	switch id {
174	case boltSnapshotID:
175
176		var err error
177		meta, err = f.getBoltSnapshotMeta()
178		if err != nil {
179			return nil, nil, err
180		}
181		// If we don't have any data return an error
182		if meta.Index == 0 {
183			return nil, nil, errors.New("no snapshot data")
184		}
185
186		// Stream data out of the FSM to calculate the size
187		var writeCloser *io.PipeWriter
188		readCloser, writeCloser = io.Pipe()
189		metaReadCloser, metaWriteCloser := io.Pipe()
190		go func() {
191			f.fsm.writeTo(context.Background(), metaWriteCloser, writeCloser)
192		}()
193
194		// Compute the size
195		n, err := io.Copy(ioutil.Discard, metaReadCloser)
196		if err != nil {
197			f.logger.Error("failed to read state file", "error", err)
198			metaReadCloser.Close()
199			readCloser.Close()
200			return nil, nil, err
201		}
202
203		meta.Size = n
204
205	default:
206		var err error
207		meta, readCloser, err = f.fileSnapStore.Open(id)
208		if err != nil {
209			return nil, nil, err
210		}
211	}
212
213	return meta, readCloser, nil
214}
215
216// ReapSnapshots reaps any snapshots beyond the retain count.
217func (f *BoltSnapshotStore) ReapSnapshots() error {
218	return f.fileSnapStore.ReapSnapshots()
219}
220
221// ID returns the ID of the snapshot, can be used with Open()
222// after the snapshot is finalized.
223func (s *BoltSnapshotSink) ID() string {
224	s.l.Lock()
225	defer s.l.Unlock()
226
227	if s.fileSink != nil {
228		return s.fileSink.ID()
229	}
230
231	return s.meta.ID
232}
233
234// Write is used to append to the state file. We write to the
235// buffered IO object to reduce the amount of context switches.
236func (s *BoltSnapshotSink) Write(b []byte) (int, error) {
237	s.l.Lock()
238	defer s.l.Unlock()
239
240	// If someone is writing to this sink then we need to create a file sink to
241	// capture the data. This currently only happens when a follower is being
242	// sent a snapshot.
243	if s.fileSink == nil {
244		fileSink, err := s.store.fileSnapStore.Create(s.meta.Version, s.meta.Index, s.meta.Term, s.meta.Configuration, s.meta.ConfigurationIndex, s.trans)
245		if err != nil {
246			return 0, err
247		}
248		s.fileSink = fileSink
249	}
250
251	return s.fileSink.Write(b)
252}
253
254// Close is used to indicate a successful end.
255func (s *BoltSnapshotSink) Close() error {
256	s.l.Lock()
257	defer s.l.Unlock()
258
259	// Make sure close is idempotent
260	if s.closed {
261		return nil
262	}
263	s.closed = true
264
265	if s.fileSink != nil {
266		return s.fileSink.Close()
267	}
268
269	return nil
270}
271
272// Cancel is used to indicate an unsuccessful end.
273func (s *BoltSnapshotSink) Cancel() error {
274	s.l.Lock()
275	defer s.l.Unlock()
276
277	// Make sure close is idempotent
278	if s.closed {
279		return nil
280	}
281	s.closed = true
282
283	if s.fileSink != nil {
284		return s.fileSink.Cancel()
285	}
286
287	return nil
288}
289