1package operations 2 3import ( 4 "context" 5 "io" 6 "sync" 7 8 "github.com/pkg/errors" 9 "github.com/rclone/rclone/fs" 10 "github.com/rclone/rclone/fs/fserrors" 11) 12 13// ReOpen is a wrapper for an object reader which reopens the stream on error 14type ReOpen struct { 15 ctx context.Context 16 mu sync.Mutex // mutex to protect the below 17 src fs.Object // object to open 18 options []fs.OpenOption // option to pass to initial open 19 rc io.ReadCloser // underlying stream 20 read int64 // number of bytes read from this stream 21 maxTries int // maximum number of retries 22 tries int // number of retries we've had so far in this stream 23 err error // if this is set then Read/Close calls will return it 24 opened bool // if set then rc is valid and needs closing 25} 26 27var ( 28 errorFileClosed = errors.New("file already closed") 29 errorTooManyTries = errors.New("failed to reopen: too many retries") 30) 31 32// NewReOpen makes a handle which will reopen itself and seek to where it was on errors 33// 34// If hashOption is set this will be applied when reading from the start 35// 36// If rangeOption is set then this will applied when reading from the 37// start, and updated on retries. 38func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) { 39 h := &ReOpen{ 40 ctx: ctx, 41 src: src, 42 maxTries: maxTries, 43 options: options, 44 } 45 h.mu.Lock() 46 defer h.mu.Unlock() 47 err = h.open() 48 if err != nil { 49 return nil, err 50 } 51 return h, nil 52} 53 54// open the underlying handle - call with lock held 55// 56// we don't retry here as the Open() call will itself have low level retries 57func (h *ReOpen) open() error { 58 opts := []fs.OpenOption{} 59 var hashOption *fs.HashesOption 60 var rangeOption *fs.RangeOption 61 for _, option := range h.options { 62 switch option.(type) { 63 case *fs.HashesOption: 64 hashOption = option.(*fs.HashesOption) 65 case *fs.RangeOption: 66 rangeOption = option.(*fs.RangeOption) 67 case *fs.HTTPOption: 68 opts = append(opts, option) 69 default: 70 if option.Mandatory() { 71 fs.Logf(h.src, "Unsupported mandatory option: %v", option) 72 } 73 } 74 } 75 if h.read == 0 { 76 if rangeOption != nil { 77 opts = append(opts, rangeOption) 78 } 79 if hashOption != nil { 80 // put hashOption on if reading from the start, ditch otherwise 81 opts = append(opts, hashOption) 82 } 83 } else { 84 if rangeOption != nil { 85 // range to the read point 86 opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End}) 87 } else { 88 // seek to the read point 89 opts = append(opts, &fs.SeekOption{Offset: h.read}) 90 } 91 } 92 h.tries++ 93 if h.tries > h.maxTries { 94 h.err = errorTooManyTries 95 } else { 96 h.rc, h.err = h.src.Open(h.ctx, opts...) 97 } 98 if h.err != nil { 99 if h.tries > 1 { 100 fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err) 101 } 102 return h.err 103 } 104 h.opened = true 105 return nil 106} 107 108// Read bytes retrying as necessary 109func (h *ReOpen) Read(p []byte) (n int, err error) { 110 h.mu.Lock() 111 defer h.mu.Unlock() 112 if h.err != nil { 113 // return a previous error if there is one 114 return n, h.err 115 } 116 n, err = h.rc.Read(p) 117 if err != nil { 118 h.err = err 119 } 120 h.read += int64(n) 121 if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) { 122 // close underlying stream 123 h.opened = false 124 _ = h.rc.Close() 125 // reopen stream, clearing error if successful 126 fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err) 127 if h.open() == nil { 128 err = nil 129 } 130 } 131 return n, err 132} 133 134// Close the stream 135func (h *ReOpen) Close() error { 136 h.mu.Lock() 137 defer h.mu.Unlock() 138 if !h.opened { 139 return errorFileClosed 140 } 141 h.opened = false 142 h.err = errorFileClosed 143 return h.rc.Close() 144} 145