1package azblob 2 3import ( 4 "context" 5 "errors" 6 "io" 7 "io/ioutil" 8 "math/rand" 9 "net" 10 "net/http" 11 "strconv" 12 "strings" 13 "time" 14 15 "github.com/Azure/azure-pipeline-go/pipeline" 16) 17 18// RetryPolicy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants. 19type RetryPolicy int32 20 21const ( 22 // RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy 23 RetryPolicyExponential RetryPolicy = 0 24 25 // RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy 26 RetryPolicyFixed RetryPolicy = 1 27) 28 29// RetryOptions configures the retry policy's behavior. 30type RetryOptions struct { 31 // Policy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.\ 32 // A value of zero means that you accept our default policy. 33 Policy RetryPolicy 34 35 // MaxTries specifies the maximum number of attempts an operation will be tried before producing an error (0=default). 36 // A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries. 37 MaxTries int32 38 39 // TryTimeout indicates the maximum time allowed for any single try of an HTTP request. 40 // A value of zero means that you accept our default timeout. NOTE: When transferring large amounts 41 // of data, the default TryTimeout will probably not be sufficient. You should override this value 42 // based on the bandwidth available to the host machine and proximity to the Storage service. A good 43 // starting point may be something like (60 seconds per MB of anticipated-payload-size). 44 TryTimeout time.Duration 45 46 // RetryDelay specifies the amount of delay to use before retrying an operation (0=default). 47 // When RetryPolicy is specified as RetryPolicyExponential, the delay increases exponentially 48 // with each retry up to a maximum specified by MaxRetryDelay. 49 // If you specify 0, then you must also specify 0 for MaxRetryDelay. 50 // If you specify RetryDelay, then you must also specify MaxRetryDelay, and MaxRetryDelay should be 51 // equal to or greater than RetryDelay. 52 RetryDelay time.Duration 53 54 // MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default). 55 // If you specify 0, then you must also specify 0 for RetryDelay. 56 MaxRetryDelay time.Duration 57 58 // RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host. 59 // If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host. 60 // NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent 61 // data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs 62 RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs 63} 64 65func (o RetryOptions) retryReadsFromSecondaryHost() string { 66 return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only 67 //return "" // This is for non-blob SDKs 68} 69 70func (o RetryOptions) defaults() RetryOptions { 71 // We assume the following: 72 // 1. o.Policy should either be RetryPolicyExponential or RetryPolicyFixed 73 // 2. o.MaxTries >= 0 74 // 3. o.TryTimeout, o.RetryDelay, and o.MaxRetryDelay >=0 75 // 4. o.RetryDelay <= o.MaxRetryDelay 76 // 5. Both o.RetryDelay and o.MaxRetryDelay must be 0 or neither can be 0 77 78 IfDefault := func(current *time.Duration, desired time.Duration) { 79 if *current == time.Duration(0) { 80 *current = desired 81 } 82 } 83 84 // Set defaults if unspecified 85 if o.MaxTries == 0 { 86 o.MaxTries = 4 87 } 88 switch o.Policy { 89 case RetryPolicyExponential: 90 IfDefault(&o.TryTimeout, 1*time.Minute) 91 IfDefault(&o.RetryDelay, 4*time.Second) 92 IfDefault(&o.MaxRetryDelay, 120*time.Second) 93 94 case RetryPolicyFixed: 95 IfDefault(&o.TryTimeout, 1*time.Minute) 96 IfDefault(&o.RetryDelay, 30*time.Second) 97 IfDefault(&o.MaxRetryDelay, 120*time.Second) 98 } 99 return o 100} 101 102func (o RetryOptions) calcDelay(try int32) time.Duration { // try is >=1; never 0 103 pow := func(number int64, exponent int32) int64 { // pow is nested helper function 104 var result int64 = 1 105 for n := int32(0); n < exponent; n++ { 106 result *= number 107 } 108 return result 109 } 110 111 delay := time.Duration(0) 112 switch o.Policy { 113 case RetryPolicyExponential: 114 delay = time.Duration(pow(2, try-1)-1) * o.RetryDelay 115 116 case RetryPolicyFixed: 117 if try > 1 { // Any try after the 1st uses the fixed delay 118 delay = o.RetryDelay 119 } 120 } 121 122 // Introduce some jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3) 123 // For casts and rounding - be careful, as per https://github.com/golang/go/issues/20757 124 delay = time.Duration(float32(delay) * (rand.Float32()/2 + 0.8)) // NOTE: We want math/rand; not crypto/rand 125 if delay > o.MaxRetryDelay { 126 delay = o.MaxRetryDelay 127 } 128 return delay 129} 130 131// NewRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options. 132func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory { 133 o = o.defaults() // Force defaults to be calculated 134 return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { 135 return func(ctx context.Context, request pipeline.Request) (response pipeline.Response, err error) { 136 // Before each try, we'll select either the primary or secondary URL. 137 primaryTry := int32(0) // This indicates how many tries we've attempted against the primary DC 138 139 // We only consider retrying against a secondary if we have a read request (GET/HEAD) AND this policy has a Secondary URL it can use 140 considerSecondary := (request.Method == http.MethodGet || request.Method == http.MethodHead) && o.retryReadsFromSecondaryHost() != "" 141 142 // Exponential retry algorithm: ((2 ^ attempt) - 1) * delay * random(0.8, 1.2) 143 // When to retry: connection failure or temporary/timeout. NOTE: StorageError considers HTTP 500/503 as temporary & is therefore retryable 144 // If using a secondary: 145 // Even tries go against primary; odd tries go against the secondary 146 // For a primary wait ((2 ^ primaryTries - 1) * delay * random(0.8, 1.2) 147 // If secondary gets a 404, don't fail, retry but future retries are only against the primary 148 // When retrying against a secondary, ignore the retry count and wait (.1 second * random(0.8, 1.2)) 149 for try := int32(1); try <= o.MaxTries; try++ { 150 logf("\n=====> Try=%d\n", try) 151 152 // Determine which endpoint to try. It's primary if there is no secondary or if it is an add # attempt. 153 tryingPrimary := !considerSecondary || (try%2 == 1) 154 // Select the correct host and delay 155 if tryingPrimary { 156 primaryTry++ 157 delay := o.calcDelay(primaryTry) 158 logf("Primary try=%d, Delay=%v\n", primaryTry, delay) 159 time.Sleep(delay) // The 1st try returns 0 delay 160 } else { 161 // For casts and rounding - be careful, as per https://github.com/golang/go/issues/20757 162 delay := time.Duration(float32(time.Second) * (rand.Float32()/2 + 0.8)) 163 logf("Secondary try=%d, Delay=%v\n", try-primaryTry, delay) 164 time.Sleep(delay) // Delay with some jitter before trying secondary 165 } 166 167 // Clone the original request to ensure that each try starts with the original (unmutated) request. 168 requestCopy := request.Copy() 169 170 // For each try, seek to the beginning of the Body stream. We do this even for the 1st try because 171 // the stream may not be at offset 0 when we first get it and we want the same behavior for the 172 // 1st try as for additional tries. 173 err = requestCopy.RewindBody() 174 if err != nil { 175 return nil, errors.New("we must be able to seek on the Body Stream, otherwise retries would cause data corruption") 176 } 177 178 if !tryingPrimary { 179 requestCopy.URL.Host = o.retryReadsFromSecondaryHost() 180 requestCopy.Host = o.retryReadsFromSecondaryHost() 181 } 182 183 // Set the server-side timeout query parameter "timeout=[seconds]" 184 timeout := int32(o.TryTimeout.Seconds()) // Max seconds per try 185 if deadline, ok := ctx.Deadline(); ok { // If user's ctx has a deadline, make the timeout the smaller of the two 186 t := int32(deadline.Sub(time.Now()).Seconds()) // Duration from now until user's ctx reaches its deadline 187 logf("MaxTryTimeout=%d secs, TimeTilDeadline=%d sec\n", timeout, t) 188 if t < timeout { 189 timeout = t 190 } 191 if timeout < 0 { 192 timeout = 0 // If timeout ever goes negative, set it to zero; this happen while debugging 193 } 194 logf("TryTimeout adjusted to=%d sec\n", timeout) 195 } 196 q := requestCopy.Request.URL.Query() 197 q.Set("timeout", strconv.Itoa(int(timeout+1))) // Add 1 to "round up" 198 requestCopy.Request.URL.RawQuery = q.Encode() 199 logf("Url=%s\n", requestCopy.Request.URL.String()) 200 201 // Set the time for this particular retry operation and then Do the operation. 202 tryCtx, tryCancel := context.WithTimeout(ctx, time.Second*time.Duration(timeout)) 203 //requestCopy.Body = &deadlineExceededReadCloser{r: requestCopy.Request.Body} 204 response, err = next.Do(tryCtx, requestCopy) // Make the request 205 /*err = improveDeadlineExceeded(err) 206 if err == nil { 207 response.Response().Body = &deadlineExceededReadCloser{r: response.Response().Body} 208 }*/ 209 logf("Err=%v, response=%v\n", err, response) 210 211 action := "" // This MUST get changed within the switch code below 212 switch { 213 case ctx.Err() != nil: 214 action = "NoRetry: Op timeout" 215 case !tryingPrimary && response != nil && response.Response() != nil && response.Response().StatusCode == http.StatusNotFound: 216 // If attempt was against the secondary & it returned a StatusNotFound (404), then 217 // the resource was not found. This may be due to replication delay. So, in this 218 // case, we'll never try the secondary again for this operation. 219 considerSecondary = false 220 action = "Retry: Secondary URL returned 404" 221 case err != nil: 222 // NOTE: Protocol Responder returns non-nil if REST API returns invalid status code for the invoked operation. 223 // Use ServiceCode to verify if the error is related to storage service-side, 224 // ServiceCode is set only when error related to storage service happened. 225 if stErr, ok := err.(StorageError); ok { 226 if stErr.Temporary() { 227 action = "Retry: StorageError with error service code and Temporary()" 228 } else if stErr.Response() != nil && isSuccessStatusCode(stErr.Response()) { // TODO: This is a temporarily work around, remove this after protocol layer fix the issue that net.Error is wrapped as storageError 229 action = "Retry: StorageError with success status code" 230 } else { 231 action = "NoRetry: StorageError not Temporary() and without retriable status code" 232 } 233 } else if netErr, ok := err.(net.Error); ok { 234 // Use non-retriable net.Error list, but not retriable list. 235 // As there are errors without Temporary() implementation, 236 // while need be retried, like 'connection reset by peer', 'transport connection broken' and etc. 237 // So the SDK do retry for most of the case, unless the error should not be retried for sure. 238 if !isNotRetriable(netErr) { 239 action = "Retry: net.Error and not in the non-retriable list" 240 } else { 241 action = "NoRetry: net.Error and in the non-retriable list" 242 } 243 } else { 244 action = "NoRetry: unrecognized error" 245 } 246 default: 247 action = "NoRetry: successful HTTP request" // no error 248 } 249 250 logf("Action=%s\n", action) 251 // fmt.Println(action + "\n") // This is where we could log the retry operation; action is why we're retrying 252 if action[0] != 'R' { // Retry only if action starts with 'R' 253 if err != nil { 254 tryCancel() // If we're returning an error, cancel this current/last per-retry timeout context 255 } else { 256 // We wrap the last per-try context in a body and overwrite the Response's Body field with our wrapper. 257 // So, when the user closes the Body, the our per-try context gets closed too. 258 // Another option, is that the Last Policy do this wrapping for a per-retry context (not for the user's context) 259 if response == nil || response.Response() == nil { 260 // We do panic in the case response or response.Response() is nil, 261 // as for client, the response should not be nil if request is sent and the operations is executed successfully. 262 // Another option, is that execute the cancel function when response or response.Response() is nil, 263 // as in this case, current per-try has nothing to do in future. 264 return nil, errors.New("invalid state, response should not be nil when the operation is executed successfully") 265 } 266 response.Response().Body = &contextCancelReadCloser{cf: tryCancel, body: response.Response().Body} 267 } 268 break // Don't retry 269 } 270 if response != nil && response.Response() != nil && response.Response().Body != nil { 271 // If we're going to retry and we got a previous response, then flush its body to avoid leaking its TCP connection 272 body := response.Response().Body 273 io.Copy(ioutil.Discard, body) 274 body.Close() 275 } 276 // If retrying, cancel the current per-try timeout context 277 tryCancel() 278 } 279 return response, err // Not retryable or too many retries; return the last response/error 280 } 281 }) 282} 283 284// contextCancelReadCloser helps to invoke context's cancelFunc properly when the ReadCloser is closed. 285type contextCancelReadCloser struct { 286 cf context.CancelFunc 287 body io.ReadCloser 288} 289 290func (rc *contextCancelReadCloser) Read(p []byte) (n int, err error) { 291 return rc.body.Read(p) 292} 293 294func (rc *contextCancelReadCloser) Close() error { 295 err := rc.body.Close() 296 if rc.cf != nil { 297 rc.cf() 298 } 299 return err 300} 301 302// isNotRetriable checks if the provided net.Error isn't retriable. 303func isNotRetriable(errToParse net.Error) bool { 304 // No error, so this is NOT retriable. 305 if errToParse == nil { 306 return true 307 } 308 309 // The error is either temporary or a timeout so it IS retriable (not not retriable). 310 if errToParse.Temporary() || errToParse.Timeout() { 311 return false 312 } 313 314 genericErr := error(errToParse) 315 316 // From here all the error are neither Temporary() nor Timeout(). 317 switch err := errToParse.(type) { 318 case *net.OpError: 319 // The net.Error is also a net.OpError but the inner error is nil, so this is not retriable. 320 if err.Err == nil { 321 return true 322 } 323 genericErr = err.Err 324 } 325 326 switch genericErr.(type) { 327 case *net.AddrError, net.UnknownNetworkError, *net.DNSError, net.InvalidAddrError, *net.ParseError, *net.DNSConfigError: 328 // If the error is one of the ones listed, then it is NOT retriable. 329 return true 330 } 331 332 // If it's invalid header field name/value error thrown by http module, then it is NOT retriable. 333 // This could happen when metadata's key or value is invalid. (RoundTrip in transport.go) 334 if strings.Contains(genericErr.Error(), "invalid header field") { 335 return true 336 } 337 338 // Assume the error is retriable. 339 return false 340} 341 342var successStatusCodes = []int{http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent, http.StatusPartialContent} 343 344func isSuccessStatusCode(resp *http.Response) bool { 345 if resp == nil { 346 return false 347 } 348 for _, i := range successStatusCodes { 349 if i == resp.StatusCode { 350 return true 351 } 352 } 353 return false 354} 355 356// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away 357var logf = func(format string, a ...interface{}) {} 358 359// Use this version to see the retry method's code path (import "fmt") 360//var logf = fmt.Printf 361 362/* 363type deadlineExceededReadCloser struct { 364 r io.ReadCloser 365} 366 367func (r *deadlineExceededReadCloser) Read(p []byte) (int, error) { 368 n, err := 0, io.EOF 369 if r.r != nil { 370 n, err = r.r.Read(p) 371 } 372 return n, improveDeadlineExceeded(err) 373} 374func (r *deadlineExceededReadCloser) Seek(offset int64, whence int) (int64, error) { 375 // For an HTTP request, the ReadCloser MUST also implement seek 376 // For an HTTP response, Seek MUST not be called (or this will panic) 377 o, err := r.r.(io.Seeker).Seek(offset, whence) 378 return o, improveDeadlineExceeded(err) 379} 380func (r *deadlineExceededReadCloser) Close() error { 381 if c, ok := r.r.(io.Closer); ok { 382 c.Close() 383 } 384 return nil 385} 386 387// timeoutError is the internal struct that implements our richer timeout error. 388type deadlineExceeded struct { 389 responseError 390} 391 392var _ net.Error = (*deadlineExceeded)(nil) // Ensure deadlineExceeded implements the net.Error interface at compile time 393 394// improveDeadlineExceeded creates a timeoutError object that implements the error interface IF cause is a context.DeadlineExceeded error. 395func improveDeadlineExceeded(cause error) error { 396 // If cause is not DeadlineExceeded, return the same error passed in. 397 if cause != context.DeadlineExceeded { 398 return cause 399 } 400 // Else, convert DeadlineExceeded to our timeoutError which gives a richer string message 401 return &deadlineExceeded{ 402 responseError: responseError{ 403 ErrorNode: pipeline.ErrorNode{}.Initialize(cause, 3), 404 }, 405 } 406} 407 408// Error implements the error interface's Error method to return a string representation of the error. 409func (e *deadlineExceeded) Error() string { 410 return e.ErrorNode.Error("context deadline exceeded; when creating a pipeline, consider increasing RetryOptions' TryTimeout field") 411} 412*/ 413