1package raft
2
3import (
4	"bytes"
5	"context"
6	"errors"
7	"fmt"
8	"io"
9	"math"
10	"path/filepath"
11	"strconv"
12	"strings"
13	"sync"
14	"sync/atomic"
15	"time"
16
17	metrics "github.com/armon/go-metrics"
18	protoio "github.com/gogo/protobuf/io"
19	proto "github.com/golang/protobuf/proto"
20	"github.com/hashicorp/errwrap"
21	log "github.com/hashicorp/go-hclog"
22	"github.com/hashicorp/go-raftchunking"
23	"github.com/hashicorp/raft"
24	"github.com/hashicorp/vault/sdk/helper/jsonutil"
25	"github.com/hashicorp/vault/sdk/helper/strutil"
26	"github.com/hashicorp/vault/sdk/physical"
27	"github.com/hashicorp/vault/sdk/plugin/pb"
28	bolt "go.etcd.io/bbolt"
29)
30
// Operation types carried by the operations of a replicated log entry. Each
// value occupies its own bit (1, 2, 4).
const (
	deleteOp uint32 = 1 << iota
	putOp
	restoreCallbackOp
)

// chunkingPrefix is the key prefix under which chunk bookkeeping entries are
// stored in the data bucket.
const chunkingPrefix = "raftchunking/"
38
// Bucket names used inside the bolt file.
var (
	// dataBucketName is the bucket that holds the stored entries.
	dataBucketName = []byte("data")
	// configBucketName is the bucket that holds the latest index and
	// configuration records.
	configBucketName = []byte("config")
)

