1package vfs
2
3import (
4	"context"
5	"io"
6	"os"
7	"sync"
8	"time"
9
10	"github.com/pkg/errors"
11	"github.com/rclone/rclone/fs"
12	"github.com/rclone/rclone/fs/accounting"
13	"github.com/rclone/rclone/fs/chunkedreader"
14	"github.com/rclone/rclone/fs/hash"
15)
16
17// ReadFileHandle is an open for read file handle on a File
18type ReadFileHandle struct {
19	baseHandle
20	done        func(ctx context.Context, err error)
21	mu          sync.Mutex
22	cond        *sync.Cond // cond lock for out of sequence reads
23	closed      bool       // set if handle has been closed
24	r           *accounting.Account
25	readCalled  bool  // set if read has been called
26	size        int64 // size of the object (0 for unknown length)
27	offset      int64 // offset of read of o
28	roffset     int64 // offset of Read() calls
29	noSeek      bool
30	sizeUnknown bool // set if size of source is not known
31	file        *File
32	hash        *hash.MultiHasher
33	opened      bool
34	remote      string
35}
36
37// Check interfaces
38var (
39	_ io.Reader   = (*ReadFileHandle)(nil)
40	_ io.ReaderAt = (*ReadFileHandle)(nil)
41	_ io.Seeker   = (*ReadFileHandle)(nil)
42	_ io.Closer   = (*ReadFileHandle)(nil)
43)
44
// newReadFileHandle returns a read handle for f. The object itself is not
// opened here; that is deferred until the first read (see openPending).
//
// Unless the VFS NoChecksum option is set, a hasher for one hash type
// supported by the object's backend is created so the data read can be
// verified later. An error creating the hasher is logged, not returned.
// The returned error is currently always nil.
func newReadFileHandle(f *File) (*ReadFileHandle, error) {
	o := f.getObject()

	var hasher *hash.MultiHasher
	if !f.VFS().Opt.NoChecksum {
		var err error
		types := hash.NewHashSet(o.Fs().Hashes().GetOne()) // just pick one hash
		if hasher, err = hash.NewMultiHasherTypes(types); err != nil {
			fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
		}
	}

	// A negative size means the length is unknown; store 0 in that case.
	size := o.Size()
	fh := &ReadFileHandle{
		file:        f,
		remote:      o.Remote(),
		hash:        hasher,
		noSeek:      f.VFS().Opt.NoSeek,
		size:        nonNegative(size),
		sizeUnknown: size < 0,
	}
	fh.cond = sync.NewCond(&fh.mu)
	return fh, nil
}
68
// openPending opens the underlying object on first use; once the handle
// has been opened it does nothing.
//
// It opens a chunked reader on the object using the VFS ChunkSize and
// ChunkSizeLimit options, starts a transfer in the global stats and
// stores a buffered, accounted reader over the chunked reader in fh.r.
// fh.done is set to the transfer's completion function.
//
// Must be called with fh.mu held.
func (fh *ReadFileHandle) openPending() (err error) {
	if fh.opened {
		return nil
	}
	o := fh.file.getObject()
	chunkSize := int64(fh.file.VFS().Opt.ChunkSize)
	chunkSizeLimit := int64(fh.file.VFS().Opt.ChunkSizeLimit)
	rd, err := chunkedreader.New(context.TODO(), o, chunkSize, chunkSizeLimit).Open()
	if err != nil {
		return err
	}

	transfer := accounting.GlobalStats().NewTransfer(o)
	fh.done = transfer.Done
	fh.r = transfer.Account(context.TODO(), rd).WithBuffer() // account the transfer
	fh.opened = true
	return nil
}
87
88// String converts it to printable
89func (fh *ReadFileHandle) String() string {
90	if fh == nil {
91		return "<nil *ReadFileHandle>"
92	}
93	fh.mu.Lock()
94	defer fh.mu.Unlock()
95	if fh.file == nil {
96		return "<nil *ReadFileHandle.file>"
97	}
98	return fh.file.String() + " (r)"
99}
100
101// Node returns the Node associated with this - satisfies Noder interface
102func (fh *ReadFileHandle) Node() Node {
103	fh.mu.Lock()
104	defer fh.mu.Unlock()
105	return fh.file
106}
107
// seek repositions the underlying reader at offset and records it in
// fh.offset.
//
// It returns ESPIPE if the handle was opened with NoSeek. Any seek
// disables hash checking (fh.hash = nil), because the running hash would
// no longer cover the data in order.
//
// Unless reopen is true, it first tries to satisfy the seek by skipping
// within the async read-ahead buffer, then by calling RangeSeek on the
// existing chunked reader. If reopen is true, or the current reader is
// not a *chunkedreader.ChunkedReader, the old reader is closed and a new
// chunked reader is opened at offset.
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
	if fh.noSeek {
		return ESPIPE
	}
	fh.hash = nil
	if !reopen {
		ar := fh.r.GetAsyncReader()
		// Try to fulfill the seek with a buffer discard. SkipBytes takes an
		// int, so only try it when the distance survives the conversion. On
		// 32 bit platforms a large int64 gap would otherwise be truncated to
		// a small, wrong skip, leaving fh.offset out of step with the data
		// actually being returned.
		skip := offset - fh.offset
		if ar != nil && int64(int(skip)) == skip && ar.SkipBytes(int(skip)) {
			fh.offset = offset
			return nil
		}
	}
	fh.r.StopBuffering() // stop the background reading first
	oldReader := fh.r.GetReader()
	r, ok := oldReader.(*chunkedreader.ChunkedReader)
	if !ok {
		fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader)
		reopen = true
	}
	if !reopen {
		fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset)
		if _, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1); err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err)
			return err
		}
	} else {
		fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
		// Close the old reader. A failure is only logged because the
		// reader is being replaced anyway.
		if err = oldReader.Close(); err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err)
		}
		// re-open with a seek
		o := fh.file.getObject()
		r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit))
		if _, err = r.Seek(offset, io.SeekStart); err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
			return err
		}
		if r, err = r.Open(); err != nil {
			fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
			return err
		}
	}
	fh.r.UpdateReader(context.TODO(), r)
	fh.offset = offset
	return nil
}
165
166// Seek the file - returns ESPIPE if seeking isn't possible
167func (fh *ReadFileHandle) Seek(offset int64, whence int) (n int64, err error) {
168	fh.mu.Lock()
169	defer fh.mu.Unlock()
170	if fh.noSeek {
171		return 0, ESPIPE
172	}
173	size := fh.size
174	switch whence {
175	case io.SeekStart:
176		fh.roffset = 0
177	case io.SeekEnd:
178		fh.roffset = size
179	}
180	fh.roffset += offset
181	// we don't check the offset - the next Read will
182	return fh.roffset, nil
183}
184
185// ReadAt reads len(p) bytes into p starting at offset off in the
186// underlying input source. It returns the number of bytes read (0 <=
187// n <= len(p)) and any error encountered.
188//
189// When ReadAt returns n < len(p), it returns a non-nil error
190// explaining why more bytes were not returned. In this respect,
191// ReadAt is stricter than Read.
192//
193// Even if ReadAt returns n < len(p), it may use all of p as scratch
194// space during the call. If some data is available but not len(p)
195// bytes, ReadAt blocks until either all the data is available or an
196// error occurs. In this respect ReadAt is different from Read.
197//
198// If the n = len(p) bytes returned by ReadAt are at the end of the
199// input source, ReadAt may return either err == EOF or err == nil.
200//
201// If ReadAt is reading from an input source with a seek offset,
202// ReadAt should not affect nor be affected by the underlying seek
203// offset.
204//
205// Clients of ReadAt can execute parallel ReadAt calls on the same
206// input source.
207//
208// Implementations must not retain p.
209func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) {
210	fh.mu.Lock()
211	defer fh.mu.Unlock()
212	return fh.readAt(p, off)
213}
214
215// This waits for *poff to equal off or aborts after the timeout.
216//
217// Waits here potentially affect all seeks so need to keep them short
218//
219// Call with fh.mu Locked
220func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) {
221	var (
222		timeout = time.NewTimer(maxWait)
223		done    = make(chan struct{})
224		abort   = false
225	)
226	go func() {
227		select {
228		case <-timeout.C:
229			// take the lock to make sure that cond.Wait() is called before
230			// cond.Broadcast. NB cond.L == mu
231			cond.L.Lock()
232			// set abort flag and give all the waiting goroutines a kick on timeout
233			abort = true
234			fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off)
235			cond.Broadcast()
236			cond.L.Unlock()
237		case <-done:
238		}
239	}()
240	for *poff != off && !abort {
241		fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait)
242		cond.Wait()
243	}
244	// tidy up end timer
245	close(done)
246	timeout.Stop()
247	if *poff != off {
248		fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off)
249	}
250}
251
// readAt is the implementation of ReadAt (and Read). Call with fh.mu held.
//
// It returns ECLOSED if the handle has been closed; otherwise it opens
// the underlying object on first use. If off is a short distance ahead
// of the current position (less than 8 * min(len(p), 1 MiB) bytes), it
// first waits up to the VFS ReadWait option for the preceding in-sequence
// read to arrive. It then seeks if the position still differs, returning
// ESPIPE on a NoSeek handle and io.EOF when seeking to or past the end.
//
// The read is retried up to the configured LowLevelRetries times, each
// retry reopening the object at off. Reaching the end of the object is
// not itself an error, but a successful read that does not fill p
// returns io.EOF along with n.
func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
	// defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err)

	// Check for a closed handle before opening anything. Calling
	// openPending first would open a new reader and start a new accounted
	// transfer on a handle which has already been closed, and nothing
	// would ever close or complete them.
	if fh.closed {
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", ECLOSED)
		return 0, ECLOSED
	}
	err = fh.openPending() // FIXME pending open could be more efficient in the presence of seek (and retries)
	if err != nil {
		return 0, err
	}
	maxBuf := 1024 * 1024
	if len(p) < maxBuf {
		maxBuf = len(p)
	}
	// If this read is only slightly ahead of the current position, give
	// the read that should precede it a chance to arrive first, so the
	// data can be streamed sequentially rather than seeked.
	if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) {
		waitSequential("read", fh.remote, fh.cond, fh.file.VFS().Opt.ReadWait, &fh.offset, off)
	}
	doSeek := off != fh.offset
	if doSeek && fh.noSeek {
		return 0, ESPIPE
	}
	var newOffset int64
	retries := 0
	doReopen := false
	lowLevelRetries := fs.GetConfig(context.TODO()).LowLevelRetries
	for {
		if doSeek {
			// Are we attempting to seek beyond the end of the
			// file - if so just return EOF leaving the underlying
			// file in an unchanged state.
			if off >= fh.size {
				fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size)
				return 0, io.EOF
			}
			// Otherwise do the seek
			err = fh.seek(off, doReopen)
		} else {
			err = nil
		}
		if err == nil {
			if len(p) > 0 {
				fh.readCalled = true
			}
			n, err = io.ReadFull(fh.r, p)
			newOffset = fh.offset + int64(n)
			if err == nil {
				break
			}
			if (err == io.ErrUnexpectedEOF || err == io.EOF) && (newOffset == fh.size || fh.sizeUnknown) {
				if fh.sizeUnknown {
					// size is now known since we have read to the end
					fh.sizeUnknown = false
					fh.size = newOffset
				}
				// Have read to end of file - reset error
				err = nil
				break
			}
		}
		if retries >= lowLevelRetries {
			break
		}
		retries++
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, lowLevelRetries, err)
		// Retry from a freshly opened reader positioned at off.
		doSeek = true
		doReopen = true
	}
	if err != nil {
		fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err)
	} else {
		fh.offset = newOffset

		if fh.hash != nil {
			_, err = fh.hash.Write(p[:n])
			if err != nil {
				fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err)
				return 0, err
			}
		}

		// If we have no error and we didn't fill the buffer, must be EOF
		if n != len(p) {
			err = io.EOF
		}
	}
	fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read
	return n, err
}
346
// checkHash compares the running hash of the data read against the
// hash stored for the object.
//
// The check is skipped (nil is returned) in these cases:
//   - hashing is disabled or was abandoned by a seek (fh.hash == nil);
//   - nothing has been read;
//   - the object's size is still unknown. sizeUnknown is only cleared
//     once a read reaches the end of the object, so the hash is partial
//     and fh.size (0) cannot be used to tell;
//   - the handle has not read up to the end of the object.
//
// An error looking up the source hash is returned, unless it is a
// "not exist" error, which is ignored. A mismatch returns a "corrupted
// on transfer" error.
//
// Must be called with fh.mu held.
func (fh *ReadFileHandle) checkHash() error {
	if fh.hash == nil || !fh.readCalled || fh.sizeUnknown || fh.offset < fh.size {
		return nil
	}

	o := fh.file.getObject()
	for hashType, dstSum := range fh.hash.Sums() {
		srcSum, err := o.Hash(context.TODO(), hashType)
		if err != nil {
			if os.IsNotExist(errors.Cause(err)) {
				// if it was file not found then at
				// this point we don't care any more
				continue
			}
			return err
		}
		if !hash.Equals(dstSum, srcSum) {
			return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum)
		}
	}

	return nil
}
370
// Read reads up to len(p) bytes into p from the handle's current read
// offset (as set by previous Reads and by Seek), satisfying io.Reader.
// The offset is advanced by the number of bytes returned.
//
// Once the offset is at or beyond a known file size it returns 0, io.EOF
// without touching the underlying reader. A read that does not fill p
// returns io.EOF together with the bytes read. Callers should process the
// n > 0 bytes returned before considering err. p is not retained.
func (fh *ReadFileHandle) Read(p []byte) (n int, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	atEnd := !fh.sizeUnknown && fh.roffset >= fh.size
	if atEnd {
		return 0, io.EOF
	}
	n, err = fh.readAt(p, fh.roffset)
	fh.roffset += int64(n)
	return n, err
}
404
405// close the file handle returning EBADF if it has been
406// closed already.
407//
408// Must be called with fh.mu held
409func (fh *ReadFileHandle) close() error {
410	if fh.closed {
411		return ECLOSED
412	}
413	fh.closed = true
414
415	if fh.opened {
416		var err error
417		defer func() {
418			fh.done(context.TODO(), err)
419		}()
420		// Close first so that we have hashes
421		err = fh.r.Close()
422		if err != nil {
423			return err
424		}
425		// Now check the hash
426		err = fh.checkHash()
427		if err != nil {
428			return err
429		}
430	}
431	return nil
432}
433
434// Close closes the file
435func (fh *ReadFileHandle) Close() error {
436	fh.mu.Lock()
437	defer fh.mu.Unlock()
438	return fh.close()
439}
440
441// Flush is called each time the file or directory is closed.
442// Because there can be multiple file descriptors referring to a
443// single opened file, Flush can be called multiple times.
444func (fh *ReadFileHandle) Flush() error {
445	fh.mu.Lock()
446	defer fh.mu.Unlock()
447	if !fh.opened {
448		return nil
449	}
450	// fs.Debugf(fh.remote, "ReadFileHandle.Flush")
451
452	if err := fh.checkHash(); err != nil {
453		fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err)
454		return err
455	}
456
457	// fs.Debugf(fh.remote, "ReadFileHandle.Flush OK")
458	return nil
459}
460
461// Release is called when we are finished with the file handle
462//
463// It isn't called directly from userspace so the error is ignored by
464// the kernel
465func (fh *ReadFileHandle) Release() error {
466	fh.mu.Lock()
467	defer fh.mu.Unlock()
468	if !fh.opened {
469		return nil
470	}
471	if fh.closed {
472		fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do")
473		return nil
474	}
475	fs.Debugf(fh.remote, "ReadFileHandle.Release closing")
476	err := fh.close()
477	if err != nil {
478		fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err)
479	} else {
480		// fs.Debugf(fh.remote, "ReadFileHandle.Release OK")
481	}
482	return err
483}
484
485// Size returns the size of the underlying file
486func (fh *ReadFileHandle) Size() int64 {
487	fh.mu.Lock()
488	defer fh.mu.Unlock()
489	return fh.size
490}
491
// Stat returns info about the file; the handle's File is itself the
// os.FileInfo. The error is always nil.
func (fh *ReadFileHandle) Stat() (fi os.FileInfo, err error) {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	fi = fh.file
	return fi, nil
}
498