1package raft 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "math" 10 "path/filepath" 11 "strconv" 12 "strings" 13 "sync" 14 "sync/atomic" 15 "time" 16 17 metrics "github.com/armon/go-metrics" 18 protoio "github.com/gogo/protobuf/io" 19 proto "github.com/golang/protobuf/proto" 20 "github.com/hashicorp/errwrap" 21 log "github.com/hashicorp/go-hclog" 22 "github.com/hashicorp/go-raftchunking" 23 "github.com/hashicorp/raft" 24 "github.com/hashicorp/vault/sdk/helper/jsonutil" 25 "github.com/hashicorp/vault/sdk/helper/strutil" 26 "github.com/hashicorp/vault/sdk/physical" 27 "github.com/hashicorp/vault/sdk/plugin/pb" 28 bolt "go.etcd.io/bbolt" 29) 30 31const ( 32 deleteOp uint32 = 1 << iota 33 putOp 34 restoreCallbackOp 35 36 chunkingPrefix = "raftchunking/" 37) 38 39var ( 40 // dataBucketName is the value we use for the bucket 41 dataBucketName = []byte("data") 42 configBucketName = []byte("config") 43 latestIndexKey = []byte("latest_indexes") 44 latestConfigKey = []byte("latest_config") 45) 46 47// Verify FSM satisfies the correct interfaces 48var _ physical.Backend = (*FSM)(nil) 49var _ physical.Transactional = (*FSM)(nil) 50var _ raft.FSM = (*FSM)(nil) 51var _ raft.ConfigurationStore = (*FSM)(nil) 52 53type restoreCallback func(context.Context) error 54 55// FSMApplyResponse is returned from an FSM apply. It indicates if the apply was 56// successful or not. 57type FSMApplyResponse struct { 58 Success bool 59} 60 61// FSM is Vault's primary state storage. It writes updates to an bolt db file 62// that lives on local disk. FSM implements raft.FSM and physical.Backend 63// interfaces. 64type FSM struct { 65 // latestIndex and latestTerm must stay at the top of this struct to be 66 // properly 64-bit aligned. 67 68 // latestIndex and latestTerm are the term and index of the last log we 69 // received 70 latestIndex *uint64 71 latestTerm *uint64 72 // latestConfig is the latest server configuration we've seen 73 latestConfig atomic.Value 74 75 l sync.RWMutex 76 path string 77 logger log.Logger 78 permitPool *physical.PermitPool 79 noopRestore bool 80 81 db *bolt.DB 82 83 // retoreCb is called after we've restored a snapshot 84 restoreCb restoreCallback 85 86 // This is just used in tests to disable to storing the latest indexes and 87 // configs so we can conform to the standard backend tests, which expect to 88 // additional state in the backend. 89 storeLatestState bool 90 91 chunker *raftchunking.ChunkingConfigurationStore 92} 93 94// NewFSM constructs a FSM using the given directory 95func NewFSM(conf map[string]string, logger log.Logger) (*FSM, error) { 96 path, ok := conf["path"] 97 if !ok { 98 return nil, fmt.Errorf("'path' must be set") 99 } 100 101 dbPath := filepath.Join(path, "vault.db") 102 103 boltDB, err := bolt.Open(dbPath, 0666, &bolt.Options{Timeout: 1 * time.Second}) 104 if err != nil { 105 return nil, err 106 } 107 108 // Initialize the latest term, index, and config values 109 latestTerm := new(uint64) 110 latestIndex := new(uint64) 111 latestConfig := atomic.Value{} 112 atomic.StoreUint64(latestTerm, 0) 113 atomic.StoreUint64(latestIndex, 0) 114 latestConfig.Store((*ConfigurationValue)(nil)) 115 116 err = boltDB.Update(func(tx *bolt.Tx) error { 117 // make sure we have the necessary buckets created 118 _, err := tx.CreateBucketIfNotExists(dataBucketName) 119 if err != nil { 120 return fmt.Errorf("failed to create bucket: %v", err) 121 } 122 b, err := tx.CreateBucketIfNotExists(configBucketName) 123 if err != nil { 124 return fmt.Errorf("failed to create bucket: %v", err) 125 } 126 // Read in our latest index and term and populate it inmemory 127 val := b.Get(latestIndexKey) 128 if val != nil { 129 var latest IndexValue 130 err := proto.Unmarshal(val, &latest) 131 if err != nil { 132 return err 133 } 134 135 atomic.StoreUint64(latestTerm, latest.Term) 136 atomic.StoreUint64(latestIndex, latest.Index) 137 } 138 139 // Read in our latest config and populate it inmemory 140 val = b.Get(latestConfigKey) 141 if val != nil { 142 var latest ConfigurationValue 143 err := proto.Unmarshal(val, &latest) 144 if err != nil { 145 return err 146 } 147 148 latestConfig.Store(&latest) 149 } 150 return nil 151 }) 152 if err != nil { 153 return nil, err 154 } 155 156 storeLatestState := true 157 if _, ok := conf["doNotStoreLatestState"]; ok { 158 storeLatestState = false 159 } 160 161 f := &FSM{ 162 path: conf["path"], 163 logger: logger, 164 permitPool: physical.NewPermitPool(physical.DefaultParallelOperations), 165 166 db: boltDB, 167 latestTerm: latestTerm, 168 latestIndex: latestIndex, 169 latestConfig: latestConfig, 170 storeLatestState: storeLatestState, 171 } 172 173 f.chunker = raftchunking.NewChunkingConfigurationStore(f, &FSMChunkStorage{ 174 f: f, 175 ctx: context.Background(), 176 }) 177 178 return f, nil 179} 180 181// LatestState returns the latest index and configuration values we have seen on 182// this FSM. 183func (f *FSM) LatestState() (*IndexValue, *ConfigurationValue) { 184 return &IndexValue{ 185 Term: atomic.LoadUint64(f.latestTerm), 186 Index: atomic.LoadUint64(f.latestIndex), 187 }, f.latestConfig.Load().(*ConfigurationValue) 188} 189 190func (f *FSM) witnessIndex(i *IndexValue) { 191 seen, _ := f.LatestState() 192 if seen.Index < i.Index { 193 atomic.StoreUint64(f.latestIndex, i.Index) 194 atomic.StoreUint64(f.latestTerm, i.Term) 195 } 196} 197 198func (f *FSM) witnessSnapshot(index, term, configurationIndex uint64, configuration raft.Configuration) error { 199 var indexBytes []byte 200 latestIndex, _ := f.LatestState() 201 202 latestIndex.Index = index 203 latestIndex.Term = term 204 205 var err error 206 indexBytes, err = proto.Marshal(latestIndex) 207 if err != nil { 208 return err 209 } 210 211 protoConfig := raftConfigurationToProtoConfiguration(configurationIndex, configuration) 212 configBytes, err := proto.Marshal(protoConfig) 213 if err != nil { 214 return err 215 } 216 217 if f.storeLatestState { 218 err = f.db.Update(func(tx *bolt.Tx) error { 219 b := tx.Bucket(configBucketName) 220 err := b.Put(latestConfigKey, configBytes) 221 if err != nil { 222 return err 223 } 224 225 err = b.Put(latestIndexKey, indexBytes) 226 if err != nil { 227 return err 228 } 229 230 return nil 231 }) 232 if err != nil { 233 return err 234 } 235 } 236 237 atomic.StoreUint64(f.latestIndex, index) 238 atomic.StoreUint64(f.latestTerm, term) 239 f.latestConfig.Store(protoConfig) 240 241 return nil 242} 243 244// Delete deletes the given key from the bolt file. 245func (f *FSM) Delete(ctx context.Context, path string) error { 246 defer metrics.MeasureSince([]string{"raft", "delete"}, time.Now()) 247 248 f.permitPool.Acquire() 249 defer f.permitPool.Release() 250 251 f.l.RLock() 252 defer f.l.RUnlock() 253 254 return f.db.Update(func(tx *bolt.Tx) error { 255 return tx.Bucket(dataBucketName).Delete([]byte(path)) 256 }) 257} 258 259// Delete deletes the given key from the bolt file. 260func (f *FSM) DeletePrefix(ctx context.Context, prefix string) error { 261 defer metrics.MeasureSince([]string{"raft", "delete_prefix"}, time.Now()) 262 263 f.permitPool.Acquire() 264 defer f.permitPool.Release() 265 266 f.l.RLock() 267 defer f.l.RUnlock() 268 269 err := f.db.Update(func(tx *bolt.Tx) error { 270 // Assume bucket exists and has keys 271 c := tx.Bucket(dataBucketName).Cursor() 272 273 prefixBytes := []byte(prefix) 274 for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { 275 if err := c.Delete(); err != nil { 276 return err 277 } 278 } 279 280 return nil 281 }) 282 283 return err 284} 285 286// Get retrieves the value at the given path from the bolt file. 287func (f *FSM) Get(ctx context.Context, path string) (*physical.Entry, error) { 288 defer metrics.MeasureSince([]string{"raft", "get"}, time.Now()) 289 290 f.permitPool.Acquire() 291 defer f.permitPool.Release() 292 293 f.l.RLock() 294 defer f.l.RUnlock() 295 296 var valCopy []byte 297 var found bool 298 299 err := f.db.View(func(tx *bolt.Tx) error { 300 301 value := tx.Bucket(dataBucketName).Get([]byte(path)) 302 if value != nil { 303 found = true 304 valCopy = make([]byte, len(value)) 305 copy(valCopy, value) 306 } 307 308 return nil 309 }) 310 if err != nil { 311 return nil, err 312 } 313 if !found { 314 return nil, nil 315 } 316 317 return &physical.Entry{ 318 Key: path, 319 Value: valCopy, 320 }, nil 321} 322 323// Put writes the given entry to the bolt file. 324func (f *FSM) Put(ctx context.Context, entry *physical.Entry) error { 325 defer metrics.MeasureSince([]string{"raft", "put"}, time.Now()) 326 327 f.permitPool.Acquire() 328 defer f.permitPool.Release() 329 330 f.l.RLock() 331 defer f.l.RUnlock() 332 333 // Start a write transaction. 334 return f.db.Update(func(tx *bolt.Tx) error { 335 return tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value) 336 }) 337} 338 339// List retrieves the set of keys with the given prefix from the bolt file. 340func (f *FSM) List(ctx context.Context, prefix string) ([]string, error) { 341 defer metrics.MeasureSince([]string{"raft", "list"}, time.Now()) 342 343 f.permitPool.Acquire() 344 defer f.permitPool.Release() 345 346 f.l.RLock() 347 defer f.l.RUnlock() 348 349 var keys []string 350 351 err := f.db.View(func(tx *bolt.Tx) error { 352 // Assume bucket exists and has keys 353 c := tx.Bucket(dataBucketName).Cursor() 354 355 prefixBytes := []byte(prefix) 356 for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { 357 key := string(k) 358 key = strings.TrimPrefix(key, prefix) 359 if i := strings.Index(key, "/"); i == -1 { 360 // Add objects only from the current 'folder' 361 keys = append(keys, key) 362 } else { 363 // Add truncated 'folder' paths 364 keys = strutil.AppendIfMissing(keys, string(key[:i+1])) 365 } 366 } 367 368 return nil 369 }) 370 371 return keys, err 372} 373 374// Transaction writes all the operations in the provided transaction to the bolt 375// file. 376func (f *FSM) Transaction(ctx context.Context, txns []*physical.TxnEntry) error { 377 f.permitPool.Acquire() 378 defer f.permitPool.Release() 379 380 f.l.RLock() 381 defer f.l.RUnlock() 382 383 // TODO: should this be a Batch? 384 // Start a write transaction. 385 err := f.db.Update(func(tx *bolt.Tx) error { 386 b := tx.Bucket(dataBucketName) 387 for _, txn := range txns { 388 var err error 389 switch txn.Operation { 390 case physical.PutOperation: 391 err = b.Put([]byte(txn.Entry.Key), txn.Entry.Value) 392 case physical.DeleteOperation: 393 err = b.Delete([]byte(txn.Entry.Key)) 394 default: 395 return fmt.Errorf("%q is not a supported transaction operation", txn.Operation) 396 } 397 if err != nil { 398 return err 399 } 400 } 401 402 return nil 403 }) 404 return err 405} 406 407// Apply will apply a log value to the FSM. This is called from the raft 408// library. 409func (f *FSM) Apply(log *raft.Log) interface{} { 410 command := &LogData{} 411 err := proto.Unmarshal(log.Data, command) 412 if err != nil { 413 f.logger.Error("error proto unmarshaling log data", "error", err) 414 panic("error proto unmarshaling log data") 415 } 416 417 f.l.RLock() 418 defer f.l.RUnlock() 419 420 // Only advance latest pointer if this log has a higher index value than 421 // what we have seen in the past. 422 var logIndex []byte 423 latestIndex, _ := f.LatestState() 424 if latestIndex.Index < log.Index { 425 logIndex, err = proto.Marshal(&IndexValue{ 426 Term: log.Term, 427 Index: log.Index, 428 }) 429 if err != nil { 430 f.logger.Error("unable to marshal latest index", "error", err) 431 panic("unable to marshal latest index") 432 } 433 } 434 435 err = f.db.Update(func(tx *bolt.Tx) error { 436 b := tx.Bucket(dataBucketName) 437 for _, op := range command.Operations { 438 var err error 439 switch op.OpType { 440 case putOp: 441 err = b.Put([]byte(op.Key), op.Value) 442 case deleteOp: 443 err = b.Delete([]byte(op.Key)) 444 case restoreCallbackOp: 445 if f.restoreCb != nil { 446 // Kick off the restore callback function in a go routine 447 go f.restoreCb(context.Background()) 448 } 449 default: 450 return fmt.Errorf("%q is not a supported transaction operation", op.OpType) 451 } 452 if err != nil { 453 return err 454 } 455 } 456 457 // TODO: benchmark so we can know how much time this adds 458 if f.storeLatestState && len(logIndex) > 0 { 459 b := tx.Bucket(configBucketName) 460 err = b.Put(latestIndexKey, logIndex) 461 if err != nil { 462 return err 463 } 464 } 465 466 return nil 467 }) 468 if err != nil { 469 f.logger.Error("failed to store data", "error", err) 470 panic("failed to store data") 471 } 472 473 // If we advanced the latest value, update the in-memory representation too. 474 if len(logIndex) > 0 { 475 atomic.StoreUint64(f.latestTerm, log.Term) 476 atomic.StoreUint64(f.latestIndex, log.Index) 477 } 478 479 return &FSMApplyResponse{ 480 Success: true, 481 } 482} 483 484type writeErrorCloser interface { 485 io.WriteCloser 486 CloseWithError(error) error 487} 488 489// writeTo will copy the FSM's content to a remote sink. The data is written 490// twice, once for use in determining various metadata attributes of the dataset 491// (size, checksum, etc) and a second for the sink of the data. We also use a 492// proto delimited writer so we can stream proto messages to the sink. 493func (f *FSM) writeTo(ctx context.Context, metaSink writeErrorCloser, sink writeErrorCloser) { 494 protoWriter := protoio.NewDelimitedWriter(sink) 495 metadataProtoWriter := protoio.NewDelimitedWriter(metaSink) 496 497 f.l.RLock() 498 defer f.l.RUnlock() 499 500 err := f.db.View(func(tx *bolt.Tx) error { 501 b := tx.Bucket(dataBucketName) 502 503 c := b.Cursor() 504 505 // Do the first scan of the data for metadata purposes. 506 for k, v := c.First(); k != nil; k, v = c.Next() { 507 err := metadataProtoWriter.WriteMsg(&pb.StorageEntry{ 508 Key: string(k), 509 Value: v, 510 }) 511 if err != nil { 512 metaSink.CloseWithError(err) 513 return err 514 } 515 } 516 metaSink.Close() 517 518 // Do the second scan for copy purposes. 519 for k, v := c.First(); k != nil; k, v = c.Next() { 520 err := protoWriter.WriteMsg(&pb.StorageEntry{ 521 Key: string(k), 522 Value: v, 523 }) 524 if err != nil { 525 return err 526 } 527 } 528 529 return nil 530 }) 531 sink.CloseWithError(err) 532} 533 534// Snapshot implements the FSM interface. It returns a noop snapshot object. 535func (f *FSM) Snapshot() (raft.FSMSnapshot, error) { 536 return &noopSnapshotter{}, nil 537} 538 539// SetNoopRestore is used to disable restore operations on raft startup. Because 540// we are using persistent storage in our FSM we do not need to issue a restore 541// on startup. 542func (f *FSM) SetNoopRestore(enabled bool) { 543 f.l.Lock() 544 f.noopRestore = enabled 545 f.l.Unlock() 546} 547 548// Restore reads data from the provided reader and writes it into the FSM. It 549// first deletes the existing bucket to clear all existing data, then recreates 550// it so we can copy in the snapshot. 551func (f *FSM) Restore(r io.ReadCloser) error { 552 if f.noopRestore == true { 553 return nil 554 } 555 556 protoReader := protoio.NewDelimitedReader(r, math.MaxInt32) 557 defer protoReader.Close() 558 559 f.l.Lock() 560 defer f.l.Unlock() 561 562 // Start a write transaction. 563 err := f.db.Update(func(tx *bolt.Tx) error { 564 err := tx.DeleteBucket(dataBucketName) 565 if err != nil { 566 return err 567 } 568 569 b, err := tx.CreateBucket(dataBucketName) 570 if err != nil { 571 return err 572 } 573 574 for { 575 s := new(pb.StorageEntry) 576 err := protoReader.ReadMsg(s) 577 if err != nil { 578 if err == io.EOF { 579 return nil 580 } 581 return err 582 } 583 584 err = b.Put([]byte(s.Key), s.Value) 585 if err != nil { 586 return err 587 } 588 } 589 590 return nil 591 }) 592 if err != nil { 593 f.logger.Error("could not restore snapshot", "error", err) 594 return err 595 } 596 597 return nil 598} 599 600// noopSnapshotter implements the fsm.Snapshot interface. It doesn't do anything 601// since our SnapshotStore reads data out of the FSM on Open(). 602type noopSnapshotter struct{} 603 604// Persist doesn't do anything. 605func (s *noopSnapshotter) Persist(sink raft.SnapshotSink) error { 606 return nil 607} 608 609// Release doesn't do anything. 610func (s *noopSnapshotter) Release() {} 611 612// StoreConfig satisfies the raft.ConfigurationStore interface and persists the 613// latest raft server configuration to the bolt file. 614func (f *FSM) StoreConfiguration(index uint64, configuration raft.Configuration) { 615 f.l.RLock() 616 defer f.l.RUnlock() 617 618 var indexBytes []byte 619 latestIndex, _ := f.LatestState() 620 // Only write the new index if we are advancing the pointer 621 if index > latestIndex.Index { 622 latestIndex.Index = index 623 624 var err error 625 indexBytes, err = proto.Marshal(latestIndex) 626 if err != nil { 627 f.logger.Error("unable to marshal latest index", "error", err) 628 panic(fmt.Sprintf("unable to marshal latest index: %v", err)) 629 } 630 } 631 632 protoConfig := raftConfigurationToProtoConfiguration(index, configuration) 633 configBytes, err := proto.Marshal(protoConfig) 634 if err != nil { 635 f.logger.Error("unable to marshal config", "error", err) 636 panic(fmt.Sprintf("unable to marshal config: %v", err)) 637 } 638 639 if f.storeLatestState { 640 err = f.db.Update(func(tx *bolt.Tx) error { 641 b := tx.Bucket(configBucketName) 642 err := b.Put(latestConfigKey, configBytes) 643 if err != nil { 644 return err 645 } 646 647 // TODO: benchmark so we can know how much time this adds 648 if len(indexBytes) > 0 { 649 err = b.Put(latestIndexKey, indexBytes) 650 if err != nil { 651 return err 652 } 653 } 654 655 return nil 656 }) 657 if err != nil { 658 f.logger.Error("unable to store latest configuration", "error", err) 659 panic(fmt.Sprintf("unable to store latest configuration: %v", err)) 660 } 661 } 662 663 f.witnessIndex(latestIndex) 664 f.latestConfig.Store(protoConfig) 665} 666 667// raftConfigurationToProtoConfiguration converts a raft configuration object to 668// a proto value. 669func raftConfigurationToProtoConfiguration(index uint64, configuration raft.Configuration) *ConfigurationValue { 670 servers := make([]*Server, len(configuration.Servers)) 671 for i, s := range configuration.Servers { 672 servers[i] = &Server{ 673 Suffrage: int32(s.Suffrage), 674 Id: string(s.ID), 675 Address: string(s.Address), 676 } 677 } 678 return &ConfigurationValue{ 679 Index: index, 680 Servers: servers, 681 } 682} 683 684// protoConfigurationToRaftConfiguration converts a proto configuration object 685// to a raft object. 686func protoConfigurationToRaftConfiguration(configuration *ConfigurationValue) (uint64, raft.Configuration) { 687 servers := make([]raft.Server, len(configuration.Servers)) 688 for i, s := range configuration.Servers { 689 servers[i] = raft.Server{ 690 Suffrage: raft.ServerSuffrage(s.Suffrage), 691 ID: raft.ServerID(s.Id), 692 Address: raft.ServerAddress(s.Address), 693 } 694 } 695 return configuration.Index, raft.Configuration{ 696 Servers: servers, 697 } 698} 699 700type FSMChunkStorage struct { 701 f *FSM 702 ctx context.Context 703} 704 705// chunkPaths returns a disk prefix and key given chunkinfo 706func (f *FSMChunkStorage) chunkPaths(chunk *raftchunking.ChunkInfo) (string, string) { 707 prefix := fmt.Sprintf("%s%d/", chunkingPrefix, chunk.OpNum) 708 key := fmt.Sprintf("%s%d", prefix, chunk.SequenceNum) 709 return prefix, key 710} 711 712func (f *FSMChunkStorage) StoreChunk(chunk *raftchunking.ChunkInfo) (bool, error) { 713 b, err := jsonutil.EncodeJSON(chunk) 714 if err != nil { 715 return false, errwrap.Wrapf("error encoding chunk info: {{err}}", err) 716 } 717 718 prefix, key := f.chunkPaths(chunk) 719 720 entry := &physical.Entry{ 721 Key: key, 722 Value: b, 723 } 724 725 f.f.permitPool.Acquire() 726 defer f.f.permitPool.Release() 727 728 f.f.l.RLock() 729 defer f.f.l.RUnlock() 730 731 // Start a write transaction. 732 done := new(bool) 733 if err := f.f.db.Update(func(tx *bolt.Tx) error { 734 if err := tx.Bucket(dataBucketName).Put([]byte(entry.Key), entry.Value); err != nil { 735 return errwrap.Wrapf("error storing chunk info: {{err}}", err) 736 } 737 738 // Assume bucket exists and has keys 739 c := tx.Bucket(dataBucketName).Cursor() 740 741 var keys []string 742 prefixBytes := []byte(prefix) 743 for k, _ := c.Seek(prefixBytes); k != nil && bytes.HasPrefix(k, prefixBytes); k, _ = c.Next() { 744 key := string(k) 745 key = strings.TrimPrefix(key, prefix) 746 if i := strings.Index(key, "/"); i == -1 { 747 // Add objects only from the current 'folder' 748 keys = append(keys, key) 749 } else { 750 // Add truncated 'folder' paths 751 keys = strutil.AppendIfMissing(keys, string(key[:i+1])) 752 } 753 } 754 755 *done = uint32(len(keys)) == chunk.NumChunks 756 757 return nil 758 }); err != nil { 759 return false, err 760 } 761 762 return *done, nil 763} 764 765func (f *FSMChunkStorage) FinalizeOp(opNum uint64) ([]*raftchunking.ChunkInfo, error) { 766 ret, err := f.chunksForOpNum(opNum) 767 if err != nil { 768 return nil, errwrap.Wrapf("error getting chunks for op keys: {{err}}", err) 769 } 770 771 prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum}) 772 if err := f.f.DeletePrefix(f.ctx, prefix); err != nil { 773 return nil, errwrap.Wrapf("error deleting prefix after op finalization: {{err}}", err) 774 } 775 776 return ret, nil 777} 778 779func (f *FSMChunkStorage) chunksForOpNum(opNum uint64) ([]*raftchunking.ChunkInfo, error) { 780 prefix, _ := f.chunkPaths(&raftchunking.ChunkInfo{OpNum: opNum}) 781 782 opChunkKeys, err := f.f.List(f.ctx, prefix) 783 if err != nil { 784 return nil, errwrap.Wrapf("error fetching op chunk keys: {{err}}", err) 785 } 786 787 if len(opChunkKeys) == 0 { 788 return nil, nil 789 } 790 791 var ret []*raftchunking.ChunkInfo 792 793 for _, v := range opChunkKeys { 794 seqNum, err := strconv.ParseInt(v, 10, 64) 795 if err != nil { 796 return nil, errwrap.Wrapf("error converting seqnum to integer: {{err}}", err) 797 } 798 799 entry, err := f.f.Get(f.ctx, prefix+v) 800 if err != nil { 801 return nil, errwrap.Wrapf("error fetching chunkinfo: {{err}}", err) 802 } 803 804 var ci raftchunking.ChunkInfo 805 if err := jsonutil.DecodeJSON(entry.Value, &ci); err != nil { 806 return nil, errwrap.Wrapf("error decoding chunkinfo json: {{err}}", err) 807 } 808 809 if ret == nil { 810 ret = make([]*raftchunking.ChunkInfo, ci.NumChunks) 811 } 812 813 ret[seqNum] = &ci 814 } 815 816 return ret, nil 817} 818 819func (f *FSMChunkStorage) GetChunks() (raftchunking.ChunkMap, error) { 820 opNums, err := f.f.List(f.ctx, chunkingPrefix) 821 if err != nil { 822 return nil, errwrap.Wrapf("error doing recursive list for chunk saving: {{err}}", err) 823 } 824 825 if len(opNums) == 0 { 826 return nil, nil 827 } 828 829 ret := make(raftchunking.ChunkMap, len(opNums)) 830 for _, opNumStr := range opNums { 831 opNum, err := strconv.ParseInt(opNumStr, 10, 64) 832 if err != nil { 833 return nil, errwrap.Wrapf("error parsing op num during chunk saving: {{err}}", err) 834 } 835 836 opChunks, err := f.chunksForOpNum(uint64(opNum)) 837 if err != nil { 838 return nil, errwrap.Wrapf("error getting chunks for op keys during chunk saving: {{err}}", err) 839 } 840 841 ret[uint64(opNum)] = opChunks 842 } 843 844 return ret, nil 845} 846 847func (f *FSMChunkStorage) RestoreChunks(chunks raftchunking.ChunkMap) error { 848 if err := f.f.DeletePrefix(f.ctx, chunkingPrefix); err != nil { 849 return errwrap.Wrapf("error deleting prefix for chunk restoration: {{err}}", err) 850 } 851 if len(chunks) == 0 { 852 return nil 853 } 854 855 for opNum, opChunks := range chunks { 856 for _, chunk := range opChunks { 857 if chunk == nil { 858 continue 859 } 860 if chunk.OpNum != opNum { 861 return errors.New("unexpected op number in chunk") 862 } 863 if _, err := f.StoreChunk(chunk); err != nil { 864 return errwrap.Wrapf("error storing chunk during restoration: {{err}}", err) 865 } 866 } 867 } 868 869 return nil 870} 871