1package raft 2 3import ( 4 "bufio" 5 "bytes" 6 "encoding/json" 7 "fmt" 8 "hash" 9 "hash/crc64" 10 "io" 11 "io/ioutil" 12 "os" 13 "path/filepath" 14 "runtime" 15 "sort" 16 "strings" 17 "time" 18 19 hclog "github.com/hashicorp/go-hclog" 20) 21 22const ( 23 testPath = "permTest" 24 snapPath = "snapshots" 25 metaFilePath = "meta.json" 26 stateFilePath = "state.bin" 27 tmpSuffix = ".tmp" 28) 29 30// FileSnapshotStore implements the SnapshotStore interface and allows 31// snapshots to be made on the local disk. 32type FileSnapshotStore struct { 33 path string 34 retain int 35 logger hclog.Logger 36 37 // noSync, if true, skips crash-safe file fsync api calls. 38 // It's a private field, only used in testing 39 noSync bool 40} 41 42type snapMetaSlice []*fileSnapshotMeta 43 44// FileSnapshotSink implements SnapshotSink with a file. 45type FileSnapshotSink struct { 46 store *FileSnapshotStore 47 logger hclog.Logger 48 dir string 49 parentDir string 50 meta fileSnapshotMeta 51 52 noSync bool 53 54 stateFile *os.File 55 stateHash hash.Hash64 56 buffered *bufio.Writer 57 58 closed bool 59} 60 61// fileSnapshotMeta is stored on disk. We also put a CRC 62// on disk so that we can verify the snapshot. 63type fileSnapshotMeta struct { 64 SnapshotMeta 65 CRC []byte 66} 67 68// bufferedFile is returned when we open a snapshot. This way 69// reads are buffered and the file still gets closed. 70type bufferedFile struct { 71 bh *bufio.Reader 72 fh *os.File 73} 74 75func (b *bufferedFile) Read(p []byte) (n int, err error) { 76 return b.bh.Read(p) 77} 78 79func (b *bufferedFile) Close() error { 80 return b.fh.Close() 81} 82 83// NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based 84// on a base directory. The `retain` parameter controls how many 85// snapshots are retained. Must be at least 1. 86func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) { 87 if retain < 1 { 88 return nil, fmt.Errorf("must retain at least one snapshot") 89 } 90 if logger == nil { 91 logger = hclog.New(&hclog.LoggerOptions{ 92 Name: "snapshot", 93 Output: hclog.DefaultOutput, 94 Level: hclog.DefaultLevel, 95 }) 96 } 97 98 // Ensure our path exists 99 path := filepath.Join(base, snapPath) 100 if err := os.MkdirAll(path, 0755); err != nil && !os.IsExist(err) { 101 return nil, fmt.Errorf("snapshot path not accessible: %v", err) 102 } 103 104 // Setup the store 105 store := &FileSnapshotStore{ 106 path: path, 107 retain: retain, 108 logger: logger, 109 } 110 111 // Do a permissions test 112 if err := store.testPermissions(); err != nil { 113 return nil, fmt.Errorf("permissions test failed: %v", err) 114 } 115 return store, nil 116} 117 118// NewFileSnapshotStore creates a new FileSnapshotStore based 119// on a base directory. The `retain` parameter controls how many 120// snapshots are retained. Must be at least 1. 121func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSnapshotStore, error) { 122 if logOutput == nil { 123 logOutput = os.Stderr 124 } 125 return NewFileSnapshotStoreWithLogger(base, retain, hclog.New(&hclog.LoggerOptions{ 126 Name: "snapshot", 127 Output: logOutput, 128 Level: hclog.DefaultLevel, 129 })) 130} 131 132// testPermissions tries to touch a file in our path to see if it works. 133func (f *FileSnapshotStore) testPermissions() error { 134 path := filepath.Join(f.path, testPath) 135 fh, err := os.Create(path) 136 if err != nil { 137 return err 138 } 139 140 if err = fh.Close(); err != nil { 141 return err 142 } 143 144 if err = os.Remove(path); err != nil { 145 return err 146 } 147 return nil 148} 149 150// snapshotName generates a name for the snapshot. 151func snapshotName(term, index uint64) string { 152 now := time.Now() 153 msec := now.UnixNano() / int64(time.Millisecond) 154 return fmt.Sprintf("%d-%d-%d", term, index, msec) 155} 156 157// Create is used to start a new snapshot 158func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, 159 configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) { 160 // We only support version 1 snapshots at this time. 161 if version != 1 { 162 return nil, fmt.Errorf("unsupported snapshot version %d", version) 163 } 164 165 // Create a new path 166 name := snapshotName(term, index) 167 path := filepath.Join(f.path, name+tmpSuffix) 168 f.logger.Info("creating new snapshot", "path", path) 169 170 // Make the directory 171 if err := os.MkdirAll(path, 0755); err != nil { 172 f.logger.Error("failed to make snapshot directly", "error", err) 173 return nil, err 174 } 175 176 // Create the sink 177 sink := &FileSnapshotSink{ 178 store: f, 179 logger: f.logger, 180 dir: path, 181 parentDir: f.path, 182 noSync: f.noSync, 183 meta: fileSnapshotMeta{ 184 SnapshotMeta: SnapshotMeta{ 185 Version: version, 186 ID: name, 187 Index: index, 188 Term: term, 189 Peers: encodePeers(configuration, trans), 190 Configuration: configuration, 191 ConfigurationIndex: configurationIndex, 192 }, 193 CRC: nil, 194 }, 195 } 196 197 // Write out the meta data 198 if err := sink.writeMeta(); err != nil { 199 f.logger.Error("failed to write metadata", "error", err) 200 return nil, err 201 } 202 203 // Open the state file 204 statePath := filepath.Join(path, stateFilePath) 205 fh, err := os.Create(statePath) 206 if err != nil { 207 f.logger.Error("failed to create state file", "error", err) 208 return nil, err 209 } 210 sink.stateFile = fh 211 212 // Create a CRC64 hash 213 sink.stateHash = crc64.New(crc64.MakeTable(crc64.ECMA)) 214 215 // Wrap both the hash and file in a MultiWriter with buffering 216 multi := io.MultiWriter(sink.stateFile, sink.stateHash) 217 sink.buffered = bufio.NewWriter(multi) 218 219 // Done 220 return sink, nil 221} 222 223// List returns available snapshots in the store. 224func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) { 225 // Get the eligible snapshots 226 snapshots, err := f.getSnapshots() 227 if err != nil { 228 f.logger.Error("failed to get snapshots", "error", err) 229 return nil, err 230 } 231 232 var snapMeta []*SnapshotMeta 233 for _, meta := range snapshots { 234 snapMeta = append(snapMeta, &meta.SnapshotMeta) 235 if len(snapMeta) == f.retain { 236 break 237 } 238 } 239 return snapMeta, nil 240} 241 242// getSnapshots returns all the known snapshots. 243func (f *FileSnapshotStore) getSnapshots() ([]*fileSnapshotMeta, error) { 244 // Get the eligible snapshots 245 snapshots, err := ioutil.ReadDir(f.path) 246 if err != nil { 247 f.logger.Error("failed to scan snapshot directory", "error", err) 248 return nil, err 249 } 250 251 // Populate the metadata 252 var snapMeta []*fileSnapshotMeta 253 for _, snap := range snapshots { 254 // Ignore any files 255 if !snap.IsDir() { 256 continue 257 } 258 259 // Ignore any temporary snapshots 260 dirName := snap.Name() 261 if strings.HasSuffix(dirName, tmpSuffix) { 262 f.logger.Warn("found temporary snapshot", "name", dirName) 263 continue 264 } 265 266 // Try to read the meta data 267 meta, err := f.readMeta(dirName) 268 if err != nil { 269 f.logger.Warn("failed to read metadata", "name", dirName, "error", err) 270 continue 271 } 272 273 // Make sure we can understand this version. 274 if meta.Version < SnapshotVersionMin || meta.Version > SnapshotVersionMax { 275 f.logger.Warn("snapshot version not supported", "name", dirName, "version", meta.Version) 276 continue 277 } 278 279 // Append, but only return up to the retain count 280 snapMeta = append(snapMeta, meta) 281 } 282 283 // Sort the snapshot, reverse so we get new -> old 284 sort.Sort(sort.Reverse(snapMetaSlice(snapMeta))) 285 286 return snapMeta, nil 287} 288 289// readMeta is used to read the meta data for a given named backup 290func (f *FileSnapshotStore) readMeta(name string) (*fileSnapshotMeta, error) { 291 // Open the meta file 292 metaPath := filepath.Join(f.path, name, metaFilePath) 293 fh, err := os.Open(metaPath) 294 if err != nil { 295 return nil, err 296 } 297 defer fh.Close() 298 299 // Buffer the file IO 300 buffered := bufio.NewReader(fh) 301 302 // Read in the JSON 303 meta := &fileSnapshotMeta{} 304 dec := json.NewDecoder(buffered) 305 if err := dec.Decode(meta); err != nil { 306 return nil, err 307 } 308 return meta, nil 309} 310 311// Open takes a snapshot ID and returns a ReadCloser for that snapshot. 312func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) { 313 // Get the metadata 314 meta, err := f.readMeta(id) 315 if err != nil { 316 f.logger.Error("failed to get meta data to open snapshot", "error", err) 317 return nil, nil, err 318 } 319 320 // Open the state file 321 statePath := filepath.Join(f.path, id, stateFilePath) 322 fh, err := os.Open(statePath) 323 if err != nil { 324 f.logger.Error("failed to open state file", "error", err) 325 return nil, nil, err 326 } 327 328 // Create a CRC64 hash 329 stateHash := crc64.New(crc64.MakeTable(crc64.ECMA)) 330 331 // Compute the hash 332 _, err = io.Copy(stateHash, fh) 333 if err != nil { 334 f.logger.Error("failed to read state file", "error", err) 335 fh.Close() 336 return nil, nil, err 337 } 338 339 // Verify the hash 340 computed := stateHash.Sum(nil) 341 if bytes.Compare(meta.CRC, computed) != 0 { 342 f.logger.Error("CRC checksum failed", "stored", meta.CRC, "computed", computed) 343 fh.Close() 344 return nil, nil, fmt.Errorf("CRC mismatch") 345 } 346 347 // Seek to the start 348 if _, err := fh.Seek(0, 0); err != nil { 349 f.logger.Error("state file seek failed", "error", err) 350 fh.Close() 351 return nil, nil, err 352 } 353 354 // Return a buffered file 355 buffered := &bufferedFile{ 356 bh: bufio.NewReader(fh), 357 fh: fh, 358 } 359 360 return &meta.SnapshotMeta, buffered, nil 361} 362 363// ReapSnapshots reaps any snapshots beyond the retain count. 364func (f *FileSnapshotStore) ReapSnapshots() error { 365 snapshots, err := f.getSnapshots() 366 if err != nil { 367 f.logger.Error("failed to get snapshots", "error", err) 368 return err 369 } 370 371 for i := f.retain; i < len(snapshots); i++ { 372 path := filepath.Join(f.path, snapshots[i].ID) 373 f.logger.Info("reaping snapshot", "path", path) 374 if err := os.RemoveAll(path); err != nil { 375 f.logger.Error("failed to reap snapshot", "path", path, "error", err) 376 return err 377 } 378 } 379 return nil 380} 381 382// ID returns the ID of the snapshot, can be used with Open() 383// after the snapshot is finalized. 384func (s *FileSnapshotSink) ID() string { 385 return s.meta.ID 386} 387 388// Write is used to append to the state file. We write to the 389// buffered IO object to reduce the amount of context switches. 390func (s *FileSnapshotSink) Write(b []byte) (int, error) { 391 return s.buffered.Write(b) 392} 393 394// Close is used to indicate a successful end. 395func (s *FileSnapshotSink) Close() error { 396 // Make sure close is idempotent 397 if s.closed { 398 return nil 399 } 400 s.closed = true 401 402 // Close the open handles 403 if err := s.finalize(); err != nil { 404 s.logger.Error("failed to finalize snapshot", "error", err) 405 if delErr := os.RemoveAll(s.dir); delErr != nil { 406 s.logger.Error("failed to delete temporary snapshot directory", "path", s.dir, "error", delErr) 407 return delErr 408 } 409 return err 410 } 411 412 // Write out the meta data 413 if err := s.writeMeta(); err != nil { 414 s.logger.Error("failed to write metadata", "error", err) 415 return err 416 } 417 418 // Move the directory into place 419 newPath := strings.TrimSuffix(s.dir, tmpSuffix) 420 if err := os.Rename(s.dir, newPath); err != nil { 421 s.logger.Error("failed to move snapshot into place", "error", err) 422 return err 423 } 424 425 if !s.noSync && runtime.GOOS != "windows" { // skipping fsync for directory entry edits on Windows, only needed for *nix style file systems 426 parentFH, err := os.Open(s.parentDir) 427 defer parentFH.Close() 428 if err != nil { 429 s.logger.Error("failed to open snapshot parent directory", "path", s.parentDir, "error", err) 430 return err 431 } 432 433 if err = parentFH.Sync(); err != nil { 434 s.logger.Error("failed syncing parent directory", "path", s.parentDir, "error", err) 435 return err 436 } 437 } 438 439 // Reap any old snapshots 440 if err := s.store.ReapSnapshots(); err != nil { 441 return err 442 } 443 444 return nil 445} 446 447// Cancel is used to indicate an unsuccessful end. 448func (s *FileSnapshotSink) Cancel() error { 449 // Make sure close is idempotent 450 if s.closed { 451 return nil 452 } 453 s.closed = true 454 455 // Close the open handles 456 if err := s.finalize(); err != nil { 457 s.logger.Error("failed to finalize snapshot", "error", err) 458 return err 459 } 460 461 // Attempt to remove all artifacts 462 return os.RemoveAll(s.dir) 463} 464 465// finalize is used to close all of our resources. 466func (s *FileSnapshotSink) finalize() error { 467 // Flush any remaining data 468 if err := s.buffered.Flush(); err != nil { 469 return err 470 } 471 472 // Sync to force fsync to disk 473 if !s.noSync { 474 if err := s.stateFile.Sync(); err != nil { 475 return err 476 } 477 } 478 479 // Get the file size 480 stat, statErr := s.stateFile.Stat() 481 482 // Close the file 483 if err := s.stateFile.Close(); err != nil { 484 return err 485 } 486 487 // Set the file size, check after we close 488 if statErr != nil { 489 return statErr 490 } 491 s.meta.Size = stat.Size() 492 493 // Set the CRC 494 s.meta.CRC = s.stateHash.Sum(nil) 495 return nil 496} 497 498// writeMeta is used to write out the metadata we have. 499func (s *FileSnapshotSink) writeMeta() error { 500 var err error 501 // Open the meta file 502 metaPath := filepath.Join(s.dir, metaFilePath) 503 var fh *os.File 504 fh, err = os.Create(metaPath) 505 if err != nil { 506 return err 507 } 508 defer fh.Close() 509 510 // Buffer the file IO 511 buffered := bufio.NewWriter(fh) 512 513 // Write out as JSON 514 enc := json.NewEncoder(buffered) 515 if err = enc.Encode(&s.meta); err != nil { 516 return err 517 } 518 519 if err = buffered.Flush(); err != nil { 520 return err 521 } 522 523 if !s.noSync { 524 if err = fh.Sync(); err != nil { 525 return err 526 } 527 } 528 529 return nil 530} 531 532// Implement the sort interface for []*fileSnapshotMeta. 533func (s snapMetaSlice) Len() int { 534 return len(s) 535} 536 537func (s snapMetaSlice) Less(i, j int) bool { 538 if s[i].Term != s[j].Term { 539 return s[i].Term < s[j].Term 540 } 541 if s[i].Index != s[j].Index { 542 return s[i].Index < s[j].Index 543 } 544 return s[i].ID < s[j].ID 545} 546 547func (s snapMetaSlice) Swap(i, j int) { 548 s[i], s[j] = s[j], s[i] 549} 550