1package vfs 2 3import ( 4 "context" 5 "io" 6 "os" 7 "sync" 8 "time" 9 10 "github.com/pkg/errors" 11 "github.com/rclone/rclone/fs" 12 "github.com/rclone/rclone/fs/accounting" 13 "github.com/rclone/rclone/fs/chunkedreader" 14 "github.com/rclone/rclone/fs/hash" 15) 16 17// ReadFileHandle is an open for read file handle on a File 18type ReadFileHandle struct { 19 baseHandle 20 done func(ctx context.Context, err error) 21 mu sync.Mutex 22 cond *sync.Cond // cond lock for out of sequence reads 23 closed bool // set if handle has been closed 24 r *accounting.Account 25 readCalled bool // set if read has been called 26 size int64 // size of the object (0 for unknown length) 27 offset int64 // offset of read of o 28 roffset int64 // offset of Read() calls 29 noSeek bool 30 sizeUnknown bool // set if size of source is not known 31 file *File 32 hash *hash.MultiHasher 33 opened bool 34 remote string 35} 36 37// Check interfaces 38var ( 39 _ io.Reader = (*ReadFileHandle)(nil) 40 _ io.ReaderAt = (*ReadFileHandle)(nil) 41 _ io.Seeker = (*ReadFileHandle)(nil) 42 _ io.Closer = (*ReadFileHandle)(nil) 43) 44 45func newReadFileHandle(f *File) (*ReadFileHandle, error) { 46 var mhash *hash.MultiHasher 47 var err error 48 o := f.getObject() 49 if !f.VFS().Opt.NoChecksum { 50 hashes := hash.NewHashSet(o.Fs().Hashes().GetOne()) // just pick one hash 51 mhash, err = hash.NewMultiHasherTypes(hashes) 52 if err != nil { 53 fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err) 54 } 55 } 56 57 fh := &ReadFileHandle{ 58 remote: o.Remote(), 59 noSeek: f.VFS().Opt.NoSeek, 60 file: f, 61 hash: mhash, 62 size: nonNegative(o.Size()), 63 sizeUnknown: o.Size() < 0, 64 } 65 fh.cond = sync.NewCond(&fh.mu) 66 return fh, nil 67} 68 69// openPending opens the file if there is a pending open 70// call with the lock held 71func (fh *ReadFileHandle) openPending() (err error) { 72 if fh.opened { 73 return nil 74 } 75 o := fh.file.getObject() 76 r, err := chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)).Open() 77 if err != nil { 78 return err 79 } 80 tr := accounting.GlobalStats().NewTransfer(o) 81 fh.done = tr.Done 82 fh.r = tr.Account(context.TODO(), r).WithBuffer() // account the transfer 83 fh.opened = true 84 85 return nil 86} 87 88// String converts it to printable 89func (fh *ReadFileHandle) String() string { 90 if fh == nil { 91 return "<nil *ReadFileHandle>" 92 } 93 fh.mu.Lock() 94 defer fh.mu.Unlock() 95 if fh.file == nil { 96 return "<nil *ReadFileHandle.file>" 97 } 98 return fh.file.String() + " (r)" 99} 100 101// Node returns the Node associated with this - satisfies Noder interface 102func (fh *ReadFileHandle) Node() Node { 103 fh.mu.Lock() 104 defer fh.mu.Unlock() 105 return fh.file 106} 107 108// seek to a new offset 109// 110// if reopen is true, then we won't attempt to use an io.Seeker interface 111// 112// Must be called with fh.mu held 113func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { 114 if fh.noSeek { 115 return ESPIPE 116 } 117 fh.hash = nil 118 if !reopen { 119 ar := fh.r.GetAsyncReader() 120 // try to fulfill the seek with buffer discard 121 if ar != nil && ar.SkipBytes(int(offset-fh.offset)) { 122 fh.offset = offset 123 return nil 124 } 125 } 126 fh.r.StopBuffering() // stop the background reading first 127 oldReader := fh.r.GetReader() 128 r, ok := oldReader.(*chunkedreader.ChunkedReader) 129 if !ok { 130 fs.Logf(fh.remote, "ReadFileHandle.Read expected reader to be a ChunkedReader, got %T", oldReader) 131 reopen = true 132 } 133 if !reopen { 134 fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (fs.RangeSeeker)", fh.offset, offset) 135 _, err = r.RangeSeek(context.TODO(), offset, io.SeekStart, -1) 136 if err != nil { 137 fs.Debugf(fh.remote, "ReadFileHandle.Read fs.RangeSeeker failed: %v", err) 138 return err 139 } 140 } else { 141 fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset) 142 // close old one 143 err = oldReader.Close() 144 if err != nil { 145 fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err) 146 } 147 // re-open with a seek 148 o := fh.file.getObject() 149 r = chunkedreader.New(context.TODO(), o, int64(fh.file.VFS().Opt.ChunkSize), int64(fh.file.VFS().Opt.ChunkSizeLimit)) 150 _, err := r.Seek(offset, 0) 151 if err != nil { 152 fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) 153 return err 154 } 155 r, err = r.Open() 156 if err != nil { 157 fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err) 158 return err 159 } 160 } 161 fh.r.UpdateReader(context.TODO(), r) 162 fh.offset = offset 163 return nil 164} 165 166// Seek the file - returns ESPIPE if seeking isn't possible 167func (fh *ReadFileHandle) Seek(offset int64, whence int) (n int64, err error) { 168 fh.mu.Lock() 169 defer fh.mu.Unlock() 170 if fh.noSeek { 171 return 0, ESPIPE 172 } 173 size := fh.size 174 switch whence { 175 case io.SeekStart: 176 fh.roffset = 0 177 case io.SeekEnd: 178 fh.roffset = size 179 } 180 fh.roffset += offset 181 // we don't check the offset - the next Read will 182 return fh.roffset, nil 183} 184 185// ReadAt reads len(p) bytes into p starting at offset off in the 186// underlying input source. It returns the number of bytes read (0 <= 187// n <= len(p)) and any error encountered. 188// 189// When ReadAt returns n < len(p), it returns a non-nil error 190// explaining why more bytes were not returned. In this respect, 191// ReadAt is stricter than Read. 192// 193// Even if ReadAt returns n < len(p), it may use all of p as scratch 194// space during the call. If some data is available but not len(p) 195// bytes, ReadAt blocks until either all the data is available or an 196// error occurs. In this respect ReadAt is different from Read. 197// 198// If the n = len(p) bytes returned by ReadAt are at the end of the 199// input source, ReadAt may return either err == EOF or err == nil. 200// 201// If ReadAt is reading from an input source with a seek offset, 202// ReadAt should not affect nor be affected by the underlying seek 203// offset. 204// 205// Clients of ReadAt can execute parallel ReadAt calls on the same 206// input source. 207// 208// Implementations must not retain p. 209func (fh *ReadFileHandle) ReadAt(p []byte, off int64) (n int, err error) { 210 fh.mu.Lock() 211 defer fh.mu.Unlock() 212 return fh.readAt(p, off) 213} 214 215// This waits for *poff to equal off or aborts after the timeout. 216// 217// Waits here potentially affect all seeks so need to keep them short 218// 219// Call with fh.mu Locked 220func waitSequential(what string, remote string, cond *sync.Cond, maxWait time.Duration, poff *int64, off int64) { 221 var ( 222 timeout = time.NewTimer(maxWait) 223 done = make(chan struct{}) 224 abort = false 225 ) 226 go func() { 227 select { 228 case <-timeout.C: 229 // take the lock to make sure that cond.Wait() is called before 230 // cond.Broadcast. NB cond.L == mu 231 cond.L.Lock() 232 // set abort flag and give all the waiting goroutines a kick on timeout 233 abort = true 234 fs.Debugf(remote, "aborting in-sequence %s wait, off=%d", what, off) 235 cond.Broadcast() 236 cond.L.Unlock() 237 case <-done: 238 } 239 }() 240 for *poff != off && !abort { 241 fs.Debugf(remote, "waiting for in-sequence %s to %d for %v", what, off, maxWait) 242 cond.Wait() 243 } 244 // tidy up end timer 245 close(done) 246 timeout.Stop() 247 if *poff != off { 248 fs.Debugf(remote, "failed to wait for in-sequence %s to %d", what, off) 249 } 250} 251 252// Implementation of ReadAt - call with lock held 253func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) { 254 // defer log.Trace(fh.remote, "p[%d], off=%d", len(p), off)("n=%d, err=%v", &n, &err) 255 err = fh.openPending() // FIXME pending open could be more efficient in the presence of seek (and retries) 256 if err != nil { 257 return 0, err 258 } 259 // fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off) 260 if fh.closed { 261 fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF) 262 return 0, ECLOSED 263 } 264 maxBuf := 1024 * 1024 265 if len(p) < maxBuf { 266 maxBuf = len(p) 267 } 268 if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) { 269 waitSequential("read", fh.remote, fh.cond, fh.file.VFS().Opt.ReadWait, &fh.offset, off) 270 } 271 doSeek := off != fh.offset 272 if doSeek && fh.noSeek { 273 return 0, ESPIPE 274 } 275 var newOffset int64 276 retries := 0 277 reqSize := len(p) 278 doReopen := false 279 lowLevelRetries := fs.GetConfig(context.TODO()).LowLevelRetries 280 for { 281 if doSeek { 282 // Are we attempting to seek beyond the end of the 283 // file - if so just return EOF leaving the underlying 284 // file in an unchanged state. 285 if off >= fh.size { 286 fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size) 287 return 0, io.EOF 288 } 289 // Otherwise do the seek 290 err = fh.seek(off, doReopen) 291 } else { 292 err = nil 293 } 294 if err == nil { 295 if reqSize > 0 { 296 fh.readCalled = true 297 } 298 n, err = io.ReadFull(fh.r, p) 299 newOffset = fh.offset + int64(n) 300 // if err == nil && rand.Intn(10) == 0 { 301 // err = errors.New("random error") 302 // } 303 if err == nil { 304 break 305 } else if (err == io.ErrUnexpectedEOF || err == io.EOF) && (newOffset == fh.size || fh.sizeUnknown) { 306 if fh.sizeUnknown { 307 // size is now known since we have read to the end 308 fh.sizeUnknown = false 309 fh.size = newOffset 310 } 311 // Have read to end of file - reset error 312 err = nil 313 break 314 } 315 } 316 if retries >= lowLevelRetries { 317 break 318 } 319 retries++ 320 fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, lowLevelRetries, err) 321 doSeek = true 322 doReopen = true 323 } 324 if err != nil { 325 fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err) 326 } else { 327 fh.offset = newOffset 328 // fs.Debugf(fh.remote, "ReadFileHandle.Read OK") 329 330 if fh.hash != nil { 331 _, err = fh.hash.Write(p[:n]) 332 if err != nil { 333 fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err) 334 return 0, err 335 } 336 } 337 338 // If we have no error and we didn't fill the buffer, must be EOF 339 if n != len(p) { 340 err = io.EOF 341 } 342 } 343 fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read 344 return n, err 345} 346 347func (fh *ReadFileHandle) checkHash() error { 348 if fh.hash == nil || !fh.readCalled || fh.offset < fh.size { 349 return nil 350 } 351 352 o := fh.file.getObject() 353 for hashType, dstSum := range fh.hash.Sums() { 354 srcSum, err := o.Hash(context.TODO(), hashType) 355 if err != nil { 356 if os.IsNotExist(errors.Cause(err)) { 357 // if it was file not found then at 358 // this point we don't care any more 359 continue 360 } 361 return err 362 } 363 if !hash.Equals(dstSum, srcSum) { 364 return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum) 365 } 366 } 367 368 return nil 369} 370 371// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 372// <= n <= len(p)) and any error encountered. Even if Read returns n < len(p), 373// it may use all of p as scratch space during the call. If some data is 374// available but not len(p) bytes, Read conventionally returns what is 375// available instead of waiting for more. 376// 377// When Read encounters an error or end-of-file condition after successfully 378// reading n > 0 bytes, it returns the number of bytes read. It may return the 379// (non-nil) error from the same call or return the error (and n == 0) from a 380// subsequent call. An instance of this general case is that a Reader returning 381// a non-zero number of bytes at the end of the input stream may return either 382// err == EOF or err == nil. The next Read should return 0, EOF. 383// 384// Callers should always process the n > 0 bytes returned before considering 385// the error err. Doing so correctly handles I/O errors that happen after 386// reading some bytes and also both of the allowed EOF behaviors. 387// 388// Implementations of Read are discouraged from returning a zero byte count 389// with a nil error, except when len(p) == 0. Callers should treat a return of 390// 0 and nil as indicating that nothing happened; in particular it does not 391// indicate EOF. 392// 393// Implementations must not retain p. 394func (fh *ReadFileHandle) Read(p []byte) (n int, err error) { 395 fh.mu.Lock() 396 defer fh.mu.Unlock() 397 if fh.roffset >= fh.size && !fh.sizeUnknown { 398 return 0, io.EOF 399 } 400 n, err = fh.readAt(p, fh.roffset) 401 fh.roffset += int64(n) 402 return n, err 403} 404 405// close the file handle returning EBADF if it has been 406// closed already. 407// 408// Must be called with fh.mu held 409func (fh *ReadFileHandle) close() error { 410 if fh.closed { 411 return ECLOSED 412 } 413 fh.closed = true 414 415 if fh.opened { 416 var err error 417 defer func() { 418 fh.done(context.TODO(), err) 419 }() 420 // Close first so that we have hashes 421 err = fh.r.Close() 422 if err != nil { 423 return err 424 } 425 // Now check the hash 426 err = fh.checkHash() 427 if err != nil { 428 return err 429 } 430 } 431 return nil 432} 433 434// Close closes the file 435func (fh *ReadFileHandle) Close() error { 436 fh.mu.Lock() 437 defer fh.mu.Unlock() 438 return fh.close() 439} 440 441// Flush is called each time the file or directory is closed. 442// Because there can be multiple file descriptors referring to a 443// single opened file, Flush can be called multiple times. 444func (fh *ReadFileHandle) Flush() error { 445 fh.mu.Lock() 446 defer fh.mu.Unlock() 447 if !fh.opened { 448 return nil 449 } 450 // fs.Debugf(fh.remote, "ReadFileHandle.Flush") 451 452 if err := fh.checkHash(); err != nil { 453 fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err) 454 return err 455 } 456 457 // fs.Debugf(fh.remote, "ReadFileHandle.Flush OK") 458 return nil 459} 460 461// Release is called when we are finished with the file handle 462// 463// It isn't called directly from userspace so the error is ignored by 464// the kernel 465func (fh *ReadFileHandle) Release() error { 466 fh.mu.Lock() 467 defer fh.mu.Unlock() 468 if !fh.opened { 469 return nil 470 } 471 if fh.closed { 472 fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do") 473 return nil 474 } 475 fs.Debugf(fh.remote, "ReadFileHandle.Release closing") 476 err := fh.close() 477 if err != nil { 478 fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err) 479 } else { 480 // fs.Debugf(fh.remote, "ReadFileHandle.Release OK") 481 } 482 return err 483} 484 485// Size returns the size of the underlying file 486func (fh *ReadFileHandle) Size() int64 { 487 fh.mu.Lock() 488 defer fh.mu.Unlock() 489 return fh.size 490} 491 492// Stat returns info about the file 493func (fh *ReadFileHandle) Stat() (os.FileInfo, error) { 494 fh.mu.Lock() 495 defer fh.mu.Unlock() 496 return fh.file, nil 497} 498