1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "strings" 22 "time" 23 24 "cloud.google.com/go/internal/trace" 25 "github.com/golang/protobuf/ptypes" 26 "github.com/googleapis/gax-go/v2" 27 "google.golang.org/genproto/googleapis/rpc/errdetails" 28 "google.golang.org/grpc/codes" 29 "google.golang.org/grpc/status" 30) 31 32const ( 33 retryInfoKey = "google.rpc.retryinfo-bin" 34) 35 36// DefaultRetryBackoff is used for retryers as a fallback value when the server 37// did not return any retry information. 38var DefaultRetryBackoff = gax.Backoff{ 39 Initial: 20 * time.Millisecond, 40 Max: 32 * time.Second, 41 Multiplier: 1.3, 42} 43 44// spannerRetryer extends the generic gax Retryer, but also checks for any 45// retry info returned by Cloud Spanner and uses that if present. 46type spannerRetryer struct { 47 gax.Retryer 48} 49 50// onCodes returns a spannerRetryer that will retry on the specified error 51// codes. For Internal errors, only errors that have one of a list of known 52// descriptions should be retried. 53func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer { 54 return &spannerRetryer{ 55 Retryer: gax.OnCodes(cc, bo), 56 } 57} 58 59// Retry returns the retry delay returned by Cloud Spanner if that is present. 60// Otherwise it returns the retry delay calculated by the generic gax Retryer. 61func (r *spannerRetryer) Retry(err error) (time.Duration, bool) { 62 if status.Code(err) == codes.Internal && 63 !strings.Contains(err.Error(), "stream terminated by RST_STREAM") && 64 // See b/25451313. 65 !strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") && 66 // See b/27794742. 67 !strings.Contains(err.Error(), "Connection closed with unknown cause") && 68 !strings.Contains(err.Error(), "Received unexpected EOS on DATA frame from server") { 69 return 0, false 70 } 71 72 delay, shouldRetry := r.Retryer.Retry(err) 73 if !shouldRetry { 74 return 0, false 75 } 76 if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay { 77 delay = serverDelay 78 } 79 return delay, true 80} 81 82// runWithRetryOnAbortedOrSessionNotFound executes the given function and 83// retries it if it returns an Aborted or Session not found error. The retry 84// is delayed if the error was Aborted. The delay between retries is the delay 85// returned by Cloud Spanner, or if none is returned, the calculated delay with 86// a minimum of 10ms and maximum of 32s. There is no delay before the retry if 87// the error was Session not found. 88func runWithRetryOnAbortedOrSessionNotFound(ctx context.Context, f func(context.Context) error) error { 89 retryer := onCodes(DefaultRetryBackoff, codes.Aborted) 90 funcWithRetry := func(ctx context.Context) error { 91 for { 92 err := f(ctx) 93 if err == nil { 94 return nil 95 } 96 // Get Spanner or GRPC status error. 97 // TODO(loite): Refactor to unwrap Status error instead of Spanner 98 // error when statusError implements the (errors|xerrors).Wrapper 99 // interface. 100 var retryErr error 101 var se *Error 102 if errorAs(err, &se) { 103 // It is a (wrapped) Spanner error. Use that to check whether 104 // we should retry. 105 retryErr = se 106 } else { 107 // It's not a Spanner error, check if it is a status error. 108 _, ok := status.FromError(err) 109 if !ok { 110 return err 111 } 112 retryErr = err 113 } 114 if isSessionNotFoundError(retryErr) { 115 trace.TracePrintf(ctx, nil, "Retrying after Session not found") 116 continue 117 } 118 delay, shouldRetry := retryer.Retry(retryErr) 119 if !shouldRetry { 120 return err 121 } 122 trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay) 123 if err := gax.Sleep(ctx, delay); err != nil { 124 return err 125 } 126 } 127 } 128 return funcWithRetry(ctx) 129} 130 131// ExtractRetryDelay extracts retry backoff from a *spanner.Error if present. 132func ExtractRetryDelay(err error) (time.Duration, bool) { 133 var se *Error 134 var s *status.Status 135 // Unwrap status error. 136 if errorAs(err, &se) { 137 s = status.Convert(se.Unwrap()) 138 } else { 139 s = status.Convert(err) 140 } 141 if s == nil { 142 return 0, false 143 } 144 for _, detail := range s.Details() { 145 if retryInfo, ok := detail.(*errdetails.RetryInfo); ok { 146 delay, err := ptypes.Duration(retryInfo.RetryDelay) 147 if err != nil { 148 return 0, false 149 } 150 return delay, true 151 } 152 } 153 return 0, false 154} 155