// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
// fine-grained error checking required by the write-at-most-once retry semantics of etcd.

package clientv3

import (
	"context"
	"io"
	"sync"
	"time"

	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// unaryClientInterceptor returns a new retrying unary client interceptor.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or on call (through grpc.CallOptions).
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx = withVersion(ctx)
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// Short circuit for simplicity and to avoid allocations.
		if callOpts.max == 0 {
			return invoker(ctx, method, req, reply, cc, grpcOpts...)
		}
		var lastErr error
		for attempt := uint(0); attempt < callOpts.max; attempt++ {
			if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
				return err
			}
			c.GetLogger().Debug(
				"retrying of unary invoker",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
			)
			lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
			if lastErr == nil {
				return nil
			}
			c.GetLogger().Warn(
				"retrying of unary invoker failed",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
				zap.Error(lastErr),
			)
			if isContextError(lastErr) {
				if ctx.Err() != nil {
					// it's the context deadline or cancellation.
					return lastErr
				}
				// it's the callCtx deadline or cancellation, in which case try again.
				continue
			}
			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
				// Clear the auth token before refreshing it.
				// Calling c.Auth.Authenticate with an invalid token will always fail the auth check on the
				// server side. If the server has not applied the patch from PR #12165
				// (https://github.com/etcd-io/etcd/pull/12165), a rpctypes.ErrInvalidAuthToken would make
				// c.getToken call itself recursively until the system runs out of resources.
				c.authTokenBundle.UpdateAuthToken("")

				gterr := c.getToken(ctx)
				if gterr != nil {
					c.GetLogger().Warn(
						"retrying of unary invoker failed to fetch new auth token",
						zap.String("target", cc.Target()),
						zap.Error(gterr),
					)
					return gterr // lastErr must be the invalid auth token error
				}
				continue
			}
			if !isSafeRetry(c.lg, lastErr, callOpts) {
				return lastErr
			}
		}
		return lastErr
	}
}

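// Illustrative sketch (not part of the original file; endpoint and method are
// placeholders): how the interceptor and the per-call retry options compose.
//
//	conn, err := grpc.Dial(endpoint,
//		grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(3))),
//	)
//	// Since retryOption implements grpc.CallOption, a single call can opt into
//	// a different policy:
//	// err = conn.Invoke(ctx, method, req, resp, withRetryPolicy(repeatable))
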
// streamClientInterceptor returns a new retrying stream client interceptor for server-side streaming calls.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or on call (through grpc.CallOptions).
//
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
// to buffer the messages sent by the client. If retry is enabled on any other streams (ClientStreams,
// BidiStreams), the retry interceptor will fail the call.
func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		ctx = withVersion(ctx)
		// Fetch the auth token automatically.
		// TODO(cfc4n): keep this code block; remove the getToken logic in client.go after pr #12165 is merged.
		if c.authTokenBundle != nil {
			// equivalent to c.Username != "" && c.Password != ""
			err := c.getToken(ctx)
			if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
				c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
				return nil, err
			}
		}
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// Short circuit for simplicity and to avoid allocations.
		if callOpts.max == 0 {
			return streamer(ctx, desc, cc, method, grpcOpts...)
		}
		if desc.ClientStreams {
			return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
		}
		newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
		if err != nil {
			c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
			return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		retryingStreamer := &serverStreamingRetryingStream{
			client:       c,
			ClientStream: newStreamer,
			callOpts:     callOpts,
			ctx:          ctx,
			streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
				return streamer(ctx, desc, cc, method, grpcOpts...)
			},
		}
		return retryingStreamer, nil
	}
}

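// Illustrative sketch (not part of the original file; endpoint is a placeholder):
// the stream interceptor is installed the same way as the unary one, and retries
// apply only to server-streaming (1:n) methods.
//
//	conn, err := grpc.Dial(endpoint,
//		grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(2))),
//	)
//	// Any stream whose grpc.StreamDesc has ClientStreams set will fail with
//	// codes.Unimplemented, as enforced above.
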
// serverStreamingRetryingStream is an implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
type serverStreamingRetryingStream struct {
	grpc.ClientStream
	client        *Client
	bufferedSends []interface{} // single message that the client can send
	receivedGood  bool          // indicates whether any prior receives were successful
	wasClosedSend bool          // indicates that CloseSend was called
	ctx           context.Context
	callOpts      *options
	streamerCall  func(ctx context.Context) (grpc.ClientStream, error)
	mu            sync.RWMutex
}

func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
	s.mu.Lock()
	s.ClientStream = clientStream
	s.mu.Unlock()
}

func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ClientStream
}

func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
	s.mu.Lock()
	s.bufferedSends = append(s.bufferedSends, m)
	s.mu.Unlock()
	return s.getStream().SendMsg(m)
}

func (s *serverStreamingRetryingStream) CloseSend() error {
	s.mu.Lock()
	s.wasClosedSend = true
	s.mu.Unlock()
	return s.getStream().CloseSend()
}

func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
	return s.getStream().Header()
}

func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
	return s.getStream().Trailer()
}

func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
	attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
	if !attemptRetry {
		return lastErr // success or hard failure
	}

	// We start from attempt 1, because the zeroth attempt was already made by the
	// receiveMsgAndIndicateRetry call above.
	for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
		if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
			return err
		}
		newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
		if err != nil {
			s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
			return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		s.setStream(newStream)

		s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
		attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
		if !attemptRetry {
			return lastErr
		}
	}
	return lastErr
}

func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
	s.mu.RLock()
	wasGood := s.receivedGood
	s.mu.RUnlock()
	err := s.getStream().RecvMsg(m)
	if err == nil || err == io.EOF {
		s.mu.Lock()
		s.receivedGood = true
		s.mu.Unlock()
		return false, err
	} else if wasGood {
		// a previous RecvMsg in the stream succeeded, so no retry logic should interfere
		return false, err
	}
	if isContextError(err) {
		if s.ctx.Err() != nil {
			return false, err
		}
		// it's the callCtx deadline or cancellation, in which case try again.
		return true, err
	}
	if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
		// clear the auth token to avoid a failure when calling getToken
		s.client.authTokenBundle.UpdateAuthToken("")

		gterr := s.client.getToken(s.ctx)
		if gterr != nil {
			s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
			return false, err // return the original error for simplicity
		}
		return true, err
	}
	return isSafeRetry(s.client.lg, err, s.callOpts), err
}

func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
	s.mu.RLock()
	bufferedSends := s.bufferedSends
	s.mu.RUnlock()
	newStream, err := s.streamerCall(callCtx)
	if err != nil {
		return nil, err
	}
	for _, msg := range bufferedSends {
		if err := newStream.SendMsg(msg); err != nil {
			return nil, err
		}
	}
	if err := newStream.CloseSend(); err != nil {
		return nil, err
	}
	return newStream, nil
}

func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
	waitTime := time.Duration(0)
	if attempt > 0 {
		waitTime = callOpts.backoffFunc(attempt)
	}
	if waitTime > 0 {
		timer := time.NewTimer(waitTime)
		select {
		case <-ctx.Done():
			timer.Stop()
			return contextErrToGrpcErr(ctx.Err())
		case <-timer.C:
		}
	}
	return nil
}

// isSafeRetry returns true if the request is safe to retry after the given error.
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
	if isContextError(err) {
		return false
	}
	switch callOpts.retryPolicy {
	case repeatable:
		return isSafeRetryImmutableRPC(err)
	case nonRepeatable:
		return isSafeRetryMutableRPC(err)
	default:
		lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
		return false
	}
}

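// Background note (summarizing, as an aid, the helpers defined elsewhere in this
// package): repeatable is intended for idempotent, read-only RPCs, which may be
// retried on transient failures, whereas nonRepeatable covers mutating RPCs that
// are only safe to retry when the request demonstrably never reached the server,
// matching the write-at-most-once semantics noted at the top of this file.
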
func isContextError(err error) bool {
	return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
}

func contextErrToGrpcErr(err error) error {
	switch err {
	case context.DeadlineExceeded:
		return status.Errorf(codes.DeadlineExceeded, err.Error())
	case context.Canceled:
		return status.Errorf(codes.Canceled, err.Error())
	default:
		return status.Errorf(codes.Unknown, err.Error())
	}
}

var (
	defaultOptions = &options{
		retryPolicy: nonRepeatable,
		max:         0, // retries disabled by default
		backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
		retryAuth:   true,
	}
)

// backoffFunc denotes a family of functions that control the backoff duration between call retries.
//
// They are called with the attempt number and should return the duration the client should
// wait before the next attempt. If the returned duration extends past the request's
// `context.Context.Deadline`, the deadline takes precedence and the wait is cut short
// before proceeding with the next iteration.
type backoffFunc func(attempt uint) time.Duration

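// backoffExponentialSketch is an illustrative sketch (not part of the original
// file) of a custom backoffFunc that could be passed to withBackoff below: it
// doubles base on every attempt and caps the wait at max.
func backoffExponentialSketch(base, max time.Duration) backoffFunc {
	return func(attempt uint) time.Duration {
		if attempt >= 63 {
			// shifting an int64 by 63+ bits would overflow; clamp to max
			return max
		}
		d := base << attempt // base * 2^attempt
		if d <= 0 || d > max {
			return max
		}
		return d
	}
}
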
// withRetryPolicy sets the retry policy of this call.
func withRetryPolicy(rp retryPolicy) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.retryPolicy = rp
	}}
}

// withMax sets the maximum number of retries on this call, or on this interceptor.
func withMax(maxRetries uint) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.max = maxRetries
	}}
}

// withBackoff sets the backoffFunc used to control the time between retries.
func withBackoff(bf backoffFunc) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.backoffFunc = bf
	}}
}

type options struct {
	retryPolicy retryPolicy
	max         uint
	backoffFunc backoffFunc
	retryAuth   bool
}

// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
type retryOption struct {
	grpc.EmptyCallOption // make sure we implement the private after() and before() methods so we don't panic.
	applyFunc            func(opt *options)
}

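// Because retryOption embeds grpc.EmptyCallOption, values built by withRetryPolicy,
// withMax, and withBackoff can travel alongside ordinary gRPC call options and are
// separated back out by filterCallOptions below. An illustrative sketch:
//
//	opts := []grpc.CallOption{grpc.WaitForReady(true), withMax(5)}
//	grpcOpts, retryOpts := filterCallOptions(opts)
//	// grpcOpts retains grpc.WaitForReady(true); retryOpts holds withMax(5).
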
func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
	if len(retryOptions) == 0 {
		return opt
	}
	optCopy := &options{}
	*optCopy = *opt
	for _, f := range retryOptions {
		f.applyFunc(optCopy)
	}
	return optCopy
}

func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
	for _, opt := range callOptions {
		if co, ok := opt.(retryOption); ok {
			retryOptions = append(retryOptions, co)
		} else {
			grpcOptions = append(grpcOptions, opt)
		}
	}
	return grpcOptions, retryOptions
}

// backoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
//
// For example, waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
	return func(attempt uint) time.Duration {
		return jitterUp(waitBetween, jitterFraction)
	}
}

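// Illustrative sketch (an assumption about jitterUp, which is defined elsewhere in
// this package): applying jitter typically means scaling the wait by a random
// factor drawn from [1-jitterFraction, 1+jitterFraction], e.g.
//
//	multiplier := jitterFraction * (rand.Float64()*2 - 1)
//	wait := time.Duration(float64(waitBetween) * (1 + multiplier))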