1package azblob 2 3import ( 4 "context" 5 "io" 6 "net" 7 "net/http" 8 "strings" 9 "sync" 10) 11 12const CountToEnd = 0 13 14// HTTPGetter is a function type that refers to a method that performs an HTTP GET operation. 15type HTTPGetter func(ctx context.Context, i HTTPGetterInfo) (*http.Response, error) 16 17// HTTPGetterInfo is passed to an HTTPGetter function passing it parameters 18// that should be used to make an HTTP GET request. 19type HTTPGetterInfo struct { 20 // Offset specifies the start offset that should be used when 21 // creating the HTTP GET request's Range header 22 Offset int64 23 24 // Count specifies the count of bytes that should be used to calculate 25 // the end offset when creating the HTTP GET request's Range header 26 Count int64 27 28 // ETag specifies the resource's etag that should be used when creating 29 // the HTTP GET request's If-Match header 30 ETag ETag 31} 32 33// FailedReadNotifier is a function type that represents the notification function called when a read fails 34type FailedReadNotifier func(failureCount int, lastError error, offset int64, count int64, willRetry bool) 35 36// RetryReaderOptions contains properties which can help to decide when to do retry. 37type RetryReaderOptions struct { 38 // MaxRetryRequests specifies the maximum number of HTTP GET requests that will be made 39 // while reading from a RetryReader. A value of zero means that no additional HTTP 40 // GET requests will be made. 41 MaxRetryRequests int 42 doInjectError bool 43 doInjectErrorRound int 44 45 // NotifyFailedRead is called, if non-nil, after any failure to read. Expected usage is diagnostic logging. 46 NotifyFailedRead FailedReadNotifier 47 48 // TreatEarlyCloseAsError can be set to true to prevent retries after "read on closed response body". By default, 49 // retryReader has the following special behaviour: closing the response body before it is all read is treated as a 50 // retryable error. This is to allow callers to force a retry by closing the body from another goroutine (e.g. if the = 51 // read is too slow, caller may want to force a retry in the hope that the retry will be quicker). If 52 // TreatEarlyCloseAsError is true, then retryReader's special behaviour is suppressed, and "read on closed body" is instead 53 // treated as a fatal (non-retryable) error. 54 // Note that setting TreatEarlyCloseAsError only guarantees that Closing will produce a fatal error if the Close happens 55 // from the same "thread" (goroutine) as Read. Concurrent Close calls from other goroutines may instead produce network errors 56 // which will be retried. 57 TreatEarlyCloseAsError bool 58} 59 60// retryReader implements io.ReaderCloser methods. 61// retryReader tries to read from response, and if there is retriable network error 62// returned during reading, it will retry according to retry reader option through executing 63// user defined action with provided data to get a new response, and continue the overall reading process 64// through reading from the new response. 65type retryReader struct { 66 ctx context.Context 67 info HTTPGetterInfo 68 countWasBounded bool 69 o RetryReaderOptions 70 getter HTTPGetter 71 72 // we support Close-ing during Reads (from other goroutines), so we protect the shared state, which is response 73 responseMu *sync.Mutex 74 response *http.Response 75} 76 77// NewRetryReader creates a retry reader. 78func NewRetryReader(ctx context.Context, initialResponse *http.Response, 79 info HTTPGetterInfo, o RetryReaderOptions, getter HTTPGetter) io.ReadCloser { 80 return &retryReader{ 81 ctx: ctx, 82 getter: getter, 83 info: info, 84 countWasBounded: info.Count != CountToEnd, 85 response: initialResponse, 86 responseMu: &sync.Mutex{}, 87 o: o} 88} 89 90func (s *retryReader) setResponse(r *http.Response) { 91 s.responseMu.Lock() 92 defer s.responseMu.Unlock() 93 s.response = r 94} 95 96func (s *retryReader) Read(p []byte) (n int, err error) { 97 for try := 0; ; try++ { 98 //fmt.Println(try) // Comment out for debugging. 99 if s.countWasBounded && s.info.Count == CountToEnd { 100 // User specified an original count and the remaining bytes are 0, return 0, EOF 101 return 0, io.EOF 102 } 103 104 s.responseMu.Lock() 105 resp := s.response 106 s.responseMu.Unlock() 107 if resp == nil { // We don't have a response stream to read from, try to get one. 108 newResponse, err := s.getter(s.ctx, s.info) 109 if err != nil { 110 return 0, err 111 } 112 // Successful GET; this is the network stream we'll read from. 113 s.setResponse(newResponse) 114 resp = newResponse 115 } 116 n, err := resp.Body.Read(p) // Read from the stream (this will return non-nil err if forceRetry is called, from another goroutine, while it is running) 117 118 // Injection mechanism for testing. 119 if s.o.doInjectError && try == s.o.doInjectErrorRound { 120 err = &net.DNSError{IsTemporary: true} 121 } 122 123 // We successfully read data or end EOF. 124 if err == nil || err == io.EOF { 125 s.info.Offset += int64(n) // Increments the start offset in case we need to make a new HTTP request in the future 126 if s.info.Count != CountToEnd { 127 s.info.Count -= int64(n) // Decrement the count in case we need to make a new HTTP request in the future 128 } 129 return n, err // Return the return to the caller 130 } 131 s.Close() // Error, close stream 132 s.setResponse(nil) // Our stream is no longer good 133 134 // Check the retry count and error code, and decide whether to retry. 135 retriesExhausted := try >= s.o.MaxRetryRequests 136 _, isNetError := err.(net.Error) 137 willRetry := (isNetError || s.wasRetryableEarlyClose(err)) && !retriesExhausted 138 139 // Notify, for logging purposes, of any failures 140 if s.o.NotifyFailedRead != nil { 141 failureCount := try + 1 // because try is zero-based 142 s.o.NotifyFailedRead(failureCount, err, s.info.Offset, s.info.Count, willRetry) 143 } 144 145 if willRetry { 146 continue 147 // Loop around and try to get and read from new stream. 148 } 149 return n, err // Not retryable, or retries exhausted, so just return 150 } 151} 152 153// By default, we allow early Closing, from another concurrent goroutine, to be used to force a retry 154// Is this safe, to close early from another goroutine? Early close ultimately ends up calling 155// net.Conn.Close, and that is documented as "Any blocked Read or Write operations will be unblocked and return errors" 156// which is exactly the behaviour we want. 157// NOTE: that if caller has forced an early Close from a separate goroutine (separate from the Read) 158// then there are two different types of error that may happen - either the one one we check for here, 159// or a net.Error (due to closure of connection). Which one happens depends on timing. We only need this routine 160// to check for one, since the other is a net.Error, which our main Read retry loop is already handing. 161func (s *retryReader) wasRetryableEarlyClose(err error) bool { 162 if s.o.TreatEarlyCloseAsError { 163 return false // user wants all early closes to be errors, and so not retryable 164 } 165 // unfortunately, http.errReadOnClosedResBody is private, so the best we can do here is to check for its text 166 return strings.HasSuffix(err.Error(), ReadOnClosedBodyMessage) 167} 168 169const ReadOnClosedBodyMessage = "read on closed response body" 170 171func (s *retryReader) Close() error { 172 s.responseMu.Lock() 173 defer s.responseMu.Unlock() 174 if s.response != nil && s.response.Body != nil { 175 return s.response.Body.Close() 176 } 177 return nil 178} 179