1package vfscache 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "os" 9 "sync" 10 "time" 11 12 "github.com/pkg/errors" 13 "github.com/rclone/rclone/fs" 14 "github.com/rclone/rclone/fs/fserrors" 15 "github.com/rclone/rclone/fs/operations" 16 "github.com/rclone/rclone/lib/file" 17 "github.com/rclone/rclone/lib/ranges" 18 "github.com/rclone/rclone/vfs/vfscache/downloaders" 19 "github.com/rclone/rclone/vfs/vfscache/writeback" 20) 21 22// NB as Cache and Item are tightly linked it is necessary to have a 23// total lock ordering between them. So Cache.mu must always be 24// taken before Item.mu to avoid deadlocks. 25// 26// Cache may call into Item but care is needed if Item calls Cache 27// 28// A lot of the Cache methods do not require locking, these include 29// 30// - Cache.toOSPath 31// - Cache.toOSPathMeta 32// - Cache.createItemDir 33// - Cache.objectFingerprint 34// - Cache.AddVirtual 35 36// NB Item and downloader are tightly linked so it is necessary to 37// have a total lock ordering between them. downloader.mu must always 38// be taken before Item.mu. downloader may call into Item but Item may 39// **not** call downloader methods with Item.mu held 40 41// NB Item and writeback are tightly linked so it is necessary to 42// have a total lock ordering between them. writeback.mu must always 43// be taken before Item.mu. writeback may call into Item but Item may 44// **not** call writeback methods with Item.mu held 45 46// LL Item reset is invoked by cache cleaner for synchronous recovery 47// from ENOSPC errors. The reset operation removes the cache file and 48// closes/reopens the downloaders. Although most parts of reset and 49// other item operations are done with the item mutex held, the mutex 50// is released during fd.WriteAt and downloaders calls. We use preAccess 51// and postAccess calls to serialize reset and other item operations. 
52 53// Item is stored in the item map 54// 55// The Info field is written to the backing store to store status 56type Item struct { 57 // read only 58 c *Cache // cache this is part of 59 mu sync.Mutex // protect the variables 60 cond *sync.Cond // synchronize with cache cleaner 61 name string // name in the VFS 62 opens int // number of times file is open 63 downloaders *downloaders.Downloaders // a record of the downloaders in action - may be nil 64 o fs.Object // object we are caching - may be nil 65 fd *os.File // handle we are using to read and write to the file 66 modified bool // set if the file has been modified since the last Open 67 info Info // info about the file to persist to backing store 68 writeBackID writeback.Handle // id of any writebacks in progress 69 pendingAccesses int // number of threads - cache reset not allowed if not zero 70 beingReset bool // cache cleaner is resetting the cache file, access not allowed 71} 72 73// Info is persisted to backing store 74type Info struct { 75 ModTime time.Time // last time file was modified 76 ATime time.Time // last time file was accessed 77 Size int64 // size of the file 78 Rs ranges.Ranges // which parts of the file are present 79 Fingerprint string // fingerprint of remote object 80 Dirty bool // set if the backing file has been modified 81} 82 83// Items are a slice of *Item ordered by ATime 84type Items []*Item 85 86// ResetResult reports the actual action taken in the Reset function and reason 87type ResetResult int 88 89// Constants used to report actual action taken in the Reset function and reason 90const ( 91 SkippedDirty ResetResult = iota // Dirty item cannot be reset 92 SkippedPendingAccess // Reset pending access can lead to deadlock 93 SkippedEmpty // Reset empty item does not save space 94 RemovedNotInUse // Item not used. 
Remove instead of reset 95 ResetFailed // Reset failed with an error 96 ResetComplete // Reset completed successfully 97) 98 99func (rr ResetResult) String() string { 100 return [...]string{"Dirty item skipped", "In-access item skipped", "Empty item skipped", 101 "Not-in-use item removed", "Item reset failed", "Item reset completed"}[rr] 102} 103 104func (v Items) Len() int { return len(v) } 105func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] } 106func (v Items) Less(i, j int) bool { 107 if i == j { 108 return false 109 } 110 iItem := v[i] 111 jItem := v[j] 112 iItem.mu.Lock() 113 defer iItem.mu.Unlock() 114 jItem.mu.Lock() 115 defer jItem.mu.Unlock() 116 117 return iItem.info.ATime.Before(jItem.info.ATime) 118} 119 120// clean the item after its cache file has been deleted 121func (info *Info) clean() { 122 *info = Info{} 123 info.ModTime = time.Now() 124 info.ATime = info.ModTime 125} 126 127// StoreFn is called back with an object after it has been uploaded 128type StoreFn func(fs.Object) 129 130// newItem returns an item for the cache 131func newItem(c *Cache, name string) (item *Item) { 132 now := time.Now() 133 item = &Item{ 134 c: c, 135 name: name, 136 info: Info{ 137 ModTime: now, 138 ATime: now, 139 }, 140 } 141 item.cond = sync.NewCond(&item.mu) 142 // check the cache file exists 143 osPath := c.toOSPath(name) 144 fi, statErr := os.Stat(osPath) 145 if statErr != nil { 146 if os.IsNotExist(statErr) { 147 item._removeMeta("cache file doesn't exist") 148 } else { 149 item.remove(fmt.Sprintf("failed to stat cache file: %v", statErr)) 150 } 151 } 152 153 // Try to load the metadata 154 exists, err := item.load() 155 if !exists { 156 item._removeFile("metadata doesn't exist") 157 } else if err != nil { 158 item.remove(fmt.Sprintf("failed to load metadata: %v", err)) 159 } 160 161 // Get size estimate (which is best we can do until Open() called) 162 if statErr == nil { 163 item.info.Size = fi.Size() 164 } 165 return item 166} 167 168// inUse returns 
true if the item is open or dirty 169func (item *Item) inUse() bool { 170 item.mu.Lock() 171 defer item.mu.Unlock() 172 return item.opens != 0 || item.info.Dirty 173} 174 175// getATime returns the ATime of the item 176func (item *Item) getATime() time.Time { 177 item.mu.Lock() 178 defer item.mu.Unlock() 179 return item.info.ATime 180} 181 182// getDiskSize returns the size on disk (approximately) of the item 183// 184// We return the sizes of the chunks we have fetched, however there is 185// likely to be some overhead which we are not taking into account. 186func (item *Item) getDiskSize() int64 { 187 item.mu.Lock() 188 defer item.mu.Unlock() 189 return item.info.Rs.Size() 190} 191 192// load reads an item from the disk or returns nil if not found 193func (item *Item) load() (exists bool, err error) { 194 item.mu.Lock() 195 defer item.mu.Unlock() 196 osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache 197 in, err := os.Open(osPathMeta) 198 if err != nil { 199 if os.IsNotExist(err) { 200 return false, err 201 } 202 return true, errors.Wrap(err, "vfs cache item: failed to read metadata") 203 } 204 defer fs.CheckClose(in, &err) 205 decoder := json.NewDecoder(in) 206 err = decoder.Decode(&item.info) 207 if err != nil { 208 return true, errors.Wrap(err, "vfs cache item: corrupt metadata") 209 } 210 return true, nil 211} 212 213// save writes an item to the disk 214// 215// call with the lock held 216func (item *Item) _save() (err error) { 217 osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache 218 out, err := os.Create(osPathMeta) 219 if err != nil { 220 return errors.Wrap(err, "vfs cache item: failed to write metadata") 221 } 222 defer fs.CheckClose(out, &err) 223 encoder := json.NewEncoder(out) 224 encoder.SetIndent("", "\t") 225 err = encoder.Encode(item.info) 226 if err != nil { 227 return errors.Wrap(err, "vfs cache item: failed to encode metadata") 228 } 229 return nil 230} 231 232// truncate the item to the given size, creating 
it if necessary 233// 234// this does not mark the object as dirty 235// 236// call with the lock held 237func (item *Item) _truncate(size int64) (err error) { 238 if size < 0 { 239 // FIXME ignore unknown length files 240 return nil 241 } 242 243 // Use open handle if available 244 fd := item.fd 245 if fd == nil { 246 // If the metadata says we have some blocks cached then the 247 // file should exist, so open without O_CREATE 248 oFlags := os.O_WRONLY 249 if item.info.Rs.Size() == 0 { 250 oFlags |= os.O_CREATE 251 } 252 osPath := item.c.toOSPath(item.name) // No locking in Cache 253 fd, err = file.OpenFile(osPath, oFlags, 0600) 254 if err != nil && os.IsNotExist(err) { 255 // If the metadata has info but the file doesn't 256 // not exist then it has been externally removed 257 fs.Errorf(item.name, "vfs cache: detected external removal of cache file") 258 item.info.Rs = nil // show we have no blocks cached 259 item.info.Dirty = false // file can't be dirty if it doesn't exist 260 item._removeMeta("cache file externally deleted") 261 fd, err = file.OpenFile(osPath, os.O_CREATE|os.O_WRONLY, 0600) 262 } 263 if err != nil { 264 return errors.Wrap(err, "vfs cache: truncate: failed to open cache file") 265 } 266 267 defer fs.CheckClose(fd, &err) 268 269 err = file.SetSparse(fd) 270 if err != nil { 271 fs.Errorf(item.name, "vfs cache: truncate: failed to set as a sparse file: %v", err) 272 } 273 } 274 275 fs.Debugf(item.name, "vfs cache: truncate to size=%d", size) 276 277 err = fd.Truncate(size) 278 if err != nil { 279 return errors.Wrap(err, "vfs cache: truncate") 280 } 281 282 item.info.Size = size 283 284 return nil 285} 286 287// Truncate the item to the current size, creating if necessary 288// 289// This does not mark the object as dirty 290// 291// call with the lock held 292func (item *Item) _truncateToCurrentSize() (err error) { 293 size, err := item._getSize() 294 if err != nil && !os.IsNotExist(errors.Cause(err)) { 295 return errors.Wrap(err, "truncate to 
current size") 296 } 297 if size < 0 { 298 // FIXME ignore unknown length files 299 return nil 300 } 301 err = item._truncate(size) 302 if err != nil { 303 return err 304 } 305 return nil 306} 307 308// Truncate the item to the given size, creating it if necessary 309// 310// If the new size is shorter than the existing size then the object 311// will be shortened and marked as dirty. 312// 313// If the new size is longer than the old size then the object will be 314// extended and the extended data will be filled with zeros. The 315// object will be marked as dirty in this case also. 316func (item *Item) Truncate(size int64) (err error) { 317 item.preAccess() 318 defer item.postAccess() 319 item.mu.Lock() 320 defer item.mu.Unlock() 321 322 if item.fd == nil { 323 return errors.New("vfs cache item truncate: internal error: didn't Open file") 324 } 325 326 // Read old size 327 oldSize, err := item._getSize() 328 if err != nil { 329 if !os.IsNotExist(errors.Cause(err)) { 330 return errors.Wrap(err, "truncate failed to read size") 331 } 332 oldSize = 0 333 } 334 335 err = item._truncate(size) 336 if err != nil { 337 return err 338 } 339 340 changed := true 341 if size > oldSize { 342 // Truncate extends the file in which case all new bytes are 343 // read as zeros. In this case we must show we have written to 344 // the new parts of the file. 
345 item._written(oldSize, size) 346 } else if size < oldSize { 347 // Truncate shrinks the file so clip the downloaded ranges 348 item.info.Rs = item.info.Rs.Intersection(ranges.Range{Pos: 0, Size: size}) 349 } else { 350 changed = item.o == nil 351 } 352 if changed { 353 item._dirty() 354 } 355 356 return nil 357} 358 359// _stat gets the current stat of the backing file 360// 361// Call with mutex held 362func (item *Item) _stat() (fi os.FileInfo, err error) { 363 if item.fd != nil { 364 return item.fd.Stat() 365 } 366 osPath := item.c.toOSPath(item.name) // No locking in Cache 367 return os.Stat(osPath) 368} 369 370// _getSize gets the current size of the item and updates item.info.Size 371// 372// Call with mutex held 373func (item *Item) _getSize() (size int64, err error) { 374 fi, err := item._stat() 375 if err != nil { 376 if os.IsNotExist(err) && item.o != nil { 377 size = item.o.Size() 378 err = nil 379 } 380 } else { 381 size = fi.Size() 382 } 383 if err == nil { 384 item.info.Size = size 385 } 386 return size, err 387} 388 389// GetName gets the vfs name of the item 390func (item *Item) GetName() (name string) { 391 item.mu.Lock() 392 defer item.mu.Unlock() 393 return item.name 394} 395 396// GetSize gets the current size of the item 397func (item *Item) GetSize() (size int64, err error) { 398 item.mu.Lock() 399 defer item.mu.Unlock() 400 return item._getSize() 401} 402 403// _exists returns whether the backing file for the item exists or not 404// 405// call with mutex held 406func (item *Item) _exists() bool { 407 osPath := item.c.toOSPath(item.name) // No locking in Cache 408 _, err := os.Stat(osPath) 409 return err == nil 410} 411 412// Exists returns whether the backing file for the item exists or not 413func (item *Item) Exists() bool { 414 item.mu.Lock() 415 defer item.mu.Unlock() 416 return item._exists() 417} 418 419// _dirty marks the item as changed and needing writeback 420// 421// call with lock held 422func (item *Item) _dirty() { 423 
item.info.ModTime = time.Now() 424 item.info.ATime = item.info.ModTime 425 if !item.modified { 426 item.modified = true 427 item.mu.Unlock() 428 item.c.writeback.Remove(item.writeBackID) 429 item.mu.Lock() 430 } 431 if !item.info.Dirty { 432 item.info.Dirty = true 433 err := item._save() 434 if err != nil { 435 fs.Errorf(item.name, "vfs cache: failed to save item info: %v", err) 436 } 437 } 438} 439 440// Dirty marks the item as changed and needing writeback 441func (item *Item) Dirty() { 442 item.preAccess() 443 defer item.postAccess() 444 item.mu.Lock() 445 item._dirty() 446 item.mu.Unlock() 447} 448 449// IsDirty returns true if the item data is dirty 450func (item *Item) IsDirty() bool { 451 item.mu.Lock() 452 defer item.mu.Unlock() 453 return item.info.Dirty 454} 455 456// Create the cache file and store the metadata on disk 457// Called with item.mu locked 458func (item *Item) _createFile(osPath string) (err error) { 459 if item.fd != nil { 460 return errors.New("vfs cache item: internal error: didn't Close file") 461 } 462 item.modified = false 463 fd, err := file.OpenFile(osPath, os.O_RDWR, 0600) 464 if err != nil { 465 return errors.Wrap(err, "vfs cache item: open failed") 466 } 467 err = file.SetSparse(fd) 468 if err != nil { 469 fs.Errorf(item.name, "vfs cache: failed to set as a sparse file: %v", err) 470 } 471 item.fd = fd 472 473 err = item._save() 474 if err != nil { 475 closeErr := item.fd.Close() 476 if closeErr != nil { 477 fs.Errorf(item.name, "vfs cache: item.fd.Close: closeErr: %v", err) 478 } 479 item.fd = nil 480 return errors.Wrap(err, "vfs cache item: _save failed") 481 } 482 return err 483} 484 485// Open the local file from the object passed in. Wraps open() 486// to provide recovery from out of space error. 
487func (item *Item) Open(o fs.Object) (err error) { 488 for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ { 489 item.preAccess() 490 err = item.open(o) 491 item.postAccess() 492 if err == nil { 493 break 494 } 495 fs.Errorf(item.name, "vfs cache: failed to open item: %v", err) 496 if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" { 497 fs.Errorf(item.name, "Non-out-of-space error encountered during open") 498 break 499 } 500 item.c.KickCleaner() 501 } 502 return err 503} 504 505// Open the local file from the object passed in (which may be nil) 506// which implies we are about to create the file 507func (item *Item) open(o fs.Object) (err error) { 508 // defer log.Trace(o, "item=%p", item)("err=%v", &err) 509 item.mu.Lock() 510 defer item.mu.Unlock() 511 512 item.info.ATime = time.Now() 513 514 osPath, err := item.c.createItemDir(item.name) // No locking in Cache 515 if err != nil { 516 return errors.Wrap(err, "vfs cache item: createItemDir failed") 517 } 518 519 err = item._checkObject(o) 520 if err != nil { 521 return errors.Wrap(err, "vfs cache item: check object failed") 522 } 523 524 item.opens++ 525 if item.opens != 1 { 526 return nil 527 } 528 529 err = item._createFile(osPath) 530 if err != nil { 531 item._remove("item.open failed on _createFile, remove cache data/metadata files") 532 item.fd = nil 533 item.opens-- 534 return errors.Wrap(err, "vfs cache item: create cache file failed") 535 } 536 // Unlock the Item.mu so we can call some methods which take Cache.mu 537 item.mu.Unlock() 538 539 // Ensure this item is in the cache. It is possible a cache 540 // expiry has run and removed the item if it had no opens so 541 // we put it back here. If there was an item with opens 542 // already then return an error. This shouldn't happen because 543 // there should only be one vfs.File with a pointer to this 544 // item in at a time. 
545 oldItem := item.c.put(item.name, item) // LOCKING in Cache method 546 if oldItem != nil { 547 oldItem.mu.Lock() 548 if oldItem.opens != 0 { 549 // Put the item back and return an error 550 item.c.put(item.name, oldItem) // LOCKING in Cache method 551 err = errors.Errorf("internal error: item %q already open in the cache", item.name) 552 } 553 oldItem.mu.Unlock() 554 } 555 556 // Relock the Item.mu for the return 557 item.mu.Lock() 558 559 // Create the downloaders 560 if item.o != nil { 561 item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o) 562 } 563 564 return err 565} 566 567// Store stores the local cache file to the remote object, returning 568// the new remote object. objOld is the old object if known. 569// 570// Call with lock held 571func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) { 572 // defer log.Trace(item.name, "item=%p", item)("err=%v", &err) 573 574 // Transfer the temp file to the remote 575 cacheObj, err := item.c.fcache.NewObject(ctx, item.name) 576 if err != nil && err != fs.ErrorObjectNotFound { 577 return errors.Wrap(err, "vfs cache: failed to find cache file") 578 } 579 580 // Object has disappeared if cacheObj == nil 581 if cacheObj != nil { 582 o, name := item.o, item.name 583 item.mu.Unlock() 584 o, err := operations.Copy(ctx, item.c.fremote, o, name, cacheObj) 585 item.mu.Lock() 586 if err != nil { 587 return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote") 588 } 589 item.o = o 590 item._updateFingerprint() 591 } 592 593 item.info.Dirty = false 594 err = item._save() 595 if err != nil { 596 fs.Errorf(item.name, "vfs cache: failed to write metadata file: %v", err) 597 } 598 if storeFn != nil && item.o != nil { 599 fs.Debugf(item.name, "vfs cache: writeback object to VFS layer") 600 // Write the object back to the VFS layer as last 601 // thing we do with mutex unlocked 602 o := item.o 603 item.mu.Unlock() 604 storeFn(o) 605 item.mu.Lock() 606 } 607 return nil 
608} 609 610// Store stores the local cache file to the remote object, returning 611// the new remote object. objOld is the old object if known. 612func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) { 613 item.mu.Lock() 614 defer item.mu.Unlock() 615 return item._store(ctx, storeFn) 616} 617 618// Close the cache file 619func (item *Item) Close(storeFn StoreFn) (err error) { 620 // defer log.Trace(item.o, "Item.Close")("err=%v", &err) 621 item.preAccess() 622 defer item.postAccess() 623 var ( 624 downloaders *downloaders.Downloaders 625 syncWriteBack = item.c.opt.WriteBack <= 0 626 ) 627 item.mu.Lock() 628 defer item.mu.Unlock() 629 630 item.info.ATime = time.Now() 631 item.opens-- 632 633 if item.opens < 0 { 634 return os.ErrClosed 635 } else if item.opens > 0 { 636 return nil 637 } 638 639 // Update the size on close 640 _, _ = item._getSize() 641 642 // If the file is dirty ensure any segments not transferred 643 // are brought in first. 644 // 645 // FIXME It would be nice to do this asynchronously however it 646 // would require keeping the downloaders alive after the item 647 // has been closed 648 if item.info.Dirty && item.o != nil { 649 err = item._ensure(0, item.info.Size) 650 if err != nil { 651 return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file") 652 } 653 } 654 655 // Accumulate and log errors 656 checkErr := func(e error) { 657 if e != nil { 658 fs.Errorf(item.o, "vfs cache: item close failed: %v", e) 659 if err == nil { 660 err = e 661 } 662 } 663 } 664 665 // Close the downloaders 666 if downloaders = item.downloaders; downloaders != nil { 667 item.downloaders = nil 668 // FIXME need to unlock to kill downloader - should we 669 // re-arrange locking so this isn't necessary? maybe 670 // downloader should use the item mutex for locking? or put a 671 // finer lock on Rs? 
672 // 673 // downloader.Write calls ensure which needs the lock 674 // close downloader with mutex unlocked 675 item.mu.Unlock() 676 checkErr(downloaders.Close(nil)) 677 item.mu.Lock() 678 } 679 680 // close the file handle 681 if item.fd == nil { 682 checkErr(errors.New("vfs cache item: internal error: didn't Open file")) 683 } else { 684 checkErr(item.fd.Close()) 685 item.fd = nil 686 } 687 688 // save the metadata once more since it may be dirty 689 // after the downloader 690 checkErr(item._save()) 691 692 // if the item hasn't been changed but has been completed then 693 // set the modtime from the object otherwise set it from the info 694 if item._exists() { 695 if !item.info.Dirty && item.o != nil { 696 item._setModTime(item.o.ModTime(context.Background())) 697 } else { 698 item._setModTime(item.info.ModTime) 699 } 700 } 701 702 // upload the file to backing store if changed 703 if item.info.Dirty { 704 fs.Infof(item.name, "vfs cache: queuing for upload in %v", item.c.opt.WriteBack) 705 if syncWriteBack { 706 // do synchronous writeback 707 checkErr(item._store(context.Background(), storeFn)) 708 } else { 709 // asynchronous writeback 710 item.c.writeback.SetID(&item.writeBackID) 711 id := item.writeBackID 712 item.mu.Unlock() 713 item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error { 714 return item.store(ctx, storeFn) 715 }) 716 item.mu.Lock() 717 } 718 } 719 720 // mark as not modified now we have uploaded or queued for upload 721 item.modified = false 722 723 return err 724} 725 726// reload is called with valid items recovered from a cache reload. 727// 728// If they are dirty then it makes sure they get uploaded 729// 730// it is called before the cache has started so opens will be 0 and 731// metaDirty will be false. 
732func (item *Item) reload(ctx context.Context) error { 733 item.mu.Lock() 734 dirty := item.info.Dirty 735 item.mu.Unlock() 736 if !dirty { 737 return nil 738 } 739 // see if the object still exists 740 obj, _ := item.c.fremote.NewObject(ctx, item.name) 741 // open the file with the object (or nil) 742 err := item.Open(obj) 743 if err != nil { 744 return err 745 } 746 // close the file to execute the writeback if needed 747 err = item.Close(nil) 748 if err != nil { 749 return err 750 } 751 // put the file into the directory listings 752 size, err := item._getSize() 753 if err != nil { 754 return errors.Wrap(err, "reload: failed to read size") 755 } 756 err = item.c.AddVirtual(item.name, size, false) 757 if err != nil { 758 return errors.Wrap(err, "reload: failed to add virtual dir entry") 759 } 760 return nil 761} 762 763// check the fingerprint of an object and update the item or delete 764// the cached file accordingly 765// 766// If we have local modifications then they take precedence 767// over a change in the remote 768// 769// It ensures the file is the correct size for the object 770// 771// call with lock held 772func (item *Item) _checkObject(o fs.Object) error { 773 if o == nil { 774 if item.info.Fingerprint != "" { 775 // no remote object && local object 776 // remove local object unless dirty 777 if !item.info.Dirty { 778 item._remove("stale (remote deleted)") 779 } else { 780 fs.Debugf(item.name, "vfs cache: remote object has gone but local object modified - keeping it") 781 } 782 } else { 783 // no remote object && no local object 784 // OK 785 } 786 } else { 787 remoteFingerprint := fs.Fingerprint(context.TODO(), o, false) 788 fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint) 789 if item.info.Fingerprint != "" { 790 // remote object && local object 791 if remoteFingerprint != item.info.Fingerprint { 792 if !item.info.Dirty { 793 fs.Debugf(item.name, "vfs cache: 
removing cached entry as stale (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint) 794 item._remove("stale (remote is different)") 795 } else { 796 fs.Debugf(item.name, "vfs cache: remote object has changed but local object modified - keeping it (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint) 797 } 798 } 799 } else { 800 // remote object && no local object 801 // Set fingerprint 802 item.info.Fingerprint = remoteFingerprint 803 } 804 item.info.Size = o.Size() 805 } 806 item.o = o 807 808 err := item._truncateToCurrentSize() 809 if err != nil { 810 return errors.Wrap(err, "vfs cache item: open truncate failed") 811 } 812 813 return nil 814} 815 816// WrittenBack checks to see if the item has been written back or not 817func (item *Item) WrittenBack() bool { 818 item.mu.Lock() 819 defer item.mu.Unlock() 820 return item.info.Fingerprint != "" 821} 822 823// remove the cached file 824// 825// call with lock held 826func (item *Item) _removeFile(reason string) { 827 osPath := item.c.toOSPath(item.name) // No locking in Cache 828 err := os.Remove(osPath) 829 if err != nil { 830 if !os.IsNotExist(err) { 831 fs.Errorf(item.name, "vfs cache: failed to remove cache file as %s: %v", reason, err) 832 } 833 } else { 834 fs.Infof(item.name, "vfs cache: removed cache file as %s", reason) 835 } 836} 837 838// remove the metadata 839// 840// call with lock held 841func (item *Item) _removeMeta(reason string) { 842 osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache 843 err := os.Remove(osPathMeta) 844 if err != nil { 845 if !os.IsNotExist(err) { 846 fs.Errorf(item.name, "vfs cache: failed to remove metadata from cache as %s: %v", reason, err) 847 } 848 } else { 849 fs.Debugf(item.name, "vfs cache: removed metadata from cache as %s", reason) 850 } 851} 852 853// remove the cached file and empty the metadata 854// 855// This returns true if the file was in the transfer queue so may 
not 856// have completely uploaded yet. 857// 858// call with lock held 859func (item *Item) _remove(reason string) (wasWriting bool) { 860 // Cancel writeback, if any 861 item.mu.Unlock() 862 wasWriting = item.c.writeback.Remove(item.writeBackID) 863 item.mu.Lock() 864 item.info.clean() 865 item._removeFile(reason) 866 item._removeMeta(reason) 867 return wasWriting 868} 869 870// remove the cached file and empty the metadata 871// 872// This returns true if the file was in the transfer queue so may not 873// have completely uploaded yet. 874func (item *Item) remove(reason string) (wasWriting bool) { 875 item.mu.Lock() 876 defer item.mu.Unlock() 877 return item._remove(reason) 878} 879 880// RemoveNotInUse is called to remove cache file that has not been accessed recently 881// It may also be called for removing empty cache files too when the quota is already reached. 882func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed bool, spaceFreed int64) { 883 item.mu.Lock() 884 defer item.mu.Unlock() 885 886 spaceFreed = 0 887 removed = false 888 889 if item.opens != 0 || item.info.Dirty { 890 return 891 } 892 893 removeIt := false 894 if maxAge == 0 { 895 removeIt = true // quota-driven removal 896 } 897 if maxAge != 0 { 898 cutoff := time.Now().Add(-maxAge) 899 // If not locked and access time too long ago - delete the file 900 accessTime := item.info.ATime 901 if accessTime.Sub(cutoff) <= 0 { 902 removeIt = true 903 } 904 } 905 if removeIt { 906 spaceUsed := item.info.Rs.Size() 907 if !emptyOnly || spaceUsed == 0 { 908 spaceFreed = spaceUsed 909 removed = true 910 if item._remove("Removing old cache file not in use") { 911 fs.Errorf(item.name, "item removed when it was writing/uploaded") 912 } 913 } 914 } 915 return 916} 917 918// Reset is called by the cache purge functions only to reset (empty the contents) cache files that 919// are not dirty. It is used when cache space runs out and we see some ENOSPC error. 
920func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) { 921 item.mu.Lock() 922 defer item.mu.Unlock() 923 924 // The item is not being used now. Just remove it instead of resetting it. 925 if item.opens == 0 && !item.info.Dirty { 926 spaceFreed = item.info.Rs.Size() 927 if item._remove("Removing old cache file not in use") { 928 fs.Errorf(item.name, "item removed when it was writing/uploaded") 929 } 930 return RemovedNotInUse, spaceFreed, nil 931 } 932 933 // do not reset dirty file 934 if item.info.Dirty { 935 return SkippedDirty, 0, nil 936 } 937 938 /* A wait on pendingAccessCnt to become 0 can lead to deadlock when an item.Open bumps 939 up the pendingAccesses count, calls item.open, which calls cache.put. The cache.put 940 operation needs the cache mutex, which is held here. We skip this file now. The 941 caller (the cache cleaner thread) may retry resetting this item if the cache size does 942 not reduce below quota. */ 943 if item.pendingAccesses > 0 { 944 return SkippedPendingAccess, 0, nil 945 } 946 947 /* Do not need to reset an empty cache file unless it was being reset and the reset failed. 948 Some thread(s) may be waiting on the reset's succesful completion in that case. */ 949 if item.info.Rs.Size() == 0 && item.beingReset == false { 950 return SkippedEmpty, 0, nil 951 } 952 953 item.beingReset = true 954 955 /* Error handling from this point on (setting item.fd and item.beingReset): 956 Since Reset is called by the cache cleaner thread, there is no direct way to return 957 the error to the io threads. Set item.fd to nil upon internal errors, so that the 958 io threads will return internal errors seeing a nil fd. In the case when the error 959 is ENOSPC, keep the item in isBeingReset state and that will keep the item.ReadAt 960 waiting at its beginning. The cache purge loop will try to redo the reset after cache 961 space is made available again. 
This recovery design should allow most io threads to 962 eventually go through, unless large files are written/overwritten concurrently and 963 the total size of these files exceed the cache storage limit. */ 964 965 // Close the downloaders 966 // Accumulate and log errors 967 checkErr := func(e error) { 968 if e != nil { 969 fs.Errorf(item.o, "vfs cache: item reset failed: %v", e) 970 if err == nil { 971 err = e 972 } 973 } 974 } 975 976 if downloaders := item.downloaders; downloaders != nil { 977 item.downloaders = nil 978 // FIXME need to unlock to kill downloader - should we 979 // re-arrange locking so this isn't necessary? maybe 980 // downloader should use the item mutex for locking? or put a 981 // finer lock on Rs? 982 // 983 // downloader.Write calls ensure which needs the lock 984 // close downloader with mutex unlocked 985 item.mu.Unlock() 986 checkErr(downloaders.Close(nil)) 987 item.mu.Lock() 988 } 989 990 // close the file handle 991 // fd can be nil if we tried Reset and failed before because of ENOSPC during reset 992 if item.fd != nil { 993 checkErr(item.fd.Close()) 994 if err != nil { 995 // Could not close the cache file 996 item.beingReset = false 997 item.cond.Broadcast() 998 return ResetFailed, 0, err 999 } 1000 item.fd = nil 1001 } 1002 1003 spaceFreed = item.info.Rs.Size() 1004 1005 // This should not be possible. We get here only if cache data is not dirty. 1006 if item._remove("cache out of space, item is clean") { 1007 fs.Errorf(item.o, "vfs cache item removed when it was writing/uploaded") 1008 } 1009 1010 // can we have an item with no dirty data (so that we can get here) and nil item.o at the same time? 
1011 fso := item.o 1012 checkErr(item._checkObject(fso)) 1013 if err != nil { 1014 item.beingReset = false 1015 item.cond.Broadcast() 1016 return ResetFailed, spaceFreed, err 1017 } 1018 1019 osPath := item.c.toOSPath(item.name) 1020 checkErr(item._createFile(osPath)) 1021 if err != nil { 1022 item._remove("cache reset failed on _createFile, removed cache data file") 1023 item.fd = nil // This allows a new Reset redo to have a clean state to deal with 1024 if !fserrors.IsErrNoSpace(err) { 1025 item.beingReset = false 1026 item.cond.Broadcast() 1027 } 1028 return ResetFailed, spaceFreed, err 1029 } 1030 1031 // Create the downloaders 1032 if item.o != nil { 1033 item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o) 1034 } 1035 1036 /* The item will stay in the beingReset state if we get an error that prevents us from 1037 reaching this point. The cache purge loop will redo the failed Reset. */ 1038 item.beingReset = false 1039 item.cond.Broadcast() 1040 1041 return ResetComplete, spaceFreed, err 1042} 1043 1044// ProtectCache either waits for an ongoing cache reset to finish or increases pendingReads 1045// to protect against cache reset on this item while the thread potentially uses the cache file 1046// Cache cleaner waits until pendingReads is zero before resetting cache. 
1047func (item *Item) preAccess() { 1048 item.mu.Lock() 1049 defer item.mu.Unlock() 1050 1051 if item.beingReset { 1052 for { 1053 item.cond.Wait() 1054 if !item.beingReset { 1055 break 1056 } 1057 } 1058 } 1059 item.pendingAccesses++ 1060} 1061 1062// postAccess reduces the pendingReads count enabling cache reset upon ENOSPC 1063func (item *Item) postAccess() { 1064 item.mu.Lock() 1065 defer item.mu.Unlock() 1066 1067 item.pendingAccesses-- 1068 item.cond.Broadcast() 1069} 1070 1071// _present returns true if the whole file has been downloaded 1072// 1073// call with the lock held 1074func (item *Item) _present() bool { 1075 return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size}) 1076} 1077 1078// present returns true if the whole file has been downloaded 1079func (item *Item) present() bool { 1080 item.mu.Lock() 1081 defer item.mu.Unlock() 1082 return item._present() 1083} 1084 1085// HasRange returns true if the current ranges entirely include range 1086func (item *Item) HasRange(r ranges.Range) bool { 1087 item.mu.Lock() 1088 defer item.mu.Unlock() 1089 return item.info.Rs.Present(r) 1090} 1091 1092// FindMissing adjusts r returning a new ranges.Range which only 1093// contains the range which needs to be downloaded. This could be 1094// empty - check with IsEmpty. It also adjust this to make sure it is 1095// not larger than the file. 
func (item *Item) FindMissing(r ranges.Range) (outr ranges.Range) {
	item.mu.Lock()
	defer item.mu.Unlock()
	outr = item.info.Rs.FindMissing(r)
	// Clip returned block to size of file
	outr.Clip(item.info.Size)
	return outr
}

// _ensure makes sure the range from offset, size is present in the
// backing file.
//
// The requested size is first truncated so the range does not run
// past item.info.Size. If the range is already present and the item
// has downloaders, EnsureDownloader is called so a downloader can run
// ahead for future reads. If it is not present, Download is called to
// fetch it, and an error is returned when there are no downloaders.
//
// item.mu is released while the downloaders are consulted and is
// re-acquired by the deferred Lock before returning. Any item state the
// caller read before the call may therefore have changed afterwards.
//
// call with the item lock held
func (item *Item) _ensure(offset, size int64) (err error) {
	// defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("err=%v", &err)
	// Don't ask for data beyond the current end of the file.
	if offset+size > item.info.Size {
		size = item.info.Size - offset
	}
	r := ranges.Range{Pos: offset, Size: size}
	present := item.info.Rs.Present(r)
	/* This statement simulates a cache space error for test purpose */
	/* if present != true && item.info.Rs.Size() > 32*1024*1024 {
		return errors.New("no space left on device")
	} */
	fs.Debugf(nil, "vfs cache: looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
	// Drop item.mu before calling the downloaders. The lock ordering
	// requires downloader.mu to be taken before Item.mu, and downloaders
	// call back into the item. The deferred Lock restores the caller's
	// locked state on every return path below.
	//
	// NOTE(review): item.downloaders is read below without item.mu held,
	// while rename and the reset path set it to nil under item.mu.
	// Confirm this is race-free (e.g. via preAccess), or snapshot the
	// pointer before unlocking.
	item.mu.Unlock()
	defer item.mu.Lock()
	if present {
		// This is a file we are writing so no downloaders needed
		if item.downloaders == nil {
			return nil
		}
		// Otherwise start the downloader for the future if required
		return item.downloaders.EnsureDownloader(r)
	}
	if item.downloaders == nil {
		return errors.New("internal error: downloaders is nil")
	}
	return item.downloaders.Download(r)
}

// _written marks the (offset, size) as present in the backing file
//
// This is called by the downloader downloading file segments and the
// vfs layer writing to the file.
//
// This doesn't mark the item as Dirty - that is the responsibility
// of the caller as we don't know here whether we are adding reads or
// writes to the cache file.
1144// 1145// call with lock held 1146func (item *Item) _written(offset, size int64) { 1147 // defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("") 1148 item.info.Rs.Insert(ranges.Range{Pos: offset, Size: size}) 1149} 1150 1151// update the fingerprint of the object if any 1152// 1153// call with lock held 1154func (item *Item) _updateFingerprint() { 1155 if item.o == nil { 1156 return 1157 } 1158 oldFingerprint := item.info.Fingerprint 1159 item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, false) 1160 if oldFingerprint != item.info.Fingerprint { 1161 fs.Debugf(item.o, "vfs cache: fingerprint now %q", item.info.Fingerprint) 1162 } 1163} 1164 1165// setModTime of the cache file 1166// 1167// call with lock held 1168func (item *Item) _setModTime(modTime time.Time) { 1169 fs.Debugf(item.name, "vfs cache: setting modification time to %v", modTime) 1170 osPath := item.c.toOSPath(item.name) // No locking in Cache 1171 err := os.Chtimes(osPath, modTime, modTime) 1172 if err != nil { 1173 fs.Errorf(item.name, "vfs cache: failed to set modification time of cached file: %v", err) 1174 } 1175} 1176 1177// setModTime of the cache file and in the Item 1178func (item *Item) setModTime(modTime time.Time) { 1179 // defer log.Trace(item.name, "modTime=%v", modTime)("") 1180 item.mu.Lock() 1181 item._updateFingerprint() 1182 item._setModTime(modTime) 1183 item.info.ModTime = modTime 1184 err := item._save() 1185 if err != nil { 1186 fs.Errorf(item.name, "vfs cache: setModTime: failed to save item info: %v", err) 1187 } 1188 item.mu.Unlock() 1189} 1190 1191// GetModTime of the cache file 1192func (item *Item) GetModTime() (modTime time.Time, err error) { 1193 // defer log.Trace(item.name, "modTime=%v", modTime)("") 1194 item.mu.Lock() 1195 defer item.mu.Unlock() 1196 fi, err := item._stat() 1197 if err == nil { 1198 modTime = fi.ModTime() 1199 } 1200 return modTime, nil 1201} 1202 1203// ReadAt bytes from the file at off 1204func (item *Item) ReadAt(b 
[]byte, off int64) (n int, err error) { 1205 n = 0 1206 var expBackOff int 1207 for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ { 1208 item.preAccess() 1209 n, err = item.readAt(b, off) 1210 item.postAccess() 1211 if err == nil || err == io.EOF { 1212 break 1213 } 1214 fs.Errorf(item.name, "vfs cache: failed to _ensure cache %v", err) 1215 if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" { 1216 fs.Debugf(item.name, "vfs cache: failed to _ensure cache %v is not out of space", err) 1217 break 1218 } 1219 item.c.KickCleaner() 1220 expBackOff = 2 << uint(retries) 1221 time.Sleep(time.Duration(expBackOff) * time.Millisecond) // Exponential back-off the retries 1222 } 1223 1224 if fserrors.IsErrNoSpace(err) { 1225 fs.Errorf(item.name, "vfs cache: failed to _ensure cache after retries %v", err) 1226 } 1227 1228 return n, err 1229} 1230 1231// ReadAt bytes from the file at off 1232func (item *Item) readAt(b []byte, off int64) (n int, err error) { 1233 item.mu.Lock() 1234 if item.fd == nil { 1235 item.mu.Unlock() 1236 return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file") 1237 } 1238 if off < 0 { 1239 item.mu.Unlock() 1240 return 0, io.EOF 1241 } 1242 defer item.mu.Unlock() 1243 1244 err = item._ensure(off, int64(len(b))) 1245 if err != nil { 1246 return 0, err 1247 } 1248 1249 item.info.ATime = time.Now() 1250 // Do the reading with Item.mu unlocked and cache protected by preAccess 1251 n, err = item.fd.ReadAt(b, off) 1252 return n, err 1253} 1254 1255// WriteAt bytes to the file at off 1256func (item *Item) WriteAt(b []byte, off int64) (n int, err error) { 1257 item.preAccess() 1258 defer item.postAccess() 1259 item.mu.Lock() 1260 if item.fd == nil { 1261 item.mu.Unlock() 1262 return 0, errors.New("vfs cache item WriteAt: internal error: didn't Open file") 1263 } 1264 item.mu.Unlock() 1265 // Do the writing with Item.mu unlocked 1266 n, err = item.fd.WriteAt(b, off) 1267 if err == nil 
&& n != len(b) { 1268 err = errors.Errorf("short write: tried to write %d but only %d written", len(b), n) 1269 } 1270 item.mu.Lock() 1271 item._written(off, int64(n)) 1272 if n > 0 { 1273 item._dirty() 1274 } 1275 end := off + int64(n) 1276 // Writing off the end of the file so need to make some 1277 // zeroes. we do this by showing that we have written to the 1278 // new parts of the file. 1279 if off > item.info.Size { 1280 item._written(item.info.Size, off-item.info.Size) 1281 item._dirty() 1282 } 1283 // Update size 1284 if end > item.info.Size { 1285 item.info.Size = end 1286 } 1287 item.mu.Unlock() 1288 return n, err 1289} 1290 1291// WriteAtNoOverwrite writes b to the file, but will not overwrite 1292// already present ranges. 1293// 1294// This is used by the downloader to write bytes to the file 1295// 1296// It returns n the total bytes processed and skipped the number of 1297// bytes which were processed but not actually written to the file. 1298func (item *Item) WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) { 1299 item.mu.Lock() 1300 1301 var ( 1302 // Range we wish to write 1303 r = ranges.Range{Pos: off, Size: int64(len(b))} 1304 // Ranges that we need to write 1305 foundRanges = item.info.Rs.FindAll(r) 1306 // Length of each write 1307 nn int 1308 ) 1309 1310 // Write the range out ignoring already written chunks 1311 // fs.Debugf(item.name, "Ranges = %v", item.info.Rs) 1312 for i := range foundRanges { 1313 foundRange := &foundRanges[i] 1314 // fs.Debugf(item.name, "foundRange[%d] = %v", i, foundRange) 1315 if foundRange.R.Pos != off { 1316 err = errors.New("internal error: offset of range is wrong") 1317 break 1318 } 1319 size := int(foundRange.R.Size) 1320 if foundRange.Present { 1321 // if present want to skip this range 1322 // fs.Debugf(item.name, "skip chunk offset=%d size=%d", off, size) 1323 nn = size 1324 skipped += size 1325 } else { 1326 // if range not present then we want to write it 1327 // 
fs.Debugf(item.name, "write chunk offset=%d size=%d", off, size) 1328 nn, err = item.fd.WriteAt(b[:size], off) 1329 if err == nil && nn != size { 1330 err = errors.Errorf("downloader: short write: tried to write %d but only %d written", size, nn) 1331 } 1332 item._written(off, int64(nn)) 1333 } 1334 off += int64(nn) 1335 b = b[nn:] 1336 n += nn 1337 if err != nil { 1338 break 1339 } 1340 } 1341 item.mu.Unlock() 1342 return n, skipped, err 1343} 1344 1345// Sync commits the current contents of the file to stable storage. Typically, 1346// this means flushing the file system's in-memory copy of recently written 1347// data to disk. 1348func (item *Item) Sync() (err error) { 1349 item.preAccess() 1350 defer item.postAccess() 1351 item.mu.Lock() 1352 defer item.mu.Unlock() 1353 if item.fd == nil { 1354 return errors.New("vfs cache item sync: internal error: didn't Open file") 1355 } 1356 // sync the file and the metadata to disk 1357 err = item.fd.Sync() 1358 if err != nil { 1359 return errors.Wrap(err, "vfs cache item sync: failed to sync file") 1360 } 1361 err = item._save() 1362 if err != nil { 1363 return errors.Wrap(err, "vfs cache item sync: failed to sync metadata") 1364 } 1365 return nil 1366} 1367 1368// rename the item 1369func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) { 1370 item.preAccess() 1371 defer item.postAccess() 1372 item.mu.Lock() 1373 1374 // stop downloader 1375 downloaders := item.downloaders 1376 item.downloaders = nil 1377 1378 // id for writeback cancel 1379 id := item.writeBackID 1380 1381 // Set internal state 1382 item.name = newName 1383 item.o = newObj 1384 1385 // Rename cache file if it exists 1386 err = rename(item.c.toOSPath(name), item.c.toOSPath(newName)) // No locking in Cache 1387 1388 // Rename meta file if it exists 1389 err2 := rename(item.c.toOSPathMeta(name), item.c.toOSPathMeta(newName)) // No locking in Cache 1390 if err2 != nil { 1391 err = err2 1392 } 1393 1394 item.mu.Unlock() 1395 
1396 // close downloader and cancel writebacks with mutex unlocked 1397 if downloaders != nil { 1398 _ = downloaders.Close(nil) 1399 } 1400 item.c.writeback.Rename(id, newName) 1401 return err 1402} 1403