1package operations
2
3import (
4	"context"
5	"io"
6	"sync"
7
8	"github.com/pkg/errors"
9	"github.com/rclone/rclone/fs"
10	"github.com/rclone/rclone/fs/fserrors"
11)
12
13// ReOpen is a wrapper for an object reader which reopens the stream on error
14type ReOpen struct {
15	ctx      context.Context
16	mu       sync.Mutex      // mutex to protect the below
17	src      fs.Object       // object to open
18	options  []fs.OpenOption // option to pass to initial open
19	rc       io.ReadCloser   // underlying stream
20	read     int64           // number of bytes read from this stream
21	maxTries int             // maximum number of retries
22	tries    int             // number of retries we've had so far in this stream
23	err      error           // if this is set then Read/Close calls will return it
24	opened   bool            // if set then rc is valid and needs closing
25}
26
27var (
28	errorFileClosed   = errors.New("file already closed")
29	errorTooManyTries = errors.New("failed to reopen: too many retries")
30)
31
32// NewReOpen makes a handle which will reopen itself and seek to where it was on errors
33//
34// If hashOption is set this will be applied when reading from the start
35//
36// If rangeOption is set then this will applied when reading from the
37// start, and updated on retries.
38func NewReOpen(ctx context.Context, src fs.Object, maxTries int, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
39	h := &ReOpen{
40		ctx:      ctx,
41		src:      src,
42		maxTries: maxTries,
43		options:  options,
44	}
45	h.mu.Lock()
46	defer h.mu.Unlock()
47	err = h.open()
48	if err != nil {
49		return nil, err
50	}
51	return h, nil
52}
53
54// open the underlying handle - call with lock held
55//
56// we don't retry here as the Open() call will itself have low level retries
57func (h *ReOpen) open() error {
58	opts := []fs.OpenOption{}
59	var hashOption *fs.HashesOption
60	var rangeOption *fs.RangeOption
61	for _, option := range h.options {
62		switch option.(type) {
63		case *fs.HashesOption:
64			hashOption = option.(*fs.HashesOption)
65		case *fs.RangeOption:
66			rangeOption = option.(*fs.RangeOption)
67		case *fs.HTTPOption:
68			opts = append(opts, option)
69		default:
70			if option.Mandatory() {
71				fs.Logf(h.src, "Unsupported mandatory option: %v", option)
72			}
73		}
74	}
75	if h.read == 0 {
76		if rangeOption != nil {
77			opts = append(opts, rangeOption)
78		}
79		if hashOption != nil {
80			// put hashOption on if reading from the start, ditch otherwise
81			opts = append(opts, hashOption)
82		}
83	} else {
84		if rangeOption != nil {
85			// range to the read point
86			opts = append(opts, &fs.RangeOption{Start: rangeOption.Start + h.read, End: rangeOption.End})
87		} else {
88			// seek to the read point
89			opts = append(opts, &fs.SeekOption{Offset: h.read})
90		}
91	}
92	h.tries++
93	if h.tries > h.maxTries {
94		h.err = errorTooManyTries
95	} else {
96		h.rc, h.err = h.src.Open(h.ctx, opts...)
97	}
98	if h.err != nil {
99		if h.tries > 1 {
100			fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
101		}
102		return h.err
103	}
104	h.opened = true
105	return nil
106}
107
108// Read bytes retrying as necessary
109func (h *ReOpen) Read(p []byte) (n int, err error) {
110	h.mu.Lock()
111	defer h.mu.Unlock()
112	if h.err != nil {
113		// return a previous error if there is one
114		return n, h.err
115	}
116	n, err = h.rc.Read(p)
117	if err != nil {
118		h.err = err
119	}
120	h.read += int64(n)
121	if err != nil && err != io.EOF && !fserrors.IsNoLowLevelRetryError(err) {
122		// close underlying stream
123		h.opened = false
124		_ = h.rc.Close()
125		// reopen stream, clearing error if successful
126		fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
127		if h.open() == nil {
128			err = nil
129		}
130	}
131	return n, err
132}
133
134// Close the stream
135func (h *ReOpen) Close() error {
136	h.mu.Lock()
137	defer h.mu.Unlock()
138	if !h.opened {
139		return errorFileClosed
140	}
141	h.opened = false
142	h.err = errorFileClosed
143	return h.rc.Close()
144}
145