package azblob

import (
	"context"
	"errors"
	"io"
	"io/ioutil"
	"math/rand"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
)

// RetryPolicy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.
type RetryPolicy int32

const (
	// RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy
	RetryPolicyExponential RetryPolicy = 0

	// RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy
	RetryPolicyFixed RetryPolicy = 1
)

// RetryOptions configures the retry policy's behavior.
type RetryOptions struct {
	// Policy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.
	// A value of zero means that you accept our default policy.
	Policy RetryPolicy

	// MaxTries specifies the maximum number of times an operation will be attempted before producing an error (0=default).
	// A value of zero means that you accept our default count. A value of 1 means 1 try and no retries.
	MaxTries int32

	// TryTimeout indicates the maximum time allowed for any single try of an HTTP request.
	// A value of zero means that you accept our default timeout. NOTE: When transferring large amounts
	// of data, the default TryTimeout will probably not be sufficient. You should override this value
	// based on the bandwidth available to the host machine and proximity to the Storage service. A good
	// starting point may be something like (60 seconds per MB of anticipated-payload-size).
	TryTimeout time.Duration

	// RetryDelay specifies the amount of delay to use before retrying an operation (0=default).
	// When RetryPolicy is specified as RetryPolicyExponential, the delay increases exponentially
	// with each retry up to a maximum specified by MaxRetryDelay.
	// If you specify 0, then you must also specify 0 for MaxRetryDelay.
	// If you specify RetryDelay, then you must also specify MaxRetryDelay, and MaxRetryDelay should be
	// equal to or greater than RetryDelay.
	RetryDelay time.Duration

	// MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default).
	// If you specify 0, then you must also specify 0 for RetryDelay.
	MaxRetryDelay time.Duration

	// RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host.
	// If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host.
	// NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
	// data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
	RetryReadsFromSecondaryHost string // Comment this out for non-Blob SDKs
}

func (o RetryOptions) retryReadsFromSecondaryHost() string {
	return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only
	//return "" // This is for non-blob SDKs
}
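
// Illustrative sketch (not part of the SDK's documented API surface): a caller might
// configure these options and build a retry factory like this. The values shown are
// arbitrary examples, not recommendations.
//
//	opts := RetryOptions{
//		Policy:        RetryPolicyExponential,
//		MaxTries:      5,
//		TryTimeout:    2 * time.Minute,
//		RetryDelay:    2 * time.Second,
//		MaxRetryDelay: 60 * time.Second,
//	}
//	retryFactory := NewRetryPolicyFactory(opts)
//	// retryFactory is then included in the factory list passed to pipeline.NewPipeline.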

func (o RetryOptions) defaults() RetryOptions {
	// We assume the following:
	// 1. o.Policy should either be RetryPolicyExponential or RetryPolicyFixed
	// 2. o.MaxTries >= 0
	// 3. o.TryTimeout, o.RetryDelay, and o.MaxRetryDelay >= 0
	// 4. o.RetryDelay <= o.MaxRetryDelay
	// 5. Both o.RetryDelay and o.MaxRetryDelay must be 0 or neither can be 0

	IfDefault := func(current *time.Duration, desired time.Duration) {
		if *current == time.Duration(0) {
			*current = desired
		}
	}

	// Set defaults if unspecified
	if o.MaxTries == 0 {
		o.MaxTries = 4
	}
	switch o.Policy {
	case RetryPolicyExponential:
		IfDefault(&o.TryTimeout, 1*time.Minute)
		IfDefault(&o.RetryDelay, 4*time.Second)
		IfDefault(&o.MaxRetryDelay, 120*time.Second)

	case RetryPolicyFixed:
		IfDefault(&o.TryTimeout, 1*time.Minute)
		IfDefault(&o.RetryDelay, 30*time.Second)
		IfDefault(&o.MaxRetryDelay, 120*time.Second)
	}
	return o
}

func (o RetryOptions) calcDelay(try int32) time.Duration { // try is >=1; never 0
	pow := func(number int64, exponent int32) int64 { // pow is a nested helper function
		var result int64 = 1
		for n := int32(0); n < exponent; n++ {
			result *= number
		}
		return result
	}

	delay := time.Duration(0)
	switch o.Policy {
	case RetryPolicyExponential:
		delay = time.Duration(pow(2, try-1)-1) * o.RetryDelay

	case RetryPolicyFixed:
		if try > 1 { // Any try after the 1st uses the fixed delay
			delay = o.RetryDelay
		}
	}

	// Introduce some jitter:  [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
	// For casts and rounding - be careful, as per https://github.com/golang/go/issues/20757
	delay = time.Duration(float32(delay) * (rand.Float32()/2 + 0.8)) // NOTE: We want math/rand; not crypto/rand
	if delay > o.MaxRetryDelay {
		delay = o.MaxRetryDelay
	}
	return delay
}
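
// For illustration, with the default exponential settings (RetryDelay=4s, MaxRetryDelay=120s),
// calcDelay yields the following base delays before jitter and capping:
//
//	try 1: (2^0 - 1) * 4s = 0s
//	try 2: (2^1 - 1) * 4s = 4s
//	try 3: (2^2 - 1) * 4s = 12s
//	try 4: (2^3 - 1) * 4s = 28s
//
// Each base delay is then multiplied by a jitter factor in [0.8, 1.3) and capped at MaxRetryDelay.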

// NewRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.
func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
	o = o.defaults() // Force defaults to be calculated
	return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc {
		return func(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) {
			// Before each try, we'll select either the primary or secondary URL.
			primaryTry := int32(0) // This indicates how many tries we've attempted against the primary DC

			// We only consider retrying against a secondary if we have a read request (GET/HEAD) AND this policy has a Secondary URL it can use
			considerSecondary := (request.Method == http.MethodGet || request.Method == http.MethodHead) && o.retryReadsFromSecondaryHost() != ""

			// Exponential retry algorithm: ((2 ^ (try - 1)) - 1) * delay * random(0.8, 1.3)
			// When to retry: connection failure or temporary/timeout. NOTE: StorageError considers HTTP 500/503 as temporary & is therefore retryable
			// If using a secondary:
			//    Odd tries go against the primary; even tries go against the secondary
			//    For a primary try, wait ((2 ^ (primaryTries - 1)) - 1) * delay * random(0.8, 1.3)
			//    If the secondary gets a 404, don't fail; retry, but future retries are only against the primary
			//    When retrying against a secondary, ignore the retry count and wait (1 second * random(0.8, 1.3))
			for try := int32(1); try <= o.MaxTries; try++ {
				logf("\n=====> Try=%d\n", try)

				// Determine which endpoint to try. It's primary if there is no secondary or if it is an odd-numbered attempt.
				tryingPrimary := !considerSecondary || (try%2 == 1)
				// Select the correct host and delay
				if tryingPrimary {
					primaryTry++
					delay := o.calcDelay(primaryTry)
					logf("Primary try=%d, Delay=%v\n", primaryTry, delay)
					time.Sleep(delay) // The 1st try returns 0 delay
				} else {
					// For casts and rounding - be careful, as per https://github.com/golang/go/issues/20757
					delay := time.Duration(float32(time.Second) * (rand.Float32()/2 + 0.8))
					logf("Secondary try=%d, Delay=%v\n", try-primaryTry, delay)
					time.Sleep(delay) // Delay with some jitter before trying secondary
				}

				// Clone the original request to ensure that each try starts with the original (unmutated) request.
				requestCopy := request.Copy()

				// For each try, seek to the beginning of the Body stream. We do this even for the 1st try because
				// the stream may not be at offset 0 when we first get it and we want the same behavior for the
				// 1st try as for additional tries.
				err = requestCopy.RewindBody()
				if err != nil {
					return nil, errors.New("we must be able to seek on the Body Stream, otherwise retries would cause data corruption")
				}

				if !tryingPrimary {
					requestCopy.URL.Host = o.retryReadsFromSecondaryHost()
					requestCopy.Host = o.retryReadsFromSecondaryHost()
				}

				// Set the server-side timeout query parameter "timeout=[seconds]"
				timeout := int32(o.TryTimeout.Seconds()) // Max seconds per try
				if deadline, ok := ctx.Deadline(); ok {  // If user's ctx has a deadline, make the timeout the smaller of the two
					t := int32(deadline.Sub(time.Now()).Seconds()) // Duration from now until user's ctx reaches its deadline
					logf("MaxTryTimeout=%d secs, TimeTilDeadline=%d sec\n", timeout, t)
					if t < timeout {
						timeout = t
					}
					if timeout < 0 {
						timeout = 0 // If timeout ever goes negative, set it to zero; this can happen while debugging
					}
					logf("TryTimeout adjusted to=%d sec\n", timeout)
				}
				q := requestCopy.Request.URL.Query()
				q.Set("timeout", strconv.Itoa(int(timeout+1))) // Add 1 to "round up"
				requestCopy.Request.URL.RawQuery = q.Encode()
				logf("Url=%s\n", requestCopy.Request.URL.String())

				// Set the timeout for this particular try and then Do the operation.
				tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout))
				//requestCopy.Body = &deadlineExceededReadCloser{r: requestCopy.Request.Body}
				response, err = next.Do(tryCtx, requestCopy) // Make the request
				/*err = improveDeadlineExceeded(err)
				if err == nil {
					response.Response().Body = &deadlineExceededReadCloser{r: response.Response().Body}
				}*/
				logf("Err=%v, response=%v\n", err, response)

				action := "" // This MUST get changed within the switch code below
				switch {
				case ctx.Err() != nil:
					action = "NoRetry: Op timeout"
				case !tryingPrimary && response != nil && response.Response() != nil && response.Response().StatusCode == http.StatusNotFound:
					// If attempt was against the secondary & it returned a StatusNotFound (404), then
					// the resource was not found. This may be due to replication delay. So, in this
					// case, we'll never try the secondary again for this operation.
					considerSecondary = false
					action = "Retry: Secondary URL returned 404"
				case err != nil:
					// NOTE: the protocol layer's Responder returns a non-nil error if the REST API returns an invalid status code for the invoked operation.
					// Use ServiceCode to verify whether the error relates to the storage service side;
					// ServiceCode is set only when a storage-service-related error occurred.
					if stErr, ok := err.(StorageError); ok {
						if stErr.Temporary() {
							action = "Retry: StorageError with error service code and Temporary()"
						} else if stErr.Response() != nil && isSuccessStatusCode(stErr.Response()) { // TODO: This is a temporary workaround; remove it after the protocol layer fixes the issue where a net.Error is wrapped as a StorageError
							action = "Retry: StorageError with success status code"
						} else {
							action = "NoRetry: StorageError not Temporary() and without retriable status code"
						}
					} else if netErr, ok := err.(net.Error); ok {
						// Check against a list of non-retriable net.Errors rather than a retriable list,
						// because some errors that should be retried (e.g. 'connection reset by peer',
						// 'transport connection broken') do not implement Temporary().
						// So the SDK retries in most cases, unless the error is known to be non-retriable.
						if !isNotRetriable(netErr) {
							action = "Retry: net.Error and not in the non-retriable list"
						} else {
							action = "NoRetry: net.Error and in the non-retriable list"
						}
					} else {
						action = "NoRetry: unrecognized error"
					}
				default:
					action = "NoRetry: successful HTTP request" // no error
				}

				logf("Action=%s\n", action)
				// fmt.Println(action + "\n") // This is where we could log the retry operation; action is why we're retrying
				if action[0] != 'R' { // Retry only if action starts with 'R'
					if err != nil {
						tryCancel() // If we're returning an error, cancel this current/last per-retry timeout context
					} else {
						// We wrap the last per-try context's cancel func in a body wrapper and overwrite the Response's Body field with it.
						// That way, when the user closes the Body, our per-try context gets canceled too.
						// Another option is for the last policy to do this wrapping for a per-try context (not for the user's context).
						if response == nil || response.Response() == nil {
							// Return an error if response or response.Response() is nil, because from the client's
							// perspective the response should not be nil if the request was sent and the operation executed successfully.
							// Another option is to execute the cancel function when response or response.Response() is nil,
							// since in that case the current per-try context has nothing left to do.
							return nil, errors.New("invalid state, response should not be nil when the operation is executed successfully")
						}
						response.Response().Body = &contextCancelReadCloser{cf: tryCancel, body: response.Response().Body}
					}
					break // Don't retry
				}
				if response != nil && response.Response() != nil && response.Response().Body != nil {
					// If we're going to retry and we got a previous response, then flush its body to avoid leaking its TCP connection
					body := response.Response().Body
					io.Copy(ioutil.Discard, body)
					body.Close()
				}
				// If retrying, cancel the current per-try timeout context
				tryCancel()
			}
			return response, err // Not retryable or too many retries; return the last response/error
		}
	})
}
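
// Illustrative usage sketch (assuming the azure-pipeline-go NewPipeline constructor; the
// factory list shown is minimal and omits the other policies a real pipeline would include):
//
//	p := pipeline.NewPipeline(
//		[]pipeline.Factory{NewRetryPolicyFactory(RetryOptions{MaxTries: 3})},
//		pipeline.Options{})
//	// Requests issued through p are retried according to the RetryOptions above.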

// contextCancelReadCloser helps to invoke context's cancelFunc properly when the ReadCloser is closed.
type contextCancelReadCloser struct {
	cf   context.CancelFunc
	body io.ReadCloser
}

func (rc *contextCancelReadCloser) Read(p []byte) (n int, err error) {
	return rc.body.Read(p)
}

func (rc *contextCancelReadCloser) Close() error {
	err := rc.body.Close()
	if rc.cf != nil {
		rc.cf()
	}
	return err
}

// isNotRetriable checks if the provided net.Error isn't retriable.
func isNotRetriable(errToParse net.Error) bool {
	// No error, so this is NOT retriable.
	if errToParse == nil {
		return true
	}

	// The error is either temporary or a timeout, so it IS retriable.
	if errToParse.Temporary() || errToParse.Timeout() {
		return false
	}

	genericErr := error(errToParse)

	// From here on, the error is neither Temporary() nor Timeout().
	switch err := errToParse.(type) {
	case *net.OpError:
		// The error is a *net.OpError whose inner error is nil, so this is not retriable.
		if err.Err == nil {
			return true
		}
		genericErr = err.Err
	}

	switch genericErr.(type) {
	case *net.AddrError, net.UnknownNetworkError, *net.DNSError, net.InvalidAddrError, *net.ParseError, *net.DNSConfigError:
		// If the error is one of the ones listed, then it is NOT retriable.
		return true
	}

	// If it's an invalid header field name/value error thrown by the http package, then it is NOT retriable.
	// This can happen when a metadata key or value is invalid. (RoundTrip in transport.go)
	if strings.Contains(genericErr.Error(), "invalid header field") {
		return true
	}

	// Assume the error is retriable.
	return false
}
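
// For example, a *net.DNSError or *net.AddrError that is neither Temporary() nor Timeout()
// is reported as non-retriable above, while an unrecognized net.Error (e.g. a connection reset
// surfacing as a *net.OpError with a non-nil inner error) falls through and is retried.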

var successStatusCodes = []int{http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent, http.StatusPartialContent}

func isSuccessStatusCode(resp *http.Response) bool {
	if resp == nil {
		return false
	}
	for _, i := range successStatusCodes {
		if i == resp.StatusCode {
			return true
		}
	}
	return false
}

// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this function and hopefully optimize all calls to it away
var logf = func(format string, a ...interface{}) {}

// Use this version to see the retry method's code path (import "fmt")
//var logf = fmt.Printf

/*
type deadlineExceededReadCloser struct {
	r io.ReadCloser
}

func (r *deadlineExceededReadCloser) Read(p []byte) (int, error) {
	n, err := 0, io.EOF
	if r.r != nil {
		n, err = r.r.Read(p)
	}
	return n, improveDeadlineExceeded(err)
}
func (r *deadlineExceededReadCloser) Seek(offset int64, whence int) (int64, error) {
	// For an HTTP request, the ReadCloser MUST also implement seek
	// For an HTTP response, Seek MUST not be called (or this will panic)
	o, err := r.r.(io.Seeker).Seek(offset, whence)
	return o, improveDeadlineExceeded(err)
}
func (r *deadlineExceededReadCloser) Close() error {
	if c, ok := r.r.(io.Closer); ok {
		c.Close()
	}
	return nil
}

// timeoutError is the internal struct that implements our richer timeout error.
type deadlineExceeded struct {
	responseError
}

var _ net.Error = (*deadlineExceeded)(nil) // Ensure deadlineExceeded implements the net.Error interface at compile time

// improveDeadlineExceeded creates a timeoutError object that implements the error interface IF cause is a context.DeadlineExceeded error.
func improveDeadlineExceeded(cause error) error {
	// If cause is not DeadlineExceeded, return the same error passed in.
	if cause != context.DeadlineExceeded {
		return cause
	}
	// Else, convert DeadlineExceeded to our timeoutError which gives a richer string message
	return &deadlineExceeded{
		responseError: responseError{
			ErrorNode: pipeline.ErrorNode{}.Initialize(cause, 3),
		},
	}
}

// Error implements the error interface's Error method to return a string representation of the error.
func (e *deadlineExceeded) Error() string {
	return e.ErrorNode.Error("context deadline exceeded; when creating a pipeline, consider increasing RetryOptions' TryTimeout field")
}
*/