// Keys used inside the config bucket.
var (
	latestIndexKey  = []byte("latest_indexes")
	latestConfigKey = []byte("latest_config")
)
46
47// Verify FSM satisfies the correct interfaces
48var _ physical.Backend = (*FSM)(nil)
49var _ physical.Transactional = (*FSM)(nil)
50var _ raft.FSM = (*FSM)(nil)
51var _ raft.ConfigurationStore = (*FSM)(nil)
52
// restoreCallback is the signature of the hook held in FSM.restoreCb.
type restoreCallback func(ctx context.Context) error
54
// FSMApplyResponse is the value handed back by an FSM apply.
type FSMApplyResponse struct {
	// Success reports whether the apply went through.
	Success bool
}
60
61// FSM is Vault's primary state storage. It writes updates to an bolt db file
62// that lives on local disk. FSM implements raft.FSM and physical.Backend
63// interfaces.
64type FSM struct {
65	// latestIndex and latestTerm must stay at the top of this struct to be
66	// properly 64-bit aligned.
67
68	// latestIndex and latestTerm are the term and index of the last log we
69	// received
70	latestIndex *uint64
71	latestTerm  *uint64
72	// latestConfig is the latest server configuration we've seen
73	latestConfig atomic.Value
74
75	l           sync.RWMutex
76	path        string
77	logger      log.Logger
78	permitPool  *physical.PermitPool
79	noopRestore bool
80
81	db *bolt.DB
82
83	// retoreCb is called after we've restored a snapshot
84	restoreCb restoreCallback
85
86	// This is just used in tests to disable to storing the latest indexes and
87	// configs so we can conform to the standard backend tests, which expect to
88	// additional state in the backend.
89	storeLatestState bool
90
91	chunker *raftchunking.ChunkingConfigurationStore
92}
93
94// NewFSM constructs a FSM using the given directory
95func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) {
96	path, ok := conf["path"]
97	if !ok {
98		return nil, fmt.Errorf("'path' must be set")
99	}
100
101	dbPath := filepath.Join(path, "vault.db")
102
103	boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second})
104	if err != nil {
105		return nil, err
106	}
107
108	// Initialize the latest term, index, and config values
109	latestTerm := new(uint64)
110	latestIndex := new(uint64)
111	latestConfig := atomic.Value{}
112	atomic.StoreUint64(latestTerm, 0)
113	atomic.StoreUint64(latestIndex, 0)
114	latestConfig.Store((*ConfigurationValue)(nil))
115
116	err = boltDB.Update(func(tx *bolt.Tx) error {
117		// make sure we have the necessary buckets created
118		_, err := tx.CreateBucketIfNotExists(dataBucketName)
119		if err != nil {
120			return fmt.Errorf("failed to create bucket: %v", err)
121		}
122		b, err := tx.CreateBucketIfNotExists(configBucketName)
123		if err != nil {
124			return fmt.Errorf("failed to create bucket: %v", err)
125		}
126		// Read in our latest index and term and populate it inmemory
127		val := b.Get(latestIndexKey)
128		if val != nil {
129			var latest IndexValue
130			err := proto.Unmarshal(val, &latest)
131			if err != nil {
132				return err
133			}
134
135			atomic.StoreUint64(latestTerm, latest.Term)
136			atomic.StoreUint64(latestIndex, latest.Index)
137		}
138
139		// Read in our latest config and populate it inmemory
140		val = b.Get(latestConfigKey)
141		if val != nil {
142			var latest ConfigurationValue
143			err := proto.Unmarshal(val, &latest)
144			if err != nil {
145				return err
146			}
147
148			latestConfig.Store(&latest)
149		}
150		return nil
151	})
152	if err != nil {
153		return nil, err
154	}
155
156	storeLatestState := true
157	if _, ok := conf["doNotStoreLatestState"]; ok {
158		storeLatestState = false
159	}
160
161	f := &FSM{
162		path:       conf["path"],
163		logger:     logger,
164		permitPool: physical.NewPermitPool(physical.DefaultParallelOperations),
165
166		db:               boltDB,
167		latestTerm:       latestTerm,
168		latestIndex:      latestIndex,
169		latestConfig:     latestConfig,
170		storeLatestState: storeLatestState,
171	}
172
173	f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{
174		f:   f,
175		ctx: context.Background(),
176	})
177
178	return f, nil
179}
180
181// LatestState returns the latest index and configuration values we have seen on
182// this FSM.
183func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) {
184	return &IndexValue{
185		Term:  atomic.LoadUint64(f.latestTerm),
186		Index: atomic.LoadUint64(f.latestIndex),
187	}, f.latestConfig.Load().(*ConfigurationValue)
188}
189
190func (f *FSM) witnessIndex(i *IndexValue) {
191	seen, _ := f.LatestState()
192	if seen.Index < i.Index {
193		atomic.StoreUint64(f.latestIndex, i.Index)
194		atomic.StoreUint64(f.latestTerm, i.Term)
195	}
196}
197
198func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error {
199	var indexBytes []byte
200	latestIndex, _ := f.LatestState()
201
202	latestIndex.Index = index
203	latestIndex.Term = term
204
205	var err error
206	indexBytes, err = proto.Marshal(latestIndex)
207	if err != nil {
208		return err
209	}
210
211	protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration)
212	configBytes, err := proto.Marshal(protoConfig)
213	if err != nil {
214		return err
215	}
216
217	if f.storeLatestState {
218		err = f.db.Update(func(tx *bolt.Tx) error {
219			b := tx.Bucket(configBucketName)
220			err := b.Put(latestConfigKey, configBytes)
221			if err != nil {
222				return err
223			}
224
225			err = b.Put(latestIndexKey, indexBytes)
226			if err != nil {
227				return err
228			}
229
230			return nil
231		})
232		if err != nil {
233			return err
234		}
235	}
236
237	atomic.StoreUint64(f.latestIndex, index)
238	atomic.StoreUint64(f.latestTerm, term)
239	f.latestConfig.Store(protoConfig)
240
241	return nil
242}
243
244// Delete deletes the given key from the bolt file.
245func (f *FSM) Delete(ctx context.Context, path string) error {
246	defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now())
247
248	f.permitPool.Acquire()
249	defer f.permitPool.Release()
250
251	f.l.RLock()
252	defer f.l.RUnlock()
253
254	return f.db.Update(func(tx *bolt.Tx) error {
255		return tx.Bucket(dataBucketName).Delete([]byte(path))
256	})
257}
258
259// Delete deletes the given key from the bolt file.
260func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error {
261	defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now())
262
263	f.permitPool.Acquire()
264	defer f.permitPool.Release()
265
266	f.l.RLock()
267	defer f.l.RUnlock()
268
269	err := f.db.Update(func(tx *bolt.Tx) error {
270		// Assume bucket exists and has keys
271		c := tx.Bucket(dataBucketName).Cursor()
272
273		prefixBytes := []byte(prefix)
274		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
275			if err := c.Delete(); err != nil {
276				return err
277			}
278		}
279
280		return nil
281	})
282
283	return err
284}
285
286// Get retrieves the value at the given path from the bolt file.
287func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) {
288	defer metrics.MeasureSince([]string{"raft", "get"}, time.Now())
289
290	f.permitPool.Acquire()
291	defer f.permitPool.Release()
292
293	f.l.RLock()
294	defer f.l.RUnlock()
295
296	var valCopy []byte
297	var found bool
298
299	err := f.db.View(func(tx *bolt.Tx) error {
300
301		value := tx.Bucket(dataBucketName).Get([]byte(path))
302		if value != nil {
303			found = true
304			valCopy = make([]byte, len(value))
305			copy(valCopy, value)
306		}
307
308		return nil
309	})
310	if err != nil {
311		return nil, err
312	}
313	if !found {
314		return nil, nil
315	}
316
317	return &physical.Entry{
318		Key:   path,
319		Value: valCopy,
320	}, nil
321}
322
323// Put writes the given entry to the bolt file.
324func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error {
325	defer metrics.MeasureSince([]string{"raft", "put"}, time.Now())
326
327	f.permitPool.Acquire()
328	defer f.permitPool.Release()
329
330	f.l.RLock()
331	defer f.l.RUnlock()
332
333	// Start a write transaction.
334	return f.db.Update(func(tx *bolt.Tx) error {
335		return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value)
336	})
337}
338
339// List retrieves the set of keys with the given prefix from the bolt file.
340func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) {
341	defer metrics.MeasureSince([]string{"raft", "list"}, time.Now())
342
343	f.permitPool.Acquire()
344	defer f.permitPool.Release()
345
346	f.l.RLock()
347	defer f.l.RUnlock()
348
349	var keys []string
350
351	err := f.db.View(func(tx *bolt.Tx) error {
352		// Assume bucket exists and has keys
353		c := tx.Bucket(dataBucketName).Cursor()
354
355		prefixBytes := []byte(prefix)
356		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
357			key := string(k)
358			key = strings.TrimPrefix(key, prefix)
359			if i := strings.Index(key, "/"); i == -1 {
360				// Add objects only from the current 'folder'
361				keys = append(keys, key)
362			} else {
363				// Add truncated 'folder' paths
364				keys = strutil.AppendIfMissing(keys, string(key[:i+1]))
365			}
366		}
367
368		return nil
369	})
370
371	return keys, err
372}
373
374// Transaction writes all the operations in the provided transaction to the bolt
375// file.
376func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error {
377	f.permitPool.Acquire()
378	defer f.permitPool.Release()
379
380	f.l.RLock()
381	defer f.l.RUnlock()
382
383	// TODO: should this be a Batch?
384	// Start a write transaction.
385	err := f.db.Update(func(tx *bolt.Tx) error {
386		b := tx.Bucket(dataBucketName)
387		for _, txn := range txns {
388			var err error
389			switch txn.Operation {
390			case physical.PutOperation:
391				err = b.Put([]byte(txn.Entry.Key), txn.Entry.Value)
392			case physical.DeleteOperation:
393				err = b.Delete([]byte(txn.Entry.Key))
394			default:
395				return fmt.Errorf("%q is not a supported transaction operation", txn.Operation)
396			}
397			if err != nil {
398				return err
399			}
400		}
401
402		return nil
403	})
404	return err
405}
406
407// Apply will apply a log value to the FSM. This is called from the raft
408// library.
409func (f *FSM) Apply(log *raft.Log) interface{} {
410	command := &LogData{}
411	err := proto.Unmarshal(log.Data, command)
412	if err != nil {
413		f.logger.Error("error proto unmarshaling log data", "error", err)
414		panic("error proto unmarshaling log data")
415	}
416
417	f.l.RLock()
418	defer f.l.RUnlock()
419
420	// Only advance latest pointer if this log has a higher index value than
421	// what we have seen in the past.
422	var logIndex []byte
423	latestIndex, _ := f.LatestState()
424	if latestIndex.Index < log.Index {
425		logIndex, err = proto.Marshal(&IndexValue{
426			Term:  log.Term,
427			Index: log.Index,
428		})
429		if err != nil {
430			f.logger.Error("unable to marshal latest index", "error", err)
431			panic("unable to marshal latest index")
432		}
433	}
434
435	err = f.db.Update(func(tx *bolt.Tx) error {
436		b := tx.Bucket(dataBucketName)
437		for _, op := range command.Operations {
438			var err error
439			switch op.OpType {
440			case putOp:
441				err = b.Put([]byte(op.Key), op.Value)
442			case deleteOp:
443				err = b.Delete([]byte(op.Key))
444			case restoreCallbackOp:
445				if f.restoreCb != nil {
446					// Kick off the restore callback function in a go routine
447					go f.restoreCb(context.Background())
448				}
449			default:
450				return fmt.Errorf("%q is not a supported transaction operation", op.OpType)
451			}
452			if err != nil {
453				return err
454			}
455		}
456
457		// TODO: benchmark so we can know how much time this adds
458		if f.storeLatestState && len(logIndex) > 0 {
459			b := tx.Bucket(configBucketName)
460			err = b.Put(latestIndexKey, logIndex)
461			if err != nil {
462				return err
463			}
464		}
465
466		return nil
467	})
468	if err != nil {
469		f.logger.Error("failed to store data", "error", err)
470		panic("failed to store data")
471	}
472
473	// If we advanced the latest value, update the in-memory representation too.
474	if len(logIndex) > 0 {
475		atomic.StoreUint64(f.latestTerm, log.Term)
476		atomic.StoreUint64(f.latestIndex, log.Index)
477	}
478
479	return &FSMApplyResponse{
480		Success: true,
481	}
482}
483
// writeErrorCloser is a writer that can be closed either normally or with an
// error describing why the stream ended.
type writeErrorCloser interface {
	io.Writer
	io.Closer
	CloseWithError(err error) error
}
488
489// writeTo will copy the FSM's content to a remote sink. The data is written
490// twice, once for use in determining various metadata attributes of the dataset
491// (size, checksum, etc) and a second for the sink of the data. We also use a
492// proto delimited writer so we can stream proto messages to the sink.
493func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) {
494	protoWriter := protoio.NewDelimitedWriter(sink)
495	metadataProtoWriter := protoio.NewDelimitedWriter(metaSink)
496
497	f.l.RLock()
498	defer f.l.RUnlock()
499
500	err := f.db.View(func(tx *bolt.Tx) error {
501		b := tx.Bucket(dataBucketName)
502
503		c := b.Cursor()
504
505		// Do the first scan of the data for metadata purposes.
506		for k, v := c.First(); k != nil; k, v = c.Next() {
507			err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{
508				Key:   string(k),
509				Value: v,
510			})
511			if err != nil {
512				metaSink.CloseWithError(err)
513				return err
514			}
515		}
516		metaSink.Close()
517
518		// Do the second scan for copy purposes.
519		for k, v := c.First(); k != nil; k, v = c.Next() {
520			err := protoWriter.WriteMsg(&pb.StorageEntry{
521				Key:   string(k),
522				Value: v,
523			})
524			if err != nil {
525				return err
526			}
527		}
528
529		return nil
530	})
531	sink.CloseWithError(err)
532}
533
534// Snapshot implements the FSM interface. It returns a noop snapshot object.
535func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
536	return &noopSnapshotter{}, nil
537}
538
539// SetNoopRestore is used to disable restore operations on raft startup. Because
540// we are using persistent storage in our FSM we do not need to issue a restore
541// on startup.
542func (f *FSM) SetNoopRestore(enabled bool) {
543	f.l.Lock()
544	f.noopRestore = enabled
545	f.l.Unlock()
546}
547
548// Restore reads data from the provided reader and writes it into the FSM. It
549// first deletes the existing bucket to clear all existing data, then recreates
550// it so we can copy in the snapshot.
551func (f *FSM) Restore(r io.ReadCloser) error {
552	if f.noopRestore == true {
553		return nil
554	}
555
556	protoReader := protoio.NewDelimitedReader(r, math.MaxInt32)
557	defer protoReader.Close()
558
559	f.l.Lock()
560	defer f.l.Unlock()
561
562	// Start a write transaction.
563	err := f.db.Update(func(tx *bolt.Tx) error {
564		err := tx.DeleteBucket(dataBucketName)
565		if err != nil {
566			return err
567		}
568
569		b, err := tx.CreateBucket(dataBucketName)
570		if err != nil {
571			return err
572		}
573
574		for {
575			s := new(pb.StorageEntry)
576			err := protoReader.ReadMsg(s)
577			if err != nil {
578				if err == io.EOF {
579					return nil
580				}
581				return err
582			}
583
584			err = b.Put([]byte(s.Key), s.Value)
585			if err != nil {
586				return err
587			}
588		}
589
590		return nil
591	})
592	if err != nil {
593		f.logger.Error("could not restore snapshot", "error", err)
594		return err
595	}
596
597	return nil
598}
599
// noopSnapshotter is the snapshot object returned by FSM.Snapshot. Its
// methods do nothing, since our SnapshotStore reads data out of the FSM on
// Open().
type noopSnapshotter struct {
}
603
604// Persist doesn't do anything.
605func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error {
606	return nil
607}
608
609// Release doesn't do anything.
610func (s *noopSnapshotter) Release() {}
611
612// StoreConfig satisfies the raft.ConfigurationStore interface and persists the
613// latest raft server configuration to the bolt file.
614func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) {
615	f.l.RLock()
616	defer f.l.RUnlock()
617
618	var indexBytes []byte
619	latestIndex, _ := f.LatestState()
620	// Only write the new index if we are advancing the pointer
621	if index > latestIndex.Index {
622		latestIndex.Index = index
623
624		var err error
625		indexBytes, err = proto.Marshal(latestIndex)
626		if err != nil {
627			f.logger.Error("unable to marshal latest index", "error", err)
628			panic(fmt.Sprintf("unable to marshal latest index: %v", err))
629		}
630	}
631
632	protoConfig := raftConfigurationToProtoConfiguration(index, configuration)
633	configBytes, err := proto.Marshal(protoConfig)
634	if err != nil {
635		f.logger.Error("unable to marshal config", "error", err)
636		panic(fmt.Sprintf("unable to marshal config: %v", err))
637	}
638
639	if f.storeLatestState {
640		err = f.db.Update(func(tx *bolt.Tx) error {
641			b := tx.Bucket(configBucketName)
642			err := b.Put(latestConfigKey, configBytes)
643			if err != nil {
644				return err
645			}
646
647			// TODO: benchmark so we can know how much time this adds
648			if len(indexBytes) > 0 {
649				err = b.Put(latestIndexKey, indexBytes)
650				if err != nil {
651					return err
652				}
653			}
654
655			return nil
656		})
657		if err != nil {
658			f.logger.Error("unable to store latest configuration", "error", err)
659			panic(fmt.Sprintf("unable to store latest configuration: %v", err))
660		}
661	}
662
663	f.witnessIndex(latestIndex)
664	f.latestConfig.Store(protoConfig)
665}
666
667// raftConfigurationToProtoConfiguration converts a raft configuration object to
668// a proto value.
669func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue {
670	servers := make([]*Server, len(configuration.Servers))
671	for i, s := range configuration.Servers {
672		servers[i] = &Server{
673			Suffrage: int32(s.Suffrage),
674			Id:       string(s.ID),
675			Address:  string(s.Address),
676		}
677	}
678	return &ConfigurationValue{
679		Index:   index,
680		Servers: servers,
681	}
682}
683
684// protoConfigurationToRaftConfiguration converts a proto configuration object
685// to a raft object.
686func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) {
687	servers := make([]raft.Server, len(configuration.Servers))
688	for i, s := range configuration.Servers {
689		servers[i] = raft.Server{
690			Suffrage: raft.ServerSuffrage(s.Suffrage),
691			ID:       raft.ServerID(s.Id),
692			Address:  raft.ServerAddress(s.Address),
693		}
694	}
695	return configuration.Index, raft.Configuration{
696		Servers: servers,
697	}
698}
699
700type FSMChunkStorage struct {
701	f   *FSM
702	ctx context.Context
703}
704
705// chunkPaths returns a disk prefix and key given chunkinfo
706func (f *FSMChunkStorage) chunkPaths(chunk *raftchunking.ChunkInfo) (string, string) {
707	prefix := fmt.Sprintf("%s%d/", chunkingPrefix, chunk.OpNum)
708	key := fmt.Sprintf("%s%d", prefix, chunk.SequenceNum)
709	return prefix, key
710}
711
712func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error) {
713	b, err := jsonutil.EncodeJSON(chunk)
714	if err != nil {
715		return false, errwrap.Wrapf("error encoding chunk info: {{err}}", err)
716	}
717
718	prefix, key := f.chunkPaths(chunk)
719
720	entry := &physical.Entry{
721		Key:   key,
722		Value: b,
723	}
724
725	f.f.permitPool.Acquire()
726	defer f.f.permitPool.Release()
727
728	f.f.l.RLock()
729	defer f.f.l.RUnlock()
730
731	// Start a write transaction.
732	done := new(bool)
733	if err := f.f.db.Update(func(tx *bolt.Tx) error {
734		if err := tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value); err != nil {
735			return errwrap.Wrapf("error storing chunk info: {{err}}", err)
736		}
737
738		// Assume bucket exists and has keys
739		c := tx.Bucket(dataBucketName).Cursor()
740
741		var keys []string
742		prefixBytes := []byte(prefix)
743		for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() {
744			key := string(k)
745			key = strings.TrimPrefix(key, prefix)
746			if i := strings.Index(key, "/"); i == -1 {
747				// Add objects only from the current 'folder'
748				keys = append(keys, key)
749			} else {
750				// Add truncated 'folder' paths
751				keys = strutil.AppendIfMissing(keys, string(key[:i+1]))
752			}
753		}
754
755		*done = uint32(len(keys)) == chunk.NumChunks
756
757		return nil
758	}); err != nil {
759		return false, err
760	}
761
762	return *done, nil
763}
764
765func (f *FSMChunkStorage) FinalizeOp(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
766	ret, err := f.chunksForOpNum(opNum)
767	if err != nil {
768		return nil, errwrap.Wrapf("error getting chunks for op keys: {{err}}", err)
769	}
770
771	prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})
772	if err := f.f.DeletePrefix(f.ctx, prefix); err != nil {
773		return nil, errwrap.Wrapf("error deleting prefix after op finalization: {{err}}", err)
774	}
775
776	return ret, nil
777}
778
779func (f *FSMChunkStorage) chunksForOpNum(opNum uint64) ([]*raftchunking.ChunkInfo, error) {
780	prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum})
781
782	opChunkKeys, err := f.f.List(f.ctx, prefix)
783	if err != nil {
784		return nil, errwrap.Wrapf("error fetching op chunk keys: {{err}}", err)
785	}
786
787	if len(opChunkKeys) == 0 {
788		return nil, nil
789	}
790
791	var ret []*raftchunking.ChunkInfo
792
793	for _, v := range opChunkKeys {
794		seqNum, err := strconv.ParseInt(v, 10, 64)
795		if err != nil {
796			return nil, errwrap.Wrapf("error converting seqnum to integer: {{err}}", err)
797		}
798
799		entry, err := f.f.Get(f.ctx, prefix+v)
800		if err != nil {
801			return nil, errwrap.Wrapf("error fetching chunkinfo: {{err}}", err)
802		}
803
804		var ci raftchunking.ChunkInfo
805		if err := jsonutil.DecodeJSON(entry.Value, &ci); err != nil {
806			return nil, errwrap.Wrapf("error decoding chunkinfo json: {{err}}", err)
807		}
808
809		if ret == nil {
810			ret = make([]*raftchunking.ChunkInfo, ci.NumChunks)
811		}
812
813		ret[seqNum] = &ci
814	}
815
816	return ret, nil
817}
818
819func (f *FSMChunkStorage) GetChunks() (raftchunking.ChunkMap, error) {
820	opNums, err := f.f.List(f.ctx, chunkingPrefix)
821	if err != nil {
822		return nil, errwrap.Wrapf("error doing recursive list for chunk saving: {{err}}", err)
823	}
824
825	if len(opNums) == 0 {
826		return nil, nil
827	}
828
829	ret := make(raftchunking.ChunkMap, len(opNums))
830	for _, opNumStr := range opNums {
831		opNum, err := strconv.ParseInt(opNumStr, 10, 64)
832		if err != nil {
833			return nil, errwrap.Wrapf("error parsing op num during chunk saving: {{err}}", err)
834		}
835
836		opChunks, err := f.chunksForOpNum(uint64(opNum))
837		if err != nil {
838			return nil, errwrap.Wrapf("error getting chunks for op keys during chunk saving: {{err}}", err)
839		}
840
841		ret[uint64(opNum)] = opChunks
842	}
843
844	return ret, nil
845}
846
847func (f *FSMChunkStorage) RestoreChunks(chunks raftchunking.ChunkMap) error {
848	if err := f.f.DeletePrefix(f.ctx, chunkingPrefix); err != nil {
849		return errwrap.Wrapf("error deleting prefix for chunk restoration: {{err}}", err)
850	}
851	if len(chunks) == 0 {
852		return nil
853	}
854
855	for opNum, opChunks := range chunks {
856		for _, chunk := range opChunks {
857			if chunk == nil {
858				continue
859			}
860			if chunk.OpNum != opNum {
861				return errors.New("unexpected op number in chunk")
862			}
863			if _, err := f.StoreChunk(chunk); err != nil {
864				return errwrap.Wrapf("error storing chunk during restoration: {{err}}", err)
865			}
866		}
867	}
868
869	return nil
870}
871