package downloaders

import (
	"context"
	"io"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
	"github.com/rclone/rclone/fs/asyncreader"
	"github.com/rclone/rclone/fs/chunkedreader"
	"github.com/rclone/rclone/fs/fserrors"
	"github.com/rclone/rclone/lib/ranges"
	"github.com/rclone/rclone/vfs/vfscommon"
)

// FIXME implement max downloaders

const (
	// max time a downloader can be idle before closing itself
	maxDownloaderIdleTime = 5 * time.Second
	// max number of bytes a reader should skip over before closing it
	maxSkipBytes = 1024 * 1024
	// time between background kicks of waiters to pick up errors
	backgroundKickerInterval = 5 * time.Second
	// maximum number of errors before declaring dead
	maxErrorCount = 10
	// If a downloader is within this distance, or --buffer-size,
	// whichever is the larger, we will reuse the downloader
	minWindow = 1024 * 1024
)

// Item is the interface that an item to download must obey
type Item interface {
	// FindMissing adjusts r returning a new ranges.Range which only
	// contains the range which needs to be downloaded. This could be
	// empty - check with IsEmpty. It also adjusts this to make sure it is
	// not larger than the file.
	FindMissing(r ranges.Range) (outr ranges.Range)

	// HasRange returns true if the current ranges entirely include range
	HasRange(r ranges.Range) bool

	// WriteAtNoOverwrite writes b to the file, but will not overwrite
	// already present ranges.
	//
	// This is used by the downloader to write bytes to the file.
	//
	// It returns n, the total number of bytes processed, and skipped,
	// the number of bytes which were processed but not actually
	// written to the file.
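	//
	// As an illustrative sketch of this contract (the numbers are
	// examples, not from the source): if bytes 0-99 are already
	// present and a 200 byte buffer is written at offset 0, the call
	// would report n=200 and skipped=100 - all 200 bytes were
	// processed but only bytes 100-199 were actually written.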
	WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error)
}

// Downloaders is a number of downloaders and a queue of waiters
// waiting for segments to be downloaded to a file.
type Downloaders struct {
	// Write once - no locking required
	ctx    context.Context
	cancel context.CancelFunc
	item   Item
	opt    *vfscommon.Options
	src    fs.Object // source object
	remote string
	wg     sync.WaitGroup

	// Read write
	mu         sync.Mutex
	dls        []*downloader
	waiters    []waiter
	errorCount int   // number of consecutive errors
	lastErr    error // last error received
}

// waiter is a range we are waiting for and a channel to signal when
// the range is found
type waiter struct {
	r       ranges.Range
	errChan chan<- error
}

// downloader represents a running download for part of a file.
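//
// Its Write method is driven by the download goroutine (via
// (*downloader).download), while setRange, getRange and stopAndClose
// are called from other goroutines, hence the mutex below.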
type downloader struct {
	// Write once
	dls  *Downloaders   // parent structure
	quit chan struct{}  // close to quit the downloader
	wg   sync.WaitGroup // to keep track of downloader goroutine
	kick chan struct{}  // kick the downloader when needed

	// Read write
	mu        sync.Mutex
	start     int64 // start offset
	offset    int64 // current offset
	maxOffset int64 // maximum offset we are reading to
	tr        *accounting.Transfer
	in        *accounting.Account // input we are reading from
	skipped   int64               // number of bytes we have skipped sequentially
	_closed   bool                // set to true if downloader is closed
	stop      bool                // set to true if we have called _stop()
}

// New makes a downloader for item
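//
// A typical lifecycle, as an illustrative sketch (the calling code
// shown here is an assumption, not part of this package):
//
//	dls := downloaders.New(item, opt, remote, src)
//	// Block until the first 4 kiB is present in the cache file
//	err := dls.Download(ranges.Range{Pos: 0, Size: 4096})
//	// ...
//	err = dls.Close(nil) // stop downloads, error out remaining waiters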
func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
	if src == nil {
		panic("internal error: newDownloaders called with nil src object")
	}
	ctx, cancel := context.WithCancel(context.Background())
	dls = &Downloaders{
		ctx:    ctx,
		cancel: cancel,
		item:   item,
		opt:    opt,
		src:    src,
		remote: remote,
	}
	dls.wg.Add(1)
	go func() {
		defer dls.wg.Done()
		ticker := time.NewTicker(backgroundKickerInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				err := dls.kickWaiters()
				if err != nil {
					fs.Errorf(dls.src, "vfs cache: failed to kick waiters: %v", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	return dls
}

// Accumulate errors for this downloader
//
// It should be called with
//
//   n bytes downloaded
//   err is error from download
//
// call with lock held
func (dls *Downloaders) _countErrors(n int64, err error) {
	if err == nil && n != 0 {
		if dls.errorCount != 0 {
			fs.Infof(dls.src, "vfs cache: downloader: resetting error count to 0")
			dls.errorCount = 0
			dls.lastErr = nil
		}
		return
	}
	if err != nil {
		//if err != syscall.ENOSPC {
		dls.errorCount++
		//}
		dls.lastErr = err
		fs.Infof(dls.src, "vfs cache: downloader: error count now %d: %v", dls.errorCount, err)
	}
}

func (dls *Downloaders) countErrors(n int64, err error) {
	dls.mu.Lock()
	dls._countErrors(n, err)
	dls.mu.Unlock()
}

// Make a new downloader, starting it to download r
//
// call with lock held
func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
	// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)

	dl = &downloader{
		kick:      make(chan struct{}, 1),
		quit:      make(chan struct{}),
		dls:       dls,
		start:     r.Pos,
		offset:    r.Pos,
		maxOffset: r.End(),
	}

	err = dl.open(dl.offset)
	if err != nil {
		_ = dl.close(err)
		return nil, errors.Wrap(err, "failed to open downloader")
	}

	dls.dls = append(dls.dls, dl)

	dl.wg.Add(1)
	go func() {
		defer dl.wg.Done()
		n, err := dl.download()
		_ = dl.close(err)
		dl.dls.countErrors(n, err)
		if err != nil {
			fs.Errorf(dl.dls.src, "vfs cache: failed to download: %v", err)
		}
		err = dl.dls.kickWaiters()
		if err != nil {
			fs.Errorf(dl.dls.src, "vfs cache: failed to kick waiters: %v", err)
		}
	}()

	return dl, nil
}

// _removeClosed() removes any downloaders which are closed.
//
// Call with the mutex held
func (dls *Downloaders) _removeClosed() {
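	// Filter in place, reusing the backing array of the existing slice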
	newDownloaders := dls.dls[:0]
	for _, dl := range dls.dls {
		if !dl.closed() {
			newDownloaders = append(newDownloaders, dl)
		}
	}
	dls.dls = newDownloaders
}

// Close all running downloaders and return any unfulfilled waiters
// with inErr
func (dls *Downloaders) Close(inErr error) (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()
	dls._removeClosed()
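	// stopAndClose waits for the downloader goroutine, which can call
	// back into kickWaiters and take dls.mu, so drop the lock around
	// each call to avoid a deadlock.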
	for _, dl := range dls.dls {
		dls.mu.Unlock()
		closeErr := dl.stopAndClose(inErr)
		dls.mu.Lock()
		if closeErr != nil && err == nil {
			err = closeErr
		}
	}
	dls.cancel()
	// dls may have entered the periodic (every 5 seconds) kickWaiters() call -
	// unlock the mutex to allow it to finish so that we can get its dls.wg.Done()
	dls.mu.Unlock()
	dls.wg.Wait()
	dls.mu.Lock()
	dls.dls = nil
	dls._dispatchWaiters()
	dls._closeWaiters(inErr)
	return err
}

// Download the range passed in returning when it has been downloaded
// with an error from the downloading goroutine.
func (dls *Downloaders) Download(r ranges.Range) (err error) {
	// defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)

	dls.mu.Lock()

	errChan := make(chan error)
	waiter := waiter{
		r:       r,
		errChan: errChan,
	}

	err = dls._ensureDownloader(r)
	if err != nil {
		dls.mu.Unlock()
		return err
	}

	dls.waiters = append(dls.waiters, waiter)
	dls.mu.Unlock()
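	// Block until a downloader (via kickWaiters) signals that the
	// range is present, or fails the waiter with an error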
	return <-errChan
}

// close any waiters with the error passed in
//
// call with lock held
func (dls *Downloaders) _closeWaiters(err error) {
	for _, waiter := range dls.waiters {
		waiter.errChan <- err
	}
	dls.waiters = nil
}

// ensure a downloader is running for the range if required.  If one isn't found
// then it starts it.
//
// call with lock held
func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
	// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)

	// The window includes potentially unread data in the buffer
	window := int64(fs.GetConfig(context.TODO()).BufferSize)

	// Increase the read range by the read ahead if set
	if dls.opt.ReadAhead > 0 {
		r.Size += int64(dls.opt.ReadAhead)
	}

	// We may be reopening a downloader after a failure here or
	// doing a tentative prefetch so check to see that we haven't
	// read some stuff already.
	//
	// Clip r to stuff which needs downloading
	r = dls.item.FindMissing(r)

	// If the range is entirely present then we only need to start a
	// downloader if the window isn't full.
	startNew := true
	if r.IsEmpty() {
		// Make a new range which includes the window
		rWindow := r
		rWindow.Size += window

		// Clip rWindow to stuff which needs downloading
		rWindowClipped := dls.item.FindMissing(rWindow)

		// If rWindowClipped is empty then don't start a new downloader
		// if there isn't an existing one as there is no data within the
		// window which needs downloading. We do want to kick an
		// existing one though to stop it timing out.
		if rWindowClipped.IsEmpty() {
			// Don't start any more downloaders
			startNew = false
			// Point the read at the end of the window - this has
			// likely been downloaded already but it will kick an
			// existing downloader
			r.Pos = rWindow.End()
		} else {
			// Start downloading at the start of the unread window
			r.Pos = rWindowClipped.Pos
		}
		// But don't write anything for the moment
		r.Size = 0
	}

	// If buffer size is less than minWindow then make it that
	if window < minWindow {
		window = minWindow
	}

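	// A downloader will be reused if the wanted position is no more
	// than window bytes ahead of its current offset, so window is at
	// least minWindow to avoid churning downloaders over small gaps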
	var dl *downloader
	// Look through downloaders to find one in range
	// If there isn't one then start a new one
	dls._removeClosed()
	for _, dl = range dls.dls {
		start, offset := dl.getRange()

		// The downloader's offset to offset+window is the gap
		// in which we would like to re-use this
		// downloader. The downloader will never reach before
		// start and offset+window is too far away - we'd
		// rather start another downloader.
		// fs.Debugf(nil, "r=%v start=%d, offset=%d, found=%v", r, start, offset, r.Pos >= start && r.Pos < offset+window)
		if r.Pos >= start && r.Pos < offset+window {
			// Found downloader which will soon have our data
			dl.setRange(r)
			return nil
		}
	}
	if !startNew {
		return nil
	}
	// Downloader not found so start a new one
	dl, err = dls._newDownloader(r)
	if err != nil {
		dls._countErrors(0, err)
		return errors.Wrap(err, "failed to start downloader")
	}
	return err
}

// EnsureDownloader makes sure a downloader is running for the range
// passed in.  If one isn't found then it starts it.
//
// It does not wait for the range to be downloaded
func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()
	return dls._ensureDownloader(r)
}

// _dispatchWaiters() sends any waiters which have completed back to
// their callers.
//
// Call with the mutex held
func (dls *Downloaders) _dispatchWaiters() {
	if len(dls.waiters) == 0 {
		return
	}

	newWaiters := dls.waiters[:0]
	for _, waiter := range dls.waiters {
		if dls.item.HasRange(waiter.r) {
			waiter.errChan <- nil
		} else {
			newWaiters = append(newWaiters, waiter)
		}
	}
	dls.waiters = newWaiters
}

// Send any waiters which have completed back to their callers and make sure
// there is a downloader appropriate for each waiter
func (dls *Downloaders) kickWaiters() (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()

	dls._dispatchWaiters()

	if len(dls.waiters) == 0 {
		return nil
	}

	// Make sure each waiter has a downloader
	// This is an O(waiters*Downloaders) algorithm
	// However the number of waiters and the number of downloaders
	// are both expected to be small.
	for _, waiter := range dls.waiters {
		err = dls._ensureDownloader(waiter.r)
		if err != nil {
			// Failures here will be retried by background kicker
			fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err)
		}
	}
	if fserrors.IsErrNoSpace(dls.lastErr) {
		fs.Errorf(dls.src, "vfs cache: cache is out of space %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
		dls._closeWaiters(dls.lastErr)
		return dls.lastErr
	}

	if dls.errorCount > maxErrorCount {
		fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
		dls._closeWaiters(dls.lastErr)
		return dls.lastErr
	}

	return nil
}

// Write writes len(p) bytes from p to the underlying data stream. It
// returns the number of bytes written from p (0 <= n <= len(p)) and
// any error encountered that caused the write to stop early. Write
// must return a non-nil error if it returns n < len(p). Write must
// not modify the slice data, even temporarily.
//
// Implementations must not retain p.
func (dl *downloader) Write(p []byte) (n int, err error) {
	// defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)

	// Kick the waiters on exit if any bytes were received
	defer func() {
		if n <= 0 {
			return
		}
		if waitErr := dl.dls.kickWaiters(); waitErr != nil {
			fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr)
			if err == nil {
				err = waitErr
			}
		}
	}()

	dl.mu.Lock()
	defer dl.mu.Unlock()

	// Wait here if we have reached maxOffset until
	// - we are quitting
	// - we get kicked
	// - timeout happens
loop:
	for dl.offset >= dl.maxOffset {
		var timeout = time.NewTimer(maxDownloaderIdleTime)
		dl.mu.Unlock()
		select {
		case <-dl.quit:
			dl.mu.Lock()
			timeout.Stop()
			break loop
		case <-dl.kick:
			dl.mu.Lock()
			timeout.Stop()
		case <-timeout.C:
			// stop any future reading
			dl.mu.Lock()
			if !dl.stop {
				fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out")
				dl._stop()
			}
			break loop
		}
	}

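	// If the whole buffer was skipped, add it to the sequential skip
	// total; any byte actually written resets the counter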
	n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset)
	if skipped == n {
		dl.skipped += int64(skipped)
	} else {
		dl.skipped = 0
	}
	dl.offset += int64(n)

	// Kill this downloader if skipped too many bytes
	if !dl.stop && dl.skipped > maxSkipBytes {
		fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it has skipped %d bytes", dl.skipped)
		dl._stop()
	}

	// If running without an async buffer then stop now as
	// StopBuffering has no effect if the Account wasn't buffered
	// so we need to stop manually now rather than wait for the
	// AsyncReader to stop.
	if dl.stop && !dl.in.HasBuffer() {
		err = asyncreader.ErrorStreamAbandoned
	}
	return n, err
}

// open the file from offset
//
// should be called on a fresh downloader
func (dl *downloader) open(offset int64) (err error) {
	// defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
	dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)

	size := dl.dls.src.Size()
	if size < 0 {
		// FIXME should just completely download these
		return errors.New("can't open unknown sized file")
	}

	// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
	// var rangeOption *fs.RangeOption
	// if offset > 0 {
	// 	rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
	// }
	// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)

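	// Read the source in chunks: the chunked reader starts with reads
	// of ChunkSize and doubles the chunk size on each chunk up to
	// ChunkSizeLimit (if set), so short reads stay cheap while long
	// sequential reads stream efficiently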
	in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit))
	_, err = in0.Seek(offset, io.SeekStart)
	if err != nil {
		return errors.Wrap(err, "vfs reader: failed to open source file")
	}
	dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer

	dl.offset = offset

	// FIXME set mod time
	// FIXME check checksums

	return nil
}

// close the downloader
func (dl *downloader) close(inErr error) (err error) {
	// defer log.Trace(dl.dls.src, "inErr=%v", err)("err=%v", &err)
	checkErr := func(e error) {
		if e == nil || errors.Cause(e) == asyncreader.ErrorStreamAbandoned {
			return
		}
		err = e
	}
	dl.mu.Lock()
	if dl.in != nil {
		checkErr(dl.in.Close())
		dl.in = nil
	}
	if dl.tr != nil {
		dl.tr.Done(dl.dls.ctx, inErr)
		dl.tr = nil
	}
	dl._closed = true
	dl.mu.Unlock()
	return err
}

// closed returns true if the downloader has been closed already
func (dl *downloader) closed() bool {
	dl.mu.Lock()
	defer dl.mu.Unlock()
	return dl._closed
}

// stop the downloader if running
//
// Call with the mutex held
func (dl *downloader) _stop() {
	// defer log.Trace(dl.dls.src, "")("")

	// exit if have already called _stop
	if dl.stop {
		return
	}
	dl.stop = true

	// Signal quit now to unblock the downloader
	close(dl.quit)

	// stop the downloader by stopping the async reader buffering
	// any more input. This causes all the stuff in the async
	// buffer (which can be many MiB) to be written to the disk
	// before exiting.
	if dl.in != nil {
		dl.in.StopBuffering()
	}
}

// stop the downloader if running then close it with the error passed in
func (dl *downloader) stopAndClose(inErr error) (err error) {
	// Stop the downloader by closing its input
	dl.mu.Lock()
	dl._stop()
	dl.mu.Unlock()
	// wait for downloader to finish...
	// do this without mutex as asyncreader
	// calls back into Write() which needs the lock
	dl.wg.Wait()
	return dl.close(inErr)
}

// Start downloading to the local file starting at offset until maxOffset.
func (dl *downloader) download() (n int64, err error) {
	// defer log.Trace(dl.dls.src, "")("err=%v", &err)
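	// WriteTo pumps data from the accounted, buffered reader into our
	// Write method above until EOF or until the stream is abandoned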
	n, err = dl.in.WriteTo(dl)
	if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
		return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
	}

	return n, nil
}

// setRange makes sure the downloader is downloading the range passed in
func (dl *downloader) setRange(r ranges.Range) {
	// defer log.Trace(dl.dls.src, "r=%v", r)("")
	dl.mu.Lock()
	maxOffset := r.End()
	if maxOffset > dl.maxOffset {
		dl.maxOffset = maxOffset
	}
	dl.mu.Unlock()
	// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
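	// Non-blocking send: kick has a buffer of one, so if a kick is
	// already pending this is a no-op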
	select {
	case dl.kick <- struct{}{}:
	default:
	}
}

// get the current range this downloader is working on
func (dl *downloader) getRange() (start, offset int64) {
	dl.mu.Lock()
	defer dl.mu.Unlock()
	return dl.start, dl.offset
}