1package vfscache
2
3import (
4	"context"
5	"encoding/json"
6	"fmt"
7	"io"
8	"os"
9	"sync"
10	"time"
11
12	"github.com/pkg/errors"
13	"github.com/rclone/rclone/fs"
14	"github.com/rclone/rclone/fs/fserrors"
15	"github.com/rclone/rclone/fs/operations"
16	"github.com/rclone/rclone/lib/file"
17	"github.com/rclone/rclone/lib/ranges"
18	"github.com/rclone/rclone/vfs/vfscache/downloaders"
19	"github.com/rclone/rclone/vfs/vfscache/writeback"
20)
21
22// NB as Cache and Item are tightly linked it is necessary to have a
23// total lock ordering between them. So Cache.mu must always be
24// taken before Item.mu to avoid deadlocks.
25//
26// Cache may call into Item but care is needed if Item calls Cache
27//
28// A lot of the Cache methods do not require locking, these include
29//
30// - Cache.toOSPath
31// - Cache.toOSPathMeta
32// - Cache.createItemDir
33// - Cache.objectFingerprint
34// - Cache.AddVirtual
35
36// NB Item and downloader are tightly linked so it is necessary to
37// have a total lock ordering between them. downloader.mu must always
38// be taken before Item.mu. downloader may call into Item but Item may
39// **not** call downloader methods with Item.mu held
40
41// NB Item and writeback are tightly linked so it is necessary to
42// have a total lock ordering between them. writeback.mu must always
43// be taken before Item.mu. writeback may call into Item but Item may
44// **not** call writeback methods with Item.mu held
45
46// LL Item reset is invoked by cache cleaner for synchronous recovery
47// from ENOSPC errors. The reset operation removes the cache file and
48// closes/reopens the downloaders.  Although most parts of reset and
49// other item operations are done with the item mutex held, the mutex
50// is released during fd.WriteAt and downloaders calls. We use preAccess
51// and postAccess calls to serialize reset and other item operations.
52
// Item is stored in the item map
//
// The Info field is written to the backing store to store status
//
// NOTE(review): the "read only" heading below looks like it only applies
// to c and name, which newItem sets at construction; the other fields are
// changed by the methods in this file, normally with mu held - confirm.
type Item struct {
	// read only
	c               *Cache                   // cache this is part of
	mu              sync.Mutex               // protect the variables
	cond            *sync.Cond               // synchronize with cache cleaner
	name            string                   // name in the VFS
	opens           int                      // number of times file is open
	downloaders     *downloaders.Downloaders // a record of the downloaders in action - may be nil
	o               fs.Object                // object we are caching - may be nil
	fd              *os.File                 // handle we are using to read and write to the file
	modified        bool                     // set if the file has been modified since the last Open
	info            Info                     // info about the file to persist to backing store
	writeBackID     writeback.Handle         // id of any writebacks in progress
	pendingAccesses int                      // number of threads - cache reset not allowed if not zero
	beingReset      bool                     // cache cleaner is resetting the cache file, access not allowed
}
72
// Info is persisted to backing store
//
// It is encoded to and decoded from JSON by Item._save and Item.load, so
// the exported field names are part of the on-disk metadata format.
type Info struct {
	ModTime     time.Time     // last time file was modified
	ATime       time.Time     // last time file was accessed
	Size        int64         // size of the file
	Rs          ranges.Ranges // which parts of the file are present
	Fingerprint string        // fingerprint of remote object
	Dirty       bool          // set if the backing file has been modified
}
82
// Items are a slice of *Item ordered by ATime
//
// The ordering is supplied by the Len, Swap and Less methods; Less
// compares the ATime held in each item's info.
type Items []*Item
85
// ResetResult reports the actual action taken in the Reset function and reason
type ResetResult int

// Constants used to report actual action taken in the Reset function and reason
const (
	SkippedDirty         ResetResult = iota // Dirty item cannot be reset
	SkippedPendingAccess                    // Reset pending access can lead to deadlock
	SkippedEmpty                            // Reset empty item does not save space
	RemovedNotInUse                         // Item not used. Remove instead of reset
	ResetFailed                             // Reset failed with an error
	ResetComplete                           // Reset completed successfully
)

// String returns a human readable description of the reset result.
//
// Values outside the declared constants are rendered as
// "Unknown ResetResult(n)" instead of panicking with an index out of
// range, so it is always safe to log a ResetResult.
func (rr ResetResult) String() string {
	// The order here must match the order of the constants above
	names := [...]string{
		"Dirty item skipped",
		"In-access item skipped",
		"Empty item skipped",
		"Not-in-use item removed",
		"Item reset failed",
		"Item reset completed",
	}
	if rr < 0 || int(rr) >= len(names) {
		return fmt.Sprintf("Unknown ResetResult(%d)", int(rr))
	}
	return names[rr]
}
103
104func (v Items) Len() int      { return len(v) }
105func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
106func (v Items) Less(i, j int) bool {
107	if i == j {
108		return false
109	}
110	iItem := v[i]
111	jItem := v[j]
112	iItem.mu.Lock()
113	defer iItem.mu.Unlock()
114	jItem.mu.Lock()
115	defer jItem.mu.Unlock()
116
117	return iItem.info.ATime.Before(jItem.info.ATime)
118}
119
120// clean the item after its cache file has been deleted
121func (info *Info) clean() {
122	*info = Info{}
123	info.ModTime = time.Now()
124	info.ATime = info.ModTime
125}
126
// StoreFn is called back with an object after it has been uploaded
//
// Item._store invokes it with Item.mu released, passing the item's
// current remote object once the metadata has been saved.
type StoreFn func(fs.Object)
129
130// newItem returns an item for the cache
131func newItem(c *Cache, name string) (item *Item) {
132	now := time.Now()
133	item = &Item{
134		c:    c,
135		name: name,
136		info: Info{
137			ModTime: now,
138			ATime:   now,
139		},
140	}
141	item.cond = sync.NewCond(&item.mu)
142	// check the cache file exists
143	osPath := c.toOSPath(name)
144	fi, statErr := os.Stat(osPath)
145	if statErr != nil {
146		if os.IsNotExist(statErr) {
147			item._removeMeta("cache file doesn't exist")
148		} else {
149			item.remove(fmt.Sprintf("failed to stat cache file: %v", statErr))
150		}
151	}
152
153	// Try to load the metadata
154	exists, err := item.load()
155	if !exists {
156		item._removeFile("metadata doesn't exist")
157	} else if err != nil {
158		item.remove(fmt.Sprintf("failed to load metadata: %v", err))
159	}
160
161	// Get size estimate (which is best we can do until Open() called)
162	if statErr == nil {
163		item.info.Size = fi.Size()
164	}
165	return item
166}
167
168// inUse returns true if the item is open or dirty
169func (item *Item) inUse() bool {
170	item.mu.Lock()
171	defer item.mu.Unlock()
172	return item.opens != 0 || item.info.Dirty
173}
174
175// getATime returns the ATime of the item
176func (item *Item) getATime() time.Time {
177	item.mu.Lock()
178	defer item.mu.Unlock()
179	return item.info.ATime
180}
181
182// getDiskSize returns the size on disk (approximately) of the item
183//
184// We return the sizes of the chunks we have fetched, however there is
185// likely to be some overhead which we are not taking into account.
186func (item *Item) getDiskSize() int64 {
187	item.mu.Lock()
188	defer item.mu.Unlock()
189	return item.info.Rs.Size()
190}
191
192// load reads an item from the disk or returns nil if not found
193func (item *Item) load() (exists bool, err error) {
194	item.mu.Lock()
195	defer item.mu.Unlock()
196	osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
197	in, err := os.Open(osPathMeta)
198	if err != nil {
199		if os.IsNotExist(err) {
200			return false, err
201		}
202		return true, errors.Wrap(err, "vfs cache item: failed to read metadata")
203	}
204	defer fs.CheckClose(in, &err)
205	decoder := json.NewDecoder(in)
206	err = decoder.Decode(&item.info)
207	if err != nil {
208		return true, errors.Wrap(err, "vfs cache item: corrupt metadata")
209	}
210	return true, nil
211}
212
213// save writes an item to the disk
214//
215// call with the lock held
216func (item *Item) _save() (err error) {
217	osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
218	out, err := os.Create(osPathMeta)
219	if err != nil {
220		return errors.Wrap(err, "vfs cache item: failed to write metadata")
221	}
222	defer fs.CheckClose(out, &err)
223	encoder := json.NewEncoder(out)
224	encoder.SetIndent("", "\t")
225	err = encoder.Encode(item.info)
226	if err != nil {
227		return errors.Wrap(err, "vfs cache item: failed to encode metadata")
228	}
229	return nil
230}
231
232// truncate the item to the given size, creating it if necessary
233//
234// this does not mark the object as dirty
235//
236// call with the lock held
237func (item *Item) _truncate(size int64) (err error) {
238	if size < 0 {
239		// FIXME ignore unknown length files
240		return nil
241	}
242
243	// Use open handle if available
244	fd := item.fd
245	if fd == nil {
246		// If the metadata says we have some blocks cached then the
247		// file should exist, so open without O_CREATE
248		oFlags := os.O_WRONLY
249		if item.info.Rs.Size() == 0 {
250			oFlags |= os.O_CREATE
251		}
252		osPath := item.c.toOSPath(item.name) // No locking in Cache
253		fd, err = file.OpenFile(osPath, oFlags, 0600)
254		if err != nil && os.IsNotExist(err) {
255			// If the metadata has info but the file doesn't
256			// not exist then it has been externally removed
257			fs.Errorf(item.name, "vfs cache: detected external removal of cache file")
258			item.info.Rs = nil      // show we have no blocks cached
259			item.info.Dirty = false // file can't be dirty if it doesn't exist
260			item._removeMeta("cache file externally deleted")
261			fd, err = file.OpenFile(osPath, os.O_CREATE|os.O_WRONLY, 0600)
262		}
263		if err != nil {
264			return errors.Wrap(err, "vfs cache: truncate: failed to open cache file")
265		}
266
267		defer fs.CheckClose(fd, &err)
268
269		err = file.SetSparse(fd)
270		if err != nil {
271			fs.Errorf(item.name, "vfs cache: truncate: failed to set as a sparse file: %v", err)
272		}
273	}
274
275	fs.Debugf(item.name, "vfs cache: truncate to size=%d", size)
276
277	err = fd.Truncate(size)
278	if err != nil {
279		return errors.Wrap(err, "vfs cache: truncate")
280	}
281
282	item.info.Size = size
283
284	return nil
285}
286
287// Truncate the item to the current size, creating if necessary
288//
289// This does not mark the object as dirty
290//
291// call with the lock held
292func (item *Item) _truncateToCurrentSize() (err error) {
293	size, err := item._getSize()
294	if err != nil && !os.IsNotExist(errors.Cause(err)) {
295		return errors.Wrap(err, "truncate to current size")
296	}
297	if size < 0 {
298		// FIXME ignore unknown length files
299		return nil
300	}
301	err = item._truncate(size)
302	if err != nil {
303		return err
304	}
305	return nil
306}
307
308// Truncate the item to the given size, creating it if necessary
309//
310// If the new size is shorter than the existing size then the object
311// will be shortened and marked as dirty.
312//
313// If the new size is longer than the old size then the object will be
314// extended and the extended data will be filled with zeros. The
315// object will be marked as dirty in this case also.
316func (item *Item) Truncate(size int64) (err error) {
317	item.preAccess()
318	defer item.postAccess()
319	item.mu.Lock()
320	defer item.mu.Unlock()
321
322	if item.fd == nil {
323		return errors.New("vfs cache item truncate: internal error: didn't Open file")
324	}
325
326	// Read old size
327	oldSize, err := item._getSize()
328	if err != nil {
329		if !os.IsNotExist(errors.Cause(err)) {
330			return errors.Wrap(err, "truncate failed to read size")
331		}
332		oldSize = 0
333	}
334
335	err = item._truncate(size)
336	if err != nil {
337		return err
338	}
339
340	changed := true
341	if size > oldSize {
342		// Truncate extends the file in which case all new bytes are
343		// read as zeros. In this case we must show we have written to
344		// the new parts of the file.
345		item._written(oldSize, size)
346	} else if size < oldSize {
347		// Truncate shrinks the file so clip the downloaded ranges
348		item.info.Rs = item.info.Rs.Intersection(ranges.Range{Pos: 0, Size: size})
349	} else {
350		changed = item.o == nil
351	}
352	if changed {
353		item._dirty()
354	}
355
356	return nil
357}
358
359// _stat gets the current stat of the backing file
360//
361// Call with mutex held
362func (item *Item) _stat() (fi os.FileInfo, err error) {
363	if item.fd != nil {
364		return item.fd.Stat()
365	}
366	osPath := item.c.toOSPath(item.name) // No locking in Cache
367	return os.Stat(osPath)
368}
369
370// _getSize gets the current size of the item and updates item.info.Size
371//
372// Call with mutex held
373func (item *Item) _getSize() (size int64, err error) {
374	fi, err := item._stat()
375	if err != nil {
376		if os.IsNotExist(err) && item.o != nil {
377			size = item.o.Size()
378			err = nil
379		}
380	} else {
381		size = fi.Size()
382	}
383	if err == nil {
384		item.info.Size = size
385	}
386	return size, err
387}
388
389// GetName gets the vfs name of the item
390func (item *Item) GetName() (name string) {
391	item.mu.Lock()
392	defer item.mu.Unlock()
393	return item.name
394}
395
396// GetSize gets the current size of the item
397func (item *Item) GetSize() (size int64, err error) {
398	item.mu.Lock()
399	defer item.mu.Unlock()
400	return item._getSize()
401}
402
403// _exists returns whether the backing file for the item exists or not
404//
405// call with mutex held
406func (item *Item) _exists() bool {
407	osPath := item.c.toOSPath(item.name) // No locking in Cache
408	_, err := os.Stat(osPath)
409	return err == nil
410}
411
412// Exists returns whether the backing file for the item exists or not
413func (item *Item) Exists() bool {
414	item.mu.Lock()
415	defer item.mu.Unlock()
416	return item._exists()
417}
418
// _dirty marks the item as changed and needing writeback
//
// It sets ModTime and ATime to the current time. The first time it is
// called while modified is unset it removes any writeback registered
// under the item's writeBackID, and the first time info.Dirty becomes
// true it saves the metadata to disk. A failure to save is logged rather
// than returned.
//
// call with lock held
//
// NB the lock is released temporarily while calling into writeback, as
// Item may not call writeback methods with Item.mu held.
func (item *Item) _dirty() {
	item.info.ModTime = time.Now()
	item.info.ATime = item.info.ModTime
	if !item.modified {
		item.modified = true
		// Drop the item lock around the writeback call to respect
		// the writeback.mu before Item.mu lock ordering
		item.mu.Unlock()
		item.c.writeback.Remove(item.writeBackID)
		item.mu.Lock()
	}
	if !item.info.Dirty {
		item.info.Dirty = true
		// Persist the dirty flag straight away
		err := item._save()
		if err != nil {
			fs.Errorf(item.name, "vfs cache: failed to save item info: %v", err)
		}
	}
}
439
440// Dirty marks the item as changed and needing writeback
441func (item *Item) Dirty() {
442	item.preAccess()
443	defer item.postAccess()
444	item.mu.Lock()
445	item._dirty()
446	item.mu.Unlock()
447}
448
449// IsDirty returns true if the item data is dirty
450func (item *Item) IsDirty() bool {
451	item.mu.Lock()
452	defer item.mu.Unlock()
453	return item.info.Dirty
454}
455
456// Create the cache file and store the metadata on disk
457// Called with item.mu locked
458func (item *Item) _createFile(osPath string) (err error) {
459	if item.fd != nil {
460		return errors.New("vfs cache item: internal error: didn't Close file")
461	}
462	item.modified = false
463	fd, err := file.OpenFile(osPath, os.O_RDWR, 0600)
464	if err != nil {
465		return errors.Wrap(err, "vfs cache item: open failed")
466	}
467	err = file.SetSparse(fd)
468	if err != nil {
469		fs.Errorf(item.name, "vfs cache: failed to set as a sparse file: %v", err)
470	}
471	item.fd = fd
472
473	err = item._save()
474	if err != nil {
475		closeErr := item.fd.Close()
476		if closeErr != nil {
477			fs.Errorf(item.name, "vfs cache: item.fd.Close: closeErr: %v", err)
478		}
479		item.fd = nil
480		return errors.Wrap(err, "vfs cache item: _save failed")
481	}
482	return err
483}
484
485// Open the local file from the object passed in.  Wraps open()
486// to provide recovery from out of space error.
487func (item *Item) Open(o fs.Object) (err error) {
488	for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ {
489		item.preAccess()
490		err = item.open(o)
491		item.postAccess()
492		if err == nil {
493			break
494		}
495		fs.Errorf(item.name, "vfs cache: failed to open item: %v", err)
496		if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
497			fs.Errorf(item.name, "Non-out-of-space error encountered during open")
498			break
499		}
500		item.c.KickCleaner()
501	}
502	return err
503}
504
// Open the local file from the object passed in (which may be nil)
// which implies we are about to create the file
//
// It updates the access time, makes the item's directory, checks the
// cached data against o and then increments the open count. Only the
// first open goes on to open the cache file, register the item with the
// cache and create the downloaders; later opens return straight after
// the count is incremented.
//
// Takes item.mu itself, releasing it temporarily around the calls into
// Cache which need Cache.mu.
func (item *Item) open(o fs.Object) (err error) {
	// defer log.Trace(o, "item=%p", item)("err=%v", &err)
	item.mu.Lock()
	defer item.mu.Unlock()

	item.info.ATime = time.Now()

	osPath, err := item.c.createItemDir(item.name) // No locking in Cache
	if err != nil {
		return errors.Wrap(err, "vfs cache item: createItemDir failed")
	}

	err = item._checkObject(o)
	if err != nil {
		return errors.Wrap(err, "vfs cache item: check object failed")
	}

	// Only the first open needs to do the work below
	item.opens++
	if item.opens != 1 {
		return nil
	}

	err = item._createFile(osPath)
	if err != nil {
		// Undo the open: remove the cache files and give back the open count
		item._remove("item.open failed on _createFile, remove cache data/metadata files")
		item.fd = nil
		item.opens--
		return errors.Wrap(err, "vfs cache item: create cache file failed")
	}
	// Unlock the Item.mu so we can call some methods which take Cache.mu
	item.mu.Unlock()

	// Ensure this item is in the cache. It is possible a cache
	// expiry has run and removed the item if it had no opens so
	// we put it back here. If there was an item with opens
	// already then return an error. This shouldn't happen because
	// there should only be one vfs.File with a pointer to this
	// item in at a time.
	oldItem := item.c.put(item.name, item) // LOCKING in Cache method
	if oldItem != nil {
		oldItem.mu.Lock()
		if oldItem.opens != 0 {
			// Put the item back and return an error
			item.c.put(item.name, oldItem) // LOCKING in Cache method
			err = errors.Errorf("internal error: item %q already open in the cache", item.name)
		}
		oldItem.mu.Unlock()
	}

	// Relock the Item.mu for the return
	item.mu.Lock()

	// Create the downloaders
	// NOTE(review): this runs even when err was set above, and the open
	// count and file handle are not rolled back in that case - confirm
	// this is intended.
	if item.o != nil {
		item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
	}

	return err
}
566
// _store copies the local cache file to the remote, records the
// resulting object in item.o and clears the dirty flag.
//
// If the cache file can't be found the copy is skipped, but the item is
// still marked clean and the metadata saved. A failure to save the
// metadata is logged and not returned.
//
// If storeFn is not nil and the item has a remote object, storeFn is
// called with that object as the last step.
//
// Call with lock held
//
// NB the lock is released temporarily during the copy and during the
// storeFn callback.
func (item *Item) _store(ctx context.Context, storeFn StoreFn) (err error) {
	// defer log.Trace(item.name, "item=%p", item)("err=%v", &err)

	// Transfer the temp file to the remote
	cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
	if err != nil && err != fs.ErrorObjectNotFound {
		return errors.Wrap(err, "vfs cache: failed to find cache file")
	}

	// Object has disappeared if cacheObj == nil
	if cacheObj != nil {
		// Take copies of the fields needed so the copy can run with the mutex unlocked
		o, name := item.o, item.name
		item.mu.Unlock()
		o, err := operations.Copy(ctx, item.c.fremote, o, name, cacheObj)
		item.mu.Lock()
		if err != nil {
			return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote")
		}
		item.o = o
		item._updateFingerprint()
	}

	item.info.Dirty = false
	err = item._save()
	if err != nil {
		fs.Errorf(item.name, "vfs cache: failed to write metadata file: %v", err)
	}
	if storeFn != nil && item.o != nil {
		fs.Debugf(item.name, "vfs cache: writeback object to VFS layer")
		// Write the object back to the VFS layer as last
		// thing we do with mutex unlocked
		o := item.o
		item.mu.Unlock()
		storeFn(o)
		item.mu.Lock()
	}
	return nil
}
609
610// Store stores the local cache file to the remote object, returning
611// the new remote object. objOld is the old object if known.
612func (item *Item) store(ctx context.Context, storeFn StoreFn) (err error) {
613	item.mu.Lock()
614	defer item.mu.Unlock()
615	return item._store(ctx, storeFn)
616}
617
618// Close the cache file
619func (item *Item) Close(storeFn StoreFn) (err error) {
620	// defer log.Trace(item.o, "Item.Close")("err=%v", &err)
621	item.preAccess()
622	defer item.postAccess()
623	var (
624		downloaders   *downloaders.Downloaders
625		syncWriteBack = item.c.opt.WriteBack <= 0
626	)
627	item.mu.Lock()
628	defer item.mu.Unlock()
629
630	item.info.ATime = time.Now()
631	item.opens--
632
633	if item.opens < 0 {
634		return os.ErrClosed
635	} else if item.opens > 0 {
636		return nil
637	}
638
639	// Update the size on close
640	_, _ = item._getSize()
641
642	// If the file is dirty ensure any segments not transferred
643	// are brought in first.
644	//
645	// FIXME It would be nice to do this asynchronously however it
646	// would require keeping the downloaders alive after the item
647	// has been closed
648	if item.info.Dirty && item.o != nil {
649		err = item._ensure(0, item.info.Size)
650		if err != nil {
651			return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
652		}
653	}
654
655	// Accumulate and log errors
656	checkErr := func(e error) {
657		if e != nil {
658			fs.Errorf(item.o, "vfs cache: item close failed: %v", e)
659			if err == nil {
660				err = e
661			}
662		}
663	}
664
665	// Close the downloaders
666	if downloaders = item.downloaders; downloaders != nil {
667		item.downloaders = nil
668		// FIXME need to unlock to kill downloader - should we
669		// re-arrange locking so this isn't necessary?  maybe
670		// downloader should use the item mutex for locking? or put a
671		// finer lock on Rs?
672		//
673		// downloader.Write calls ensure which needs the lock
674		// close downloader with mutex unlocked
675		item.mu.Unlock()
676		checkErr(downloaders.Close(nil))
677		item.mu.Lock()
678	}
679
680	// close the file handle
681	if item.fd == nil {
682		checkErr(errors.New("vfs cache item: internal error: didn't Open file"))
683	} else {
684		checkErr(item.fd.Close())
685		item.fd = nil
686	}
687
688	// save the metadata once more since it may be dirty
689	// after the downloader
690	checkErr(item._save())
691
692	// if the item hasn't been changed but has been completed then
693	// set the modtime from the object otherwise set it from the info
694	if item._exists() {
695		if !item.info.Dirty && item.o != nil {
696			item._setModTime(item.o.ModTime(context.Background()))
697		} else {
698			item._setModTime(item.info.ModTime)
699		}
700	}
701
702	// upload the file to backing store if changed
703	if item.info.Dirty {
704		fs.Infof(item.name, "vfs cache: queuing for upload in %v", item.c.opt.WriteBack)
705		if syncWriteBack {
706			// do synchronous writeback
707			checkErr(item._store(context.Background(), storeFn))
708		} else {
709			// asynchronous writeback
710			item.c.writeback.SetID(&item.writeBackID)
711			id := item.writeBackID
712			item.mu.Unlock()
713			item.c.writeback.Add(id, item.name, item.modified, func(ctx context.Context) error {
714				return item.store(ctx, storeFn)
715			})
716			item.mu.Lock()
717		}
718	}
719
720	// mark as not modified now we have uploaded or queued for upload
721	item.modified = false
722
723	return err
724}
725
726// reload is called with valid items recovered from a cache reload.
727//
728// If they are dirty then it makes sure they get uploaded
729//
730// it is called before the cache has started so opens will be 0 and
731// metaDirty will be false.
732func (item *Item) reload(ctx context.Context) error {
733	item.mu.Lock()
734	dirty := item.info.Dirty
735	item.mu.Unlock()
736	if !dirty {
737		return nil
738	}
739	// see if the object still exists
740	obj, _ := item.c.fremote.NewObject(ctx, item.name)
741	// open the file with the object (or nil)
742	err := item.Open(obj)
743	if err != nil {
744		return err
745	}
746	// close the file to execute the writeback if needed
747	err = item.Close(nil)
748	if err != nil {
749		return err
750	}
751	// put the file into the directory listings
752	size, err := item._getSize()
753	if err != nil {
754		return errors.Wrap(err, "reload: failed to read size")
755	}
756	err = item.c.AddVirtual(item.name, size, false)
757	if err != nil {
758		return errors.Wrap(err, "reload: failed to add virtual dir entry")
759	}
760	return nil
761}
762
763// check the fingerprint of an object and update the item or delete
764// the cached file accordingly
765//
766// If we have local modifications then they take precedence
767// over a change in the remote
768//
769// It ensures the file is the correct size for the object
770//
771// call with lock held
772func (item *Item) _checkObject(o fs.Object) error {
773	if o == nil {
774		if item.info.Fingerprint != "" {
775			// no remote object && local object
776			// remove local object unless dirty
777			if !item.info.Dirty {
778				item._remove("stale (remote deleted)")
779			} else {
780				fs.Debugf(item.name, "vfs cache: remote object has gone but local object modified - keeping it")
781			}
782		} else {
783			// no remote object && no local object
784			// OK
785		}
786	} else {
787		remoteFingerprint := fs.Fingerprint(context.TODO(), o, false)
788		fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint)
789		if item.info.Fingerprint != "" {
790			// remote object && local object
791			if remoteFingerprint != item.info.Fingerprint {
792				if !item.info.Dirty {
793					fs.Debugf(item.name, "vfs cache: removing cached entry as stale (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint)
794					item._remove("stale (remote is different)")
795				} else {
796					fs.Debugf(item.name, "vfs cache: remote object has changed but local object modified - keeping it (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint)
797				}
798			}
799		} else {
800			// remote object && no local object
801			// Set fingerprint
802			item.info.Fingerprint = remoteFingerprint
803		}
804		item.info.Size = o.Size()
805	}
806	item.o = o
807
808	err := item._truncateToCurrentSize()
809	if err != nil {
810		return errors.Wrap(err, "vfs cache item: open truncate failed")
811	}
812
813	return nil
814}
815
816// WrittenBack checks to see if the item has been written back or not
817func (item *Item) WrittenBack() bool {
818	item.mu.Lock()
819	defer item.mu.Unlock()
820	return item.info.Fingerprint != ""
821}
822
823// remove the cached file
824//
825// call with lock held
826func (item *Item) _removeFile(reason string) {
827	osPath := item.c.toOSPath(item.name) // No locking in Cache
828	err := os.Remove(osPath)
829	if err != nil {
830		if !os.IsNotExist(err) {
831			fs.Errorf(item.name, "vfs cache: failed to remove cache file as %s: %v", reason, err)
832		}
833	} else {
834		fs.Infof(item.name, "vfs cache: removed cache file as %s", reason)
835	}
836}
837
838// remove the metadata
839//
840// call with lock held
841func (item *Item) _removeMeta(reason string) {
842	osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
843	err := os.Remove(osPathMeta)
844	if err != nil {
845		if !os.IsNotExist(err) {
846			fs.Errorf(item.name, "vfs cache: failed to remove metadata from cache as %s: %v", reason, err)
847		}
848	} else {
849		fs.Debugf(item.name, "vfs cache: removed metadata from cache as %s", reason)
850	}
851}
852
// _remove removes the cached file and its metadata file and resets the
// in-memory info
//
// This returns true if the file was in the transfer queue so may not
// have completely uploaded yet.
//
// call with lock held
//
// NB the lock is released temporarily while the writeback is cancelled,
// as Item may not call writeback methods with Item.mu held.
func (item *Item) _remove(reason string) (wasWriting bool) {
	// Cancel writeback, if any
	item.mu.Unlock()
	wasWriting = item.c.writeback.Remove(item.writeBackID)
	item.mu.Lock()
	// Reset the info before deleting the files it described
	item.info.clean()
	item._removeFile(reason)
	item._removeMeta(reason)
	return wasWriting
}
869
870// remove the cached file and empty the metadata
871//
872// This returns true if the file was in the transfer queue so may not
873// have completely uploaded yet.
874func (item *Item) remove(reason string) (wasWriting bool) {
875	item.mu.Lock()
876	defer item.mu.Unlock()
877	return item._remove(reason)
878}
879
880// RemoveNotInUse is called to remove cache file that has not been accessed recently
881// It may also be called for removing empty cache files too when the quota is already reached.
882func (item *Item) RemoveNotInUse(maxAge time.Duration, emptyOnly bool) (removed bool, spaceFreed int64) {
883	item.mu.Lock()
884	defer item.mu.Unlock()
885
886	spaceFreed = 0
887	removed = false
888
889	if item.opens != 0 || item.info.Dirty {
890		return
891	}
892
893	removeIt := false
894	if maxAge == 0 {
895		removeIt = true // quota-driven removal
896	}
897	if maxAge != 0 {
898		cutoff := time.Now().Add(-maxAge)
899		// If not locked and access time too long ago - delete the file
900		accessTime := item.info.ATime
901		if accessTime.Sub(cutoff) <= 0 {
902			removeIt = true
903		}
904	}
905	if removeIt {
906		spaceUsed := item.info.Rs.Size()
907		if !emptyOnly || spaceUsed == 0 {
908			spaceFreed = spaceUsed
909			removed = true
910			if item._remove("Removing old cache file not in use") {
911				fs.Errorf(item.name, "item removed when it was writing/uploaded")
912			}
913		}
914	}
915	return
916}
917
// Reset is called by the cache purge functions only to reset (empty the contents) cache files that
// are not dirty.  It is used when cache space runs out and we see some ENOSPC error.
//
// It returns the action taken, the amount of cached data freed and any error. The outcomes are:
//
//   - RemovedNotInUse: the item had no opens and was not dirty, so it was removed outright
//   - SkippedDirty: the item is dirty and was left alone
//   - SkippedPendingAccess: other accesses are pending, so resetting now could deadlock
//   - SkippedEmpty: nothing is cached and no earlier reset is outstanding
//   - ResetFailed: closing or re-creating the cache file failed
//   - ResetComplete: the cache file was removed and re-created and the downloaders restarted
func (item *Item) Reset() (rr ResetResult, spaceFreed int64, err error) {
	item.mu.Lock()
	defer item.mu.Unlock()

	// The item is not being used now.  Just remove it instead of resetting it.
	if item.opens == 0 && !item.info.Dirty {
		spaceFreed = item.info.Rs.Size()
		if item._remove("Removing old cache file not in use") {
			fs.Errorf(item.name, "item removed when it was writing/uploaded")
		}
		return RemovedNotInUse, spaceFreed, nil
	}

	// do not reset dirty file
	if item.info.Dirty {
		return SkippedDirty, 0, nil
	}

	/* A wait on pendingAccesses to become 0 can lead to deadlock when an item.Open bumps
	   up the pendingAccesses count, calls item.open, which calls cache.put. The cache.put
	   operation needs the cache mutex, which is held here.  We skip this file now. The
	   caller (the cache cleaner thread) may retry resetting this item if the cache size does
	   not reduce below quota. */
	if item.pendingAccesses > 0 {
		return SkippedPendingAccess, 0, nil
	}

	/* Do not need to reset an empty cache file unless it was being reset and the reset failed.
	   Some thread(s) may be waiting on the reset's successful completion in that case. */
	if item.info.Rs.Size() == 0 && item.beingReset == false {
		return SkippedEmpty, 0, nil
	}

	item.beingReset = true

	/* Error handling from this point on (setting item.fd and item.beingReset):
	   Since Reset is called by the cache cleaner thread, there is no direct way to return
	   the error to the io threads.  Set item.fd to nil upon internal errors, so that the
	   io threads will return internal errors seeing a nil fd. In the case when the error
	   is ENOSPC, keep the item in the beingReset state and that will keep the item.ReadAt
	   waiting at its beginning. The cache purge loop will try to redo the reset after cache
	   space is made available again. This recovery design should allow most io threads to
	   eventually go through, unless large files are written/overwritten concurrently and
	   the total size of these files exceed the cache storage limit. */

	// Close the downloaders
	// Accumulate and log errors
	checkErr := func(e error) {
		if e != nil {
			fs.Errorf(item.o, "vfs cache: item reset failed: %v", e)
			if err == nil {
				err = e
			}
		}
	}

	if downloaders := item.downloaders; downloaders != nil {
		item.downloaders = nil
		// FIXME need to unlock to kill downloader - should we
		// re-arrange locking so this isn't necessary?  maybe
		// downloader should use the item mutex for locking? or put a
		// finer lock on Rs?
		//
		// downloader.Write calls ensure which needs the lock
		// close downloader with mutex unlocked
		item.mu.Unlock()
		checkErr(downloaders.Close(nil))
		item.mu.Lock()
	}

	// close the file handle
	// fd can be nil if we tried Reset and failed before because of ENOSPC during reset
	if item.fd != nil {
		checkErr(item.fd.Close())
		// NOTE(review): err may already be set by the downloaders close
		// above, in which case we return here with item.fd still
		// pointing at the handle which was just closed - confirm this
		// is intended.
		if err != nil {
			// Could not close the cache file
			item.beingReset = false
			item.cond.Broadcast()
			return ResetFailed, 0, err
		}
		item.fd = nil
	}

	// Measure the space before removal as _remove resets the info
	spaceFreed = item.info.Rs.Size()

	// This should not be possible.  We get here only if cache data is not dirty.
	if item._remove("cache out of space, item is clean") {
		fs.Errorf(item.o, "vfs cache item removed when it was writing/uploaded")
	}

	// can we have an item with no dirty data (so that we can get here) and nil item.o at the same time?
	fso := item.o
	checkErr(item._checkObject(fso))
	if err != nil {
		item.beingReset = false
		item.cond.Broadcast()
		return ResetFailed, spaceFreed, err
	}

	osPath := item.c.toOSPath(item.name)
	checkErr(item._createFile(osPath))
	if err != nil {
		item._remove("cache reset failed on _createFile, removed cache data file")
		item.fd = nil // This allows a new Reset redo to have a clean state to deal with
		// On an out of space error leave beingReset set so the reset can be redone later
		if !fserrors.IsErrNoSpace(err) {
			item.beingReset = false
			item.cond.Broadcast()
		}
		return ResetFailed, spaceFreed, err
	}

	// Create the downloaders
	if item.o != nil {
		item.downloaders = downloaders.New(item, item.c.opt, item.name, item.o)
	}

	/* The item will stay in the beingReset state if we get an error that prevents us from
	reaching this point.  The cache purge loop will redo the failed Reset. */
	item.beingReset = false
	item.cond.Broadcast()

	return ResetComplete, spaceFreed, err
}
1043
1044// ProtectCache either waits for an ongoing cache reset to finish or increases pendingReads
1045// to protect against cache reset on this item while the thread potentially uses the cache file
1046// Cache cleaner waits until pendingReads is zero before resetting cache.
1047func (item *Item) preAccess() {
1048	item.mu.Lock()
1049	defer item.mu.Unlock()
1050
1051	if item.beingReset {
1052		for {
1053			item.cond.Wait()
1054			if !item.beingReset {
1055				break
1056			}
1057		}
1058	}
1059	item.pendingAccesses++
1060}
1061
1062// postAccess reduces the pendingReads count enabling cache reset upon ENOSPC
1063func (item *Item) postAccess() {
1064	item.mu.Lock()
1065	defer item.mu.Unlock()
1066
1067	item.pendingAccesses--
1068	item.cond.Broadcast()
1069}
1070
1071// _present returns true if the whole file has been downloaded
1072//
1073// call with the lock held
1074func (item *Item) _present() bool {
1075	return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
1076}
1077
1078// present returns true if the whole file has been downloaded
1079func (item *Item) present() bool {
1080	item.mu.Lock()
1081	defer item.mu.Unlock()
1082	return item._present()
1083}
1084
1085// HasRange returns true if the current ranges entirely include range
1086func (item *Item) HasRange(r ranges.Range) bool {
1087	item.mu.Lock()
1088	defer item.mu.Unlock()
1089	return item.info.Rs.Present(r)
1090}
1091
1092// FindMissing adjusts r returning a new ranges.Range which only
1093// contains the range which needs to be downloaded. This could be
1094// empty - check with IsEmpty. It also adjust this to make sure it is
1095// not larger than the file.
1096func (item *Item) FindMissing(r ranges.Range) (outr ranges.Range) {
1097	item.mu.Lock()
1098	defer item.mu.Unlock()
1099	outr = item.info.Rs.FindMissing(r)
1100	// Clip returned block to size of file
1101	outr.Clip(item.info.Size)
1102	return outr
1103}
1104
1105// ensure the range from offset, size is present in the backing file
1106//
1107// call with the item lock held
1108func (item *Item) _ensure(offset, size int64) (err error) {
1109	// defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("err=%v", &err)
1110	if offset+size > item.info.Size {
1111		size = item.info.Size - offset
1112	}
1113	r := ranges.Range{Pos: offset, Size: size}
1114	present := item.info.Rs.Present(r)
1115	/* This statement simulates a cache space error for test purpose */
1116	/* if present != true && item.info.Rs.Size() > 32*1024*1024 {
1117		return errors.New("no space left on device")
1118	} */
1119	fs.Debugf(nil, "vfs cache: looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
1120	item.mu.Unlock()
1121	defer item.mu.Lock()
1122	if present {
1123		// This is a file we are writing so no downloaders needed
1124		if item.downloaders == nil {
1125			return nil
1126		}
1127		// Otherwise start the downloader for the future if required
1128		return item.downloaders.EnsureDownloader(r)
1129	}
1130	if item.downloaders == nil {
1131		return errors.New("internal error: downloaders is nil")
1132	}
1133	return item.downloaders.Download(r)
1134}
1135
1136// _written marks the (offset, size) as present in the backing file
1137//
1138// This is called by the downloader downloading file segments and the
1139// vfs layer writing to the file.
1140//
1141// This doesn't mark the item as Dirty - that the the responsibility
1142// of the caller as we don't know here whether we are adding reads or
1143// writes to the cache file.
1144//
1145// call with lock held
1146func (item *Item) _written(offset, size int64) {
1147	// defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
1148	item.info.Rs.Insert(ranges.Range{Pos: offset, Size: size})
1149}
1150
1151// update the fingerprint of the object if any
1152//
1153// call with lock held
1154func (item *Item) _updateFingerprint() {
1155	if item.o == nil {
1156		return
1157	}
1158	oldFingerprint := item.info.Fingerprint
1159	item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, false)
1160	if oldFingerprint != item.info.Fingerprint {
1161		fs.Debugf(item.o, "vfs cache: fingerprint now %q", item.info.Fingerprint)
1162	}
1163}
1164
1165// setModTime of the cache file
1166//
1167// call with lock held
1168func (item *Item) _setModTime(modTime time.Time) {
1169	fs.Debugf(item.name, "vfs cache: setting modification time to %v", modTime)
1170	osPath := item.c.toOSPath(item.name) // No locking in Cache
1171	err := os.Chtimes(osPath, modTime, modTime)
1172	if err != nil {
1173		fs.Errorf(item.name, "vfs cache: failed to set modification time of cached file: %v", err)
1174	}
1175}
1176
1177// setModTime of the cache file and in the Item
1178func (item *Item) setModTime(modTime time.Time) {
1179	// defer log.Trace(item.name, "modTime=%v", modTime)("")
1180	item.mu.Lock()
1181	item._updateFingerprint()
1182	item._setModTime(modTime)
1183	item.info.ModTime = modTime
1184	err := item._save()
1185	if err != nil {
1186		fs.Errorf(item.name, "vfs cache: setModTime: failed to save item info: %v", err)
1187	}
1188	item.mu.Unlock()
1189}
1190
1191// GetModTime of the cache file
1192func (item *Item) GetModTime() (modTime time.Time, err error) {
1193	// defer log.Trace(item.name, "modTime=%v", modTime)("")
1194	item.mu.Lock()
1195	defer item.mu.Unlock()
1196	fi, err := item._stat()
1197	if err == nil {
1198		modTime = fi.ModTime()
1199	}
1200	return modTime, nil
1201}
1202
1203// ReadAt bytes from the file at off
1204func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
1205	n = 0
1206	var expBackOff int
1207	for retries := 0; retries < fs.GetConfig(context.TODO()).LowLevelRetries; retries++ {
1208		item.preAccess()
1209		n, err = item.readAt(b, off)
1210		item.postAccess()
1211		if err == nil || err == io.EOF {
1212			break
1213		}
1214		fs.Errorf(item.name, "vfs cache: failed to _ensure cache %v", err)
1215		if !fserrors.IsErrNoSpace(err) && err.Error() != "no space left on device" {
1216			fs.Debugf(item.name, "vfs cache: failed to _ensure cache %v is not out of space", err)
1217			break
1218		}
1219		item.c.KickCleaner()
1220		expBackOff = 2 << uint(retries)
1221		time.Sleep(time.Duration(expBackOff) * time.Millisecond) // Exponential back-off the retries
1222	}
1223
1224	if fserrors.IsErrNoSpace(err) {
1225		fs.Errorf(item.name, "vfs cache: failed to _ensure cache after retries %v", err)
1226	}
1227
1228	return n, err
1229}
1230
1231// ReadAt bytes from the file at off
1232func (item *Item) readAt(b []byte, off int64) (n int, err error) {
1233	item.mu.Lock()
1234	if item.fd == nil {
1235		item.mu.Unlock()
1236		return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file")
1237	}
1238	if off < 0 {
1239		item.mu.Unlock()
1240		return 0, io.EOF
1241	}
1242	defer item.mu.Unlock()
1243
1244	err = item._ensure(off, int64(len(b)))
1245	if err != nil {
1246		return 0, err
1247	}
1248
1249	item.info.ATime = time.Now()
1250	// Do the reading with Item.mu unlocked and cache protected by preAccess
1251	n, err = item.fd.ReadAt(b, off)
1252	return n, err
1253}
1254
1255// WriteAt bytes to the file at off
1256func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
1257	item.preAccess()
1258	defer item.postAccess()
1259	item.mu.Lock()
1260	if item.fd == nil {
1261		item.mu.Unlock()
1262		return 0, errors.New("vfs cache item WriteAt: internal error: didn't Open file")
1263	}
1264	item.mu.Unlock()
1265	// Do the writing with Item.mu unlocked
1266	n, err = item.fd.WriteAt(b, off)
1267	if err == nil && n != len(b) {
1268		err = errors.Errorf("short write: tried to write %d but only %d written", len(b), n)
1269	}
1270	item.mu.Lock()
1271	item._written(off, int64(n))
1272	if n > 0 {
1273		item._dirty()
1274	}
1275	end := off + int64(n)
1276	// Writing off the end of the file so need to make some
1277	// zeroes.  we do this by showing that we have written to the
1278	// new parts of the file.
1279	if off > item.info.Size {
1280		item._written(item.info.Size, off-item.info.Size)
1281		item._dirty()
1282	}
1283	// Update size
1284	if end > item.info.Size {
1285		item.info.Size = end
1286	}
1287	item.mu.Unlock()
1288	return n, err
1289}
1290
1291// WriteAtNoOverwrite writes b to the file, but will not overwrite
1292// already present ranges.
1293//
1294// This is used by the downloader to write bytes to the file
1295//
1296// It returns n the total bytes processed and skipped the number of
1297// bytes which were processed but not actually written to the file.
1298func (item *Item) WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error) {
1299	item.mu.Lock()
1300
1301	var (
1302		// Range we wish to write
1303		r = ranges.Range{Pos: off, Size: int64(len(b))}
1304		// Ranges that we need to write
1305		foundRanges = item.info.Rs.FindAll(r)
1306		// Length of each write
1307		nn int
1308	)
1309
1310	// Write the range out ignoring already written chunks
1311	// fs.Debugf(item.name, "Ranges = %v", item.info.Rs)
1312	for i := range foundRanges {
1313		foundRange := &foundRanges[i]
1314		// fs.Debugf(item.name, "foundRange[%d] = %v", i, foundRange)
1315		if foundRange.R.Pos != off {
1316			err = errors.New("internal error: offset of range is wrong")
1317			break
1318		}
1319		size := int(foundRange.R.Size)
1320		if foundRange.Present {
1321			// if present want to skip this range
1322			// fs.Debugf(item.name, "skip chunk offset=%d size=%d", off, size)
1323			nn = size
1324			skipped += size
1325		} else {
1326			// if range not present then we want to write it
1327			// fs.Debugf(item.name, "write chunk offset=%d size=%d", off, size)
1328			nn, err = item.fd.WriteAt(b[:size], off)
1329			if err == nil && nn != size {
1330				err = errors.Errorf("downloader: short write: tried to write %d but only %d written", size, nn)
1331			}
1332			item._written(off, int64(nn))
1333		}
1334		off += int64(nn)
1335		b = b[nn:]
1336		n += nn
1337		if err != nil {
1338			break
1339		}
1340	}
1341	item.mu.Unlock()
1342	return n, skipped, err
1343}
1344
1345// Sync commits the current contents of the file to stable storage. Typically,
1346// this means flushing the file system's in-memory copy of recently written
1347// data to disk.
1348func (item *Item) Sync() (err error) {
1349	item.preAccess()
1350	defer item.postAccess()
1351	item.mu.Lock()
1352	defer item.mu.Unlock()
1353	if item.fd == nil {
1354		return errors.New("vfs cache item sync: internal error: didn't Open file")
1355	}
1356	// sync the file and the metadata to disk
1357	err = item.fd.Sync()
1358	if err != nil {
1359		return errors.Wrap(err, "vfs cache item sync: failed to sync file")
1360	}
1361	err = item._save()
1362	if err != nil {
1363		return errors.Wrap(err, "vfs cache item sync: failed to sync metadata")
1364	}
1365	return nil
1366}
1367
1368// rename the item
1369func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
1370	item.preAccess()
1371	defer item.postAccess()
1372	item.mu.Lock()
1373
1374	// stop downloader
1375	downloaders := item.downloaders
1376	item.downloaders = nil
1377
1378	// id for writeback cancel
1379	id := item.writeBackID
1380
1381	// Set internal state
1382	item.name = newName
1383	item.o = newObj
1384
1385	// Rename cache file if it exists
1386	err = rename(item.c.toOSPath(name), item.c.toOSPath(newName)) // No locking in Cache
1387
1388	// Rename meta file if it exists
1389	err2 := rename(item.c.toOSPathMeta(name), item.c.toOSPathMeta(newName)) // No locking in Cache
1390	if err2 != nil {
1391		err = err2
1392	}
1393
1394	item.mu.Unlock()
1395
1396	// close downloader and cancel writebacks with mutex unlocked
1397	if downloaders != nil {
1398		_ = downloaders.Close(nil)
1399	}
1400	item.c.writeback.Rename(id, newName)
1401	return err
1402}
1403