1package azblob
2
3import (
4	"context"
5	"io"
6	"net"
7	"net/http"
8	"strings"
9	"sync"
10)
11
// CountToEnd is the HTTPGetterInfo.Count value that means "no upper bound":
// read from Offset until the stream ends. Any other Count bounds the range.
const CountToEnd = 0
13
14// HTTPGetter is a function type that refers to a method that performs an HTTP GET operation.
15type HTTPGetter func(ctx context.Context, i HTTPGetterInfo) (*http.Response, error)
16
17// HTTPGetterInfo is passed to an HTTPGetter function passing it parameters
18// that should be used to make an HTTP GET request.
19type HTTPGetterInfo struct {
20	// Offset specifies the start offset that should be used when
21	// creating the HTTP GET request's Range header
22	Offset int64
23
24	// Count specifies the count of bytes that should be used to calculate
25	// the end offset when creating the HTTP GET request's Range header
26	Count int64
27
28	// ETag specifies the resource's etag that should be used when creating
29	// the HTTP GET request's If-Match header
30	ETag ETag
31}
32
// FailedReadNotifier is invoked by a retry reader after each failed read
// attempt; it is intended for diagnostic logging. failureCount is the 1-based
// number of failures so far within the current Read call, lastError is the
// error from that attempt, offset and count describe the range still to be
// read, and willRetry reports whether another attempt will be made.
type FailedReadNotifier func(failureCount int, lastError error, offset, count int64, willRetry bool)
35
36// RetryReaderOptions contains properties which can help to decide when to do retry.
37type RetryReaderOptions struct {
38	// MaxRetryRequests specifies the maximum number of HTTP GET requests that will be made
39	// while reading from a RetryReader. A value of zero means that no additional HTTP
40	// GET requests will be made.
41	MaxRetryRequests   int
42	doInjectError      bool
43	doInjectErrorRound int
44
45	// NotifyFailedRead is called, if non-nil, after any failure to read. Expected usage is diagnostic logging.
46	NotifyFailedRead FailedReadNotifier
47
48	// TreatEarlyCloseAsError can be set to true to prevent retries after "read on closed response body". By default,
49	// retryReader has the following special behaviour: closing the response body before it is all read is treated as a
50	// retryable error. This is to allow callers to force a retry by closing the body from another goroutine (e.g. if the =
51	// read is too slow, caller may want to force a retry in the hope that the retry will be quicker).  If
52	// TreatEarlyCloseAsError is true, then retryReader's special behaviour is suppressed, and "read on closed body" is instead
53	// treated as a fatal (non-retryable) error.
54	// Note that setting TreatEarlyCloseAsError only guarantees that Closing will produce a fatal error if the Close happens
55	// from the same "thread" (goroutine) as Read.  Concurrent Close calls from other goroutines may instead produce network errors
56	// which will be retried.
57	TreatEarlyCloseAsError bool
58}
59
60// retryReader implements io.ReaderCloser methods.
61// retryReader tries to read from response, and if there is retriable network error
62// returned during reading, it will retry according to retry reader option through executing
63// user defined action with provided data to get a new response, and continue the overall reading process
64// through reading from the new response.
65type retryReader struct {
66	ctx             context.Context
67	info            HTTPGetterInfo
68	countWasBounded bool
69	o               RetryReaderOptions
70	getter          HTTPGetter
71
72	// we support Close-ing during Reads (from other goroutines), so we protect the shared state, which is response
73	responseMu *sync.Mutex
74	response   *http.Response
75}
76
77// NewRetryReader creates a retry reader.
78func NewRetryReader(ctx context.Context, initialResponse *http.Response,
79	info HTTPGetterInfo, o RetryReaderOptions, getter HTTPGetter) io.ReadCloser {
80	return &retryReader{
81		ctx:             ctx,
82		getter:          getter,
83		info:            info,
84		countWasBounded: info.Count != CountToEnd,
85		response:        initialResponse,
86		responseMu:      &sync.Mutex{},
87		o:               o}
88}
89
90func (s *retryReader) setResponse(r *http.Response) {
91	s.responseMu.Lock()
92	defer s.responseMu.Unlock()
93	s.response = r
94}
95
96func (s *retryReader) Read(p []byte) (n int, err error) {
97	for try := 0; ; try++ {
98		//fmt.Println(try)       // Comment out for debugging.
99		if s.countWasBounded && s.info.Count == CountToEnd {
100			// User specified an original count and the remaining bytes are 0, return 0, EOF
101			return 0, io.EOF
102		}
103
104		s.responseMu.Lock()
105		resp := s.response
106		s.responseMu.Unlock()
107		if resp == nil { // We don't have a response stream to read from, try to get one.
108			newResponse, err := s.getter(s.ctx, s.info)
109			if err != nil {
110				return 0, err
111			}
112			// Successful GET; this is the network stream we'll read from.
113			s.setResponse(newResponse)
114			resp = newResponse
115		}
116		n, err := resp.Body.Read(p) // Read from the stream (this will return non-nil err if forceRetry is called, from another goroutine, while it is running)
117
118		// Injection mechanism for testing.
119		if s.o.doInjectError && try == s.o.doInjectErrorRound {
120			err = &net.DNSError{IsTemporary: true}
121		}
122
123		// We successfully read data or end EOF.
124		if err == nil || err == io.EOF {
125			s.info.Offset += int64(n) // Increments the start offset in case we need to make a new HTTP request in the future
126			if s.info.Count != CountToEnd {
127				s.info.Count -= int64(n) // Decrement the count in case we need to make a new HTTP request in the future
128			}
129			return n, err // Return the return to the caller
130		}
131		s.Close()          // Error, close stream
132		s.setResponse(nil) // Our stream is no longer good
133
134		// Check the retry count and error code, and decide whether to retry.
135		retriesExhausted := try >= s.o.MaxRetryRequests
136		_, isNetError := err.(net.Error)
137		willRetry := (isNetError || s.wasRetryableEarlyClose(err)) && !retriesExhausted
138
139		// Notify, for logging purposes, of any failures
140		if s.o.NotifyFailedRead != nil {
141			failureCount := try + 1 // because try is zero-based
142			s.o.NotifyFailedRead(failureCount, err, s.info.Offset, s.info.Count, willRetry)
143		}
144
145		if willRetry {
146			continue
147			// Loop around and try to get and read from new stream.
148		}
149		return n, err // Not retryable, or retries exhausted, so just return
150	}
151}
152
153// By default, we allow early Closing, from another concurrent goroutine, to be used to force a retry
154// Is this safe, to close early from another goroutine?  Early close ultimately ends up calling
155// net.Conn.Close, and that is documented as "Any blocked Read or Write operations will be unblocked and return errors"
156// which is exactly the behaviour we want.
157// NOTE: that if caller has forced an early Close from a separate goroutine (separate from the Read)
158// then there are two different types of error that may happen - either the one one we check for here,
159// or a net.Error (due to closure of connection). Which one happens depends on timing. We only need this routine
160// to check for one, since the other is a net.Error, which our main Read retry loop is already handing.
161func (s *retryReader) wasRetryableEarlyClose(err error) bool {
162	if s.o.TreatEarlyCloseAsError {
163		return false // user wants all early closes to be errors, and so not retryable
164	}
165	// unfortunately, http.errReadOnClosedResBody is private, so the best we can do here is to check for its text
166	return strings.HasSuffix(err.Error(), ReadOnClosedBodyMessage)
167}
168
// ReadOnClosedBodyMessage is the text of the error net/http returns when a
// response body is read after it has been closed. The retry reader matches on
// this text because the underlying error value is not exported.
const ReadOnClosedBodyMessage = "read on closed response body"
170
171func (s *retryReader) Close() error {
172	s.responseMu.Lock()
173	defer s.responseMu.Unlock()
174	if s.response != nil && s.response.Body != nil {
175		return s.response.Body.Close()
176	}
177	return nil
178}
179