package downloaders

import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
	"github.com/rclone/rclone/fs/asyncreader"
	"github.com/rclone/rclone/fs/chunkedreader"
	"github.com/rclone/rclone/fs/fserrors"
	"github.com/rclone/rclone/lib/ranges"
	"github.com/rclone/rclone/vfs/vfscommon"
)

// FIXME implement max downloaders

const (
	// max time a downloader can be idle before closing itself
	maxDownloaderIdleTime = 5 * time.Second
	// max number of bytes a reader should skip over before closing it
	maxSkipBytes = 1024 * 1024
	// time between background kicks of waiters to pick up errors
	backgroundKickerInterval = 5 * time.Second
	// maximum number of errors before declaring dead
	maxErrorCount = 10
	// If a downloader is within this range or --buffer-size,
	// whichever is larger, we will reuse the downloader
	minWindow = 1024 * 1024
)

// Item is the interface that an item to download must obey
type Item interface {
	// FindMissing adjusts r, returning a new ranges.Range which only
	// contains the range which needs to be downloaded. This could be
	// empty - check with IsEmpty. It also adjusts the range to make
	// sure it is not larger than the file.
	FindMissing(r ranges.Range) (outr ranges.Range)

	// HasRange returns true if the current ranges entirely include range
	HasRange(r ranges.Range) bool

	// WriteAtNoOverwrite writes b to the file, but will not overwrite
	// already present ranges.
	//
	// This is used by the downloader to write bytes to the file.
	//
	// It returns n, the total number of bytes processed, and skipped,
	// the number of bytes which were processed but not actually
	// written to the file.
	WriteAtNoOverwrite(b []byte, off int64) (n int, skipped int, err error)
}
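
// Illustrative sketch of the Item contract (hypothetical item and
// numbers, not part of this package): suppose bytes 0-999 of the file
// are already cached.
//
//	missing := item.FindMissing(ranges.Range{Pos: 500, Size: 1000})
//	// missing == ranges.Range{Pos: 1000, Size: 500} - only the tail
//	// still needs downloading.
//
//	n, skipped, _ := item.WriteAtNoOverwrite(buf, 500) // len(buf) == 1000
//	// n == 1000, skipped == 500: all bytes were processed, but the
//	// first 500 were already present so were not written again.
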
// Downloaders is a number of downloaders and a queue of waiters
// waiting for segments to be downloaded to a file.
type Downloaders struct {
	// Write once - no locking required
	ctx    context.Context
	cancel context.CancelFunc
	item   Item
	opt    *vfscommon.Options
	src    fs.Object // source object
	remote string
	wg     sync.WaitGroup

	// Read write
	mu         sync.Mutex
	dls        []*downloader
	waiters    []waiter
	errorCount int   // number of consecutive errors
	lastErr    error // last error received
}

// waiter is a range we are waiting for and a channel to signal when
// the range is found
type waiter struct {
	r       ranges.Range
	errChan chan<- error
}

// downloader represents a running download for part of a file.
type downloader struct {
	// Write once
	dls  *Downloaders   // parent structure
	quit chan struct{}  // close to quit the downloader
	wg   sync.WaitGroup // to keep track of downloader goroutine
	kick chan struct{}  // kick the downloader when needed

	// Read write
	mu        sync.Mutex
	start     int64 // start offset
	offset    int64 // current offset
	maxOffset int64 // maximum offset we are reading to
	tr        *accounting.Transfer
	in        *accounting.Account // input we are reading from
	skipped   int64               // number of bytes we have skipped sequentially
	_closed   bool                // set to true if downloader is closed
	stop      bool                // set to true if we have called _stop()
}

// New makes a downloader for item
func New(item Item, opt *vfscommon.Options, remote string, src fs.Object) (dls *Downloaders) {
	if src == nil {
		panic("internal error: newDownloaders called with nil src object")
	}
	ctx, cancel := context.WithCancel(context.Background())
	dls = &Downloaders{
		ctx:    ctx,
		cancel: cancel,
		item:   item,
		opt:    opt,
		src:    src,
		remote: remote,
	}
	dls.wg.Add(1)
	go func() {
		defer dls.wg.Done()
		ticker := time.NewTicker(backgroundKickerInterval)
		for {
			select {
			case <-ticker.C:
				err := dls.kickWaiters()
				if err != nil {
					fs.Errorf(dls.src, "vfs cache: failed to kick waiters: %v", err)
				}
			case <-ctx.Done():
				ticker.Stop()
				return
			}
		}
	}()

	return dls
}
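
// Typical lifecycle, as seen from the VFS cache item which owns the
// Downloaders (a hedged sketch - item, opt, remote and src are assumed
// to be supplied by the calling cache code):
//
//	dls := downloaders.New(item, opt, remote, src)
//	// Block until bytes 0-1M are in the cache file (or fail)
//	err := dls.Download(ranges.Range{Pos: 0, Size: 1024 * 1024})
//	if err != nil {
//		// handle the download error
//	}
//	// ... later, when the item is closed ...
//	_ = dls.Close(nil)
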
// Accumulate errors for this downloader
//
// It should be called with
//
//	n - bytes downloaded
//	err - error from download
//
// call with lock held
func (dls *Downloaders) _countErrors(n int64, err error) {
	if err == nil && n != 0 {
		if dls.errorCount != 0 {
			fs.Infof(dls.src, "vfs cache: downloader: resetting error count to 0")
			dls.errorCount = 0
			dls.lastErr = nil
		}
		return
	}
	if err != nil {
		//if err != syscall.ENOSPC {
		dls.errorCount++
		//}
		dls.lastErr = err
		fs.Infof(dls.src, "vfs cache: downloader: error count now %d: %v", dls.errorCount, err)
	}
}

func (dls *Downloaders) countErrors(n int64, err error) {
	dls.mu.Lock()
	dls._countErrors(n, err)
	dls.mu.Unlock()
}

// Make a new downloader, starting it to download r
//
// call with lock held
func (dls *Downloaders) _newDownloader(r ranges.Range) (dl *downloader, err error) {
	// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)

	dl = &downloader{
		kick:      make(chan struct{}, 1),
		quit:      make(chan struct{}),
		dls:       dls,
		start:     r.Pos,
		offset:    r.Pos,
		maxOffset: r.End(),
	}

	err = dl.open(dl.offset)
	if err != nil {
		_ = dl.close(err)
		return nil, errors.Wrap(err, "failed to open downloader")
	}

	dls.dls = append(dls.dls, dl)

	dl.wg.Add(1)
	go func() {
		defer dl.wg.Done()
		n, err := dl.download()
		_ = dl.close(err)
		dl.dls.countErrors(n, err)
		if err != nil {
			fs.Errorf(dl.dls.src, "vfs cache: failed to download: %v", err)
		}
		err = dl.dls.kickWaiters()
		if err != nil {
			fs.Errorf(dl.dls.src, "vfs cache: failed to kick waiters: %v", err)
		}
	}()

	return dl, nil
}

// _removeClosed() removes any downloaders which are closed.
//
// Call with the mutex held
func (dls *Downloaders) _removeClosed() {
	newDownloaders := dls.dls[:0]
	for _, dl := range dls.dls {
		if !dl.closed() {
			newDownloaders = append(newDownloaders, dl)
		}
	}
	dls.dls = newDownloaders
}

// Close all running downloaders and return any unfulfilled waiters
// with inErr
func (dls *Downloaders) Close(inErr error) (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()
	dls._removeClosed()
	for _, dl := range dls.dls {
		dls.mu.Unlock()
		closeErr := dl.stopAndClose(inErr)
		dls.mu.Lock()
		if closeErr != nil && err == nil {
			err = closeErr
		}
	}
	dls.cancel()
	// dls may have entered the periodic (every 5 seconds) kickWaiters()
	// call - unlock the mutex to allow it to finish so that we can get
	// its dls.wg.Done()
	dls.mu.Unlock()
	dls.wg.Wait()
	dls.mu.Lock()
	dls.dls = nil
	dls._dispatchWaiters()
	dls._closeWaiters(inErr)
	return err
}

// Download the range passed in, returning when it has been downloaded,
// with any error from the downloading goroutine.
func (dls *Downloaders) Download(r ranges.Range) (err error) {
	// defer log.Trace(dls.src, "r=%+v", r)("err=%v", &err)

	dls.mu.Lock()

	errChan := make(chan error)
	waiter := waiter{
		r:       r,
		errChan: errChan,
	}

	err = dls._ensureDownloader(r)
	if err != nil {
		dls.mu.Unlock()
		return err
	}

	dls.waiters = append(dls.waiters, waiter)
	dls.mu.Unlock()
	return <-errChan
}

// close any waiters with the error passed in
//
// call with lock held
func (dls *Downloaders) _closeWaiters(err error) {
	for _, waiter := range dls.waiters {
		waiter.errChan <- err
	}
	dls.waiters = nil
}
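
// Note on the slice-filtering idiom in _removeClosed above (also used
// by _dispatchWaiters below): reslicing to s[:0] reuses the backing
// array while appending only the entries to keep, so filtering needs
// no allocation. A minimal standalone sketch (hypothetical names):
//
//	keep := items[:0]
//	for _, item := range items {
//		if !item.done() {
//			keep = append(keep, item)
//		}
//	}
//	items = keep
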
// ensure a downloader is running for the range if required. If one isn't found
// then it starts it.
//
// call with lock held
func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
	// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)

	// The window includes potentially unread data in the buffer
	window := int64(fs.GetConfig(context.TODO()).BufferSize)

	// Increase the read range by the read ahead if set
	if dls.opt.ReadAhead > 0 {
		r.Size += int64(dls.opt.ReadAhead)
	}

	// We may be reopening a downloader after a failure here or
	// doing a tentative prefetch so check to see that we haven't
	// read some stuff already.
	//
	// Clip r to stuff which needs downloading
	r = dls.item.FindMissing(r)

	// If the range is entirely present then we only need to start a
	// downloader if the window isn't full.
	startNew := true
	if r.IsEmpty() {
		// Make a new range which includes the window
		rWindow := r
		rWindow.Size += window

		// Clip rWindow to stuff which needs downloading
		rWindowClipped := dls.item.FindMissing(rWindow)

		// If rWindowClipped is empty then don't start a new downloader
		// if there isn't an existing one as there is no data within the
		// window which needs downloading. We do want to kick an
		// existing one though to stop it timing out.
		if rWindowClipped.IsEmpty() {
			// Don't start any more downloaders
			startNew = false
			// Move the position to the end of the window. This has
			// likely been downloaded already, but it will kick an
			// existing downloader.
			r.Pos = rWindow.End()
		} else {
			// Start downloading at the start of the unread window
			r.Pos = rWindowClipped.Pos
		}
		// But don't write anything for the moment
		r.Size = 0
	}

	// If buffer size is less than minWindow then make it that
	if window < minWindow {
		window = minWindow
	}

	var dl *downloader
	// Look through downloaders to find one in range
	// If there isn't one then start a new one
	dls._removeClosed()
	for _, dl = range dls.dls {
		start, offset := dl.getRange()

		// The downloader's offset to offset+window is the gap
		// in which we would like to re-use this
		// downloader. The downloader will never reach before
		// start, and offset+window is too far away - we'd
		// rather start another downloader.
		// fs.Debugf(nil, "r=%v start=%d, offset=%d, found=%v", r, start, offset, r.Pos >= start && r.Pos < offset+window)
		if r.Pos >= start && r.Pos < offset+window {
			// Found downloader which will soon have our data
			dl.setRange(r)
			return nil
		}
	}
	if !startNew {
		return nil
	}
	// Downloader not found so start a new one
	dl, err = dls._newDownloader(r)
	if err != nil {
		dls._countErrors(0, err)
		return errors.Wrap(err, "failed to start downloader")
	}
	return err
}

// EnsureDownloader makes sure a downloader is running for the range
// passed in. If one isn't found then it starts it.
//
// It does not wait for the range to be downloaded
func (dls *Downloaders) EnsureDownloader(r ranges.Range) (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()
	return dls._ensureDownloader(r)
}
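
// Worked example of the reuse test in _ensureDownloader above
// (hypothetical numbers): with --buffer-size 16M the window is 16 MiB
// (well above minWindow). A downloader with start=0 which has currently
// reached offset=10M will be reused for any request it has already
// passed or will reach without discarding more than a buffer's worth of
// data:
//
//	// start=0, offset=10M, window=16M
//	// r.Pos = 20M -> reused: 0 <= 20M && 20M < 10M+16M
//	// r.Pos = 30M -> not reused: a second downloader is started
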
// _dispatchWaiters() sends any waiters which have completed back to
// their callers.
//
// Call with the mutex held
func (dls *Downloaders) _dispatchWaiters() {
	if len(dls.waiters) == 0 {
		return
	}

	newWaiters := dls.waiters[:0]
	for _, waiter := range dls.waiters {
		if dls.item.HasRange(waiter.r) {
			waiter.errChan <- nil
		} else {
			newWaiters = append(newWaiters, waiter)
		}
	}
	dls.waiters = newWaiters
}

// Send any waiters which have completed back to their callers and make sure
// there is a downloader appropriate for each waiter
func (dls *Downloaders) kickWaiters() (err error) {
	dls.mu.Lock()
	defer dls.mu.Unlock()

	dls._dispatchWaiters()

	if len(dls.waiters) == 0 {
		return nil
	}

	// Make sure each waiter has a downloader
	// This is an O(waiters*downloaders) algorithm
	// However the number of waiters and the number of downloaders
	// are both expected to be small.
	for _, waiter := range dls.waiters {
		err = dls._ensureDownloader(waiter.r)
		if err != nil {
			// Failures here will be retried by the background kicker
			fs.Errorf(dls.src, "vfs cache: restart download failed: %v", err)
		}
	}
	if fserrors.IsErrNoSpace(dls.lastErr) {
		fs.Errorf(dls.src, "vfs cache: cache is out of space %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
		dls._closeWaiters(dls.lastErr)
		return dls.lastErr
	}

	if dls.errorCount > maxErrorCount {
		fs.Errorf(dls.src, "vfs cache: too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
		dls._closeWaiters(dls.lastErr)
		return dls.lastErr
	}

	return nil
}

// Write writes len(p) bytes from p to the underlying data stream. It
// returns the number of bytes written from p (0 <= n <= len(p)) and
// any error encountered that caused the write to stop early. Write
// must return a non-nil error if it returns n < len(p). Write must
// not modify the slice data, even temporarily.
//
// Implementations must not retain p.
func (dl *downloader) Write(p []byte) (n int, err error) {
	// defer log.Trace(dl.dls.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)

	// Kick the waiters on exit if some bytes were received
	defer func() {
		if n <= 0 {
			return
		}
		if waitErr := dl.dls.kickWaiters(); waitErr != nil {
			fs.Errorf(dl.dls.src, "vfs cache: download write: failed to kick waiters: %v", waitErr)
			if err == nil {
				err = waitErr
			}
		}
	}()

	dl.mu.Lock()
	defer dl.mu.Unlock()

	// Wait here if we have reached maxOffset until
	// - we are quitting
	// - we get kicked
	// - timeout happens
loop:
	for dl.offset >= dl.maxOffset {
		var timeout = time.NewTimer(maxDownloaderIdleTime)
		dl.mu.Unlock()
		select {
		case <-dl.quit:
			dl.mu.Lock()
			timeout.Stop()
			break loop
		case <-dl.kick:
			dl.mu.Lock()
			timeout.Stop()
		case <-timeout.C:
			// stop any future reading
			dl.mu.Lock()
			if !dl.stop {
				fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out")
				dl._stop()
			}
			break loop
		}
	}

	n, skipped, err := dl.dls.item.WriteAtNoOverwrite(p, dl.offset)
	if skipped == n {
		dl.skipped += int64(skipped)
	} else {
		dl.skipped = 0
	}
	dl.offset += int64(n)

	// Kill this downloader if it has skipped too many bytes
	if !dl.stop && dl.skipped > maxSkipBytes {
		fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it has skipped %d bytes", dl.skipped)
		dl._stop()
	}

	// If running without an async buffer then stop now, as
	// StopBuffering has no effect if the Account wasn't buffered,
	// so we need to stop manually now rather than wait for the
	// AsyncReader to stop.
	if dl.stop && !dl.in.HasBuffer() {
		err = asyncreader.ErrorStreamAbandoned
	}
	return n, err
}
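
// Note on the skip accounting in Write above: if a downloader catches
// up with data which is already in the cache, WriteAtNoOverwrite
// reports every byte as skipped (skipped == n), so dl.skipped grows on
// each call. Once more than maxSkipBytes (1 MiB) have been skipped
// consecutively the downloader stops itself - it is cheaper to start a
// fresh downloader later than to keep downloading and discarding data
// which is already present. Any write which lands at least one new
// byte (skipped < n) resets the counter to 0. For example (hypothetical
// sizes):
//
//	// three 512 KiB writes over fully cached data:
//	// dl.skipped: 512Ki -> 1Mi -> 1.5Mi > maxSkipBytes -> _stop()
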
// open the file from offset
//
// should be called on a fresh downloader
func (dl *downloader) open(offset int64) (err error) {
	// defer log.Trace(dl.dls.src, "offset=%d", offset)("err=%v", &err)
	dl.tr = accounting.Stats(dl.dls.ctx).NewTransfer(dl.dls.src)

	size := dl.dls.src.Size()
	if size < 0 {
		// FIXME should just completely download these
		return errors.New("can't open unknown sized file")
	}

	// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
	// var rangeOption *fs.RangeOption
	// if offset > 0 {
	// 	rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
	// }
	// in0, err := operations.NewReOpen(dl.dls.ctx, dl.dls.src, ci.LowLevelRetries, dl.dls.item.c.hashOption, rangeOption)

	in0 := chunkedreader.New(context.TODO(), dl.dls.src, int64(dl.dls.opt.ChunkSize), int64(dl.dls.opt.ChunkSizeLimit))
	_, err = in0.Seek(offset, 0)
	if err != nil {
		return errors.Wrap(err, "vfs reader: failed to open source file")
	}
	dl.in = dl.tr.Account(dl.dls.ctx, in0).WithBuffer() // account and buffer the transfer

	dl.offset = offset

	// FIXME set mod time
	// FIXME check checksums

	return nil
}

// close the downloader
func (dl *downloader) close(inErr error) (err error) {
	// defer log.Trace(dl.dls.src, "inErr=%v", inErr)("err=%v", &err)
	checkErr := func(e error) {
		if e == nil || errors.Cause(e) == asyncreader.ErrorStreamAbandoned {
			return
		}
		err = e
	}
	dl.mu.Lock()
	if dl.in != nil {
		checkErr(dl.in.Close())
		dl.in = nil
	}
	if dl.tr != nil {
		dl.tr.Done(dl.dls.ctx, inErr)
		dl.tr = nil
	}
	dl._closed = true
	dl.mu.Unlock()
	return err
}

// closed returns true if the downloader has been closed already
func (dl *downloader) closed() bool {
	dl.mu.Lock()
	defer dl.mu.Unlock()
	return dl._closed
}

// stop the downloader if running
//
// Call with the mutex held
func (dl *downloader) _stop() {
	// defer log.Trace(dl.dls.src, "")("")

	// exit if we have already called _stop
	if dl.stop {
		return
	}
	dl.stop = true

	// Signal quit now to unblock the downloader
	close(dl.quit)

	// stop the downloader by stopping the async reader buffering
	// any more input. This causes all the stuff in the async
	// buffer (which can be many MiB) to be written to the disk
	// before exiting.
	if dl.in != nil {
		dl.in.StopBuffering()
	}
}

// stop the downloader if running then close it with the error passed in
func (dl *downloader) stopAndClose(inErr error) (err error) {
	// Stop the downloader by closing its input
	dl.mu.Lock()
	dl._stop()
	dl.mu.Unlock()
	// wait for the downloader to finish...
	// do this without the mutex as the asyncreader
	// calls back into Write() which needs the lock
	dl.wg.Wait()
	return dl.close(inErr)
}

// Start downloading to the local file starting at offset until maxOffset.
func (dl *downloader) download() (n int64, err error) {
	// defer log.Trace(dl.dls.src, "")("err=%v", &err)
	n, err = dl.in.WriteTo(dl)
	if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
		return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
	}

	return n, nil
}

// setRange makes sure the downloader is downloading the range passed in
func (dl *downloader) setRange(r ranges.Range) {
	// defer log.Trace(dl.dls.src, "r=%v", r)("")
	dl.mu.Lock()
	maxOffset := r.End()
	if maxOffset > dl.maxOffset {
		dl.maxOffset = maxOffset
	}
	dl.mu.Unlock()
	// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
	select {
	case dl.kick <- struct{}{}:
	default:
	}
}

// get the current range this downloader is working on
func (dl *downloader) getRange() (start, offset int64) {
	dl.mu.Lock()
	defer dl.mu.Unlock()
	return dl.start, dl.offset
}
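
// Note on the non-blocking kick in setRange above: dl.kick is created
// with a buffer of 1, so the select with a default case never blocks.
// If a kick is already pending, the new one is simply dropped - the
// downloader only needs to learn that maxOffset changed, not how many
// times. A minimal standalone sketch of the idiom:
//
//	kick := make(chan struct{}, 1)
//	select {
//	case kick <- struct{}{}: // queued a wake-up
//	default: // one is already pending - nothing more to do
//	}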