/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"context"
	"strings"
	"time"

	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/ptypes"
	"github.com/googleapis/gax-go/v2"
	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

const (
	retryInfoKey = "google.rpc.retryinfo-bin"
)

// DefaultRetryBackoff is used for retryers as a fallback value when the server
// did not return any retry information.
var DefaultRetryBackoff = gax.Backoff{
	Initial:    20 * time.Millisecond,
	Max:        32 * time.Second,
	Multiplier: 1.3,
}
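
// backoffProgression is an illustrative sketch, not part of the original file:
// it shows how the DefaultRetryBackoff above behaves by returning the first n
// delays produced by a copy of it. gax.Backoff.Pause applies jitter, so each
// delay is a random value up to the current nominal cap, which starts at
// Initial, grows by Multiplier on each call, and never exceeds Max. The helper
// name is hypothetical and the function is not used by the package.
func backoffProgression(n int) []time.Duration {
	bo := DefaultRetryBackoff // copy, so the package-level value keeps its state
	delays := make([]time.Duration, 0, n)
	for i := 0; i < n; i++ {
		delays = append(delays, bo.Pause())
	}
	return delays
}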

// spannerRetryer extends the generic gax Retryer, but also checks for any
// retry info returned by Cloud Spanner and uses that if present.
type spannerRetryer struct {
	gax.Retryer
}

// onCodes returns a spannerRetryer that will retry on the specified error
// codes. Internal errors are only retried if their description matches one of
// a list of known transient causes (see Retry).
func onCodes(bo gax.Backoff, cc ...codes.Code) gax.Retryer {
	return &spannerRetryer{
		Retryer: gax.OnCodes(cc, bo),
	}
}

// Retry returns the retry delay returned by Cloud Spanner if that is present.
// Otherwise it returns the retry delay calculated by the generic gax Retryer.
func (r *spannerRetryer) Retry(err error) (time.Duration, bool) {
	if status.Code(err) == codes.Internal &&
		!strings.Contains(err.Error(), "stream terminated by RST_STREAM") &&
		// See b/25451313.
		!strings.Contains(err.Error(), "HTTP/2 error code: INTERNAL_ERROR") &&
		// See b/27794742.
		!strings.Contains(err.Error(), "Connection closed with unknown cause") &&
		!strings.Contains(err.Error(), "Received unexpected EOS on DATA frame from server") {
		return 0, false
	}

	delay, shouldRetry := r.Retryer.Retry(err)
	if !shouldRetry {
		return 0, false
	}
	if serverDelay, hasServerDelay := ExtractRetryDelay(err); hasServerDelay {
		delay = serverDelay
	}
	return delay, true
}
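
// exampleInternalRetry is an illustrative sketch, not part of the original
// file: it demonstrates the Internal-error filtering above. An Internal error
// is only retried when its message contains one of the known transient
// descriptions; any other Internal error is rejected immediately. The function
// name and error messages are hypothetical.
func exampleInternalRetry() (retriedTransient, retriedOther bool) {
	retryer := onCodes(DefaultRetryBackoff, codes.Internal)
	transient := status.Error(codes.Internal, "stream terminated by RST_STREAM")
	other := status.Error(codes.Internal, "some unknown internal failure")
	_, retriedTransient = retryer.Retry(transient) // true: known transient description
	_, retriedOther = retryer.Retry(other)         // false: unknown Internal error
	return retriedTransient, retriedOther
}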

// runWithRetryOnAbortedOrSessionNotFound executes the given function and
// retries it if it returns an Aborted or Session not found error. The retry
// is delayed if the error was Aborted. The delay between retries is the delay
// returned by Cloud Spanner, or if none is returned, a calculated exponential
// backoff delay with an initial value of 20ms and a maximum of 32s. There is
// no delay before the retry if the error was Session not found.
func runWithRetryOnAbortedOrSessionNotFound(ctx context.Context, f func(context.Context) error) error {
	retryer := onCodes(DefaultRetryBackoff, codes.Aborted)
	funcWithRetry := func(ctx context.Context) error {
		for {
			err := f(ctx)
			if err == nil {
				return nil
			}
			// Get the Spanner or gRPC status error.
			// TODO(loite): Refactor to unwrap Status error instead of Spanner
			// error when statusError implements the (errors|xerrors).Wrapper
			// interface.
			var retryErr error
			var se *Error
			if errorAs(err, &se) {
				// It is a (wrapped) Spanner error. Use that to check whether
				// we should retry.
				retryErr = se
			} else {
				// It's not a Spanner error; check whether it is a status error.
				_, ok := status.FromError(err)
				if !ok {
					return err
				}
				retryErr = err
			}
			if isSessionNotFoundError(retryErr) {
				trace.TracePrintf(ctx, nil, "Retrying after Session not found")
				continue
			}
			delay, shouldRetry := retryer.Retry(retryErr)
			if !shouldRetry {
				return err
			}
			trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", delay)
			if err := gax.Sleep(ctx, delay); err != nil {
				return err
			}
		}
	}
	return funcWithRetry(ctx)
}
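
// exampleRunWithRetry is an illustrative sketch, not part of the original
// file: it shows how runWithRetryOnAbortedOrSessionNotFound wraps an
// idempotent operation. The closure is re-invoked immediately on "Session not
// found", re-invoked after a backoff on Aborted, and gives up on any other
// error or when the context is done. The function name and the work callback
// are hypothetical.
func exampleRunWithRetry(ctx context.Context, work func(context.Context) error) error {
	return runWithRetryOnAbortedOrSessionNotFound(ctx, func(ctx context.Context) error {
		// The wrapped operation must be safe to execute more than once.
		return work(ctx)
	})
}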

// ExtractRetryDelay extracts the retry backoff duration from the RetryInfo
// details of a (wrapped) Cloud Spanner or gRPC status error, if present.
func ExtractRetryDelay(err error) (time.Duration, bool) {
	var se *Error
	var s *status.Status
	// Unwrap the status error.
	if errorAs(err, &se) {
		s = status.Convert(se.Unwrap())
	} else {
		s = status.Convert(err)
	}
	if s == nil {
		return 0, false
	}
	for _, detail := range s.Details() {
		if retryInfo, ok := detail.(*errdetails.RetryInfo); ok {
			delay, err := ptypes.Duration(retryInfo.RetryDelay)
			if err != nil {
				return 0, false
			}
			return delay, true
		}
	}
	return 0, false
}
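
// exampleExtractRetryDelay is an illustrative sketch, not part of the original
// file: it builds an Aborted status carrying a RetryInfo detail, similar to
// what Cloud Spanner returns, and reads the server-suggested delay back out
// through ExtractRetryDelay. The function name and message are hypothetical.
func exampleExtractRetryDelay() (time.Duration, bool) {
	st, err := status.New(codes.Aborted, "transaction was aborted").WithDetails(&errdetails.RetryInfo{
		RetryDelay: ptypes.DurationProto(10 * time.Millisecond),
	})
	if err != nil {
		return 0, false
	}
	return ExtractRetryDelay(st.Err())
}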