// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Based on github.com/grpc-ecosystem/go-grpc-middleware/retry, but modified to support the more
// fine-grained error checking required by the write-at-most-once retry semantics of etcd.

package clientv3

import (
	"context"
	"io"
	"sync"
	"time"

	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// unaryClientInterceptor returns a new retrying unary client interceptor.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or per call (through grpc.CallOptions).
func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		ctx = withVersion(ctx)
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// short-circuit for simplicity, and to avoid allocations.
		if callOpts.max == 0 {
			return invoker(ctx, method, req, reply, cc, grpcOpts...)
		}
		var lastErr error
		for attempt := uint(0); attempt < callOpts.max; attempt++ {
			if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
				return err
			}
			c.GetLogger().Debug(
				"retrying of unary invoker",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
			)
			lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
			if lastErr == nil {
				return nil
			}
			c.GetLogger().Warn(
				"retrying of unary invoker failed",
				zap.String("target", cc.Target()),
				zap.Uint("attempt", attempt),
				zap.Error(lastErr),
			)
			if isContextError(lastErr) {
				if ctx.Err() != nil {
					// it's the context deadline or cancellation.
					return lastErr
				}
				// it's the callCtx deadline or cancellation; try again.
				continue
			}
			if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
				// Clear the auth token before refreshing it.
				// Calling c.Auth.Authenticate with an invalid token will always fail the auth check
				// on the server side. If the server has not applied the patch in PR #12165
				// (https://github.com/etcd-io/etcd/pull/12165), an rpctypes.ErrInvalidAuthToken
				// would make c.getToken recurse until the system runs out of resources.
				c.authTokenBundle.UpdateAuthToken("")

				gterr := c.getToken(ctx)
				if gterr != nil {
					c.GetLogger().Warn(
						"retrying of unary invoker failed to fetch new auth token",
						zap.String("target", cc.Target()),
						zap.Error(gterr),
					)
					return gterr // lastErr must be invalid auth token
				}
				continue
			}
			if !isSafeRetry(c.lg, lastErr, callOpts) {
				return lastErr
			}
		}
		return lastErr
	}
}
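
// Example (illustrative sketch, not part of the original file): the interceptor
// is typically installed on the grpc.ClientConn at dial time, with retries
// enabled through the retryOptions defined below. The option values here are
// assumptions for the example:
//
//	conn, err := grpc.Dial(target,
//		grpc.WithUnaryInterceptor(c.unaryClientInterceptor(
//			withMax(5),
//			withBackoff(backoffLinearWithJitter(50*time.Millisecond, 0.10)),
//		)),
//	)
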
// streamClientInterceptor returns a new retrying stream client interceptor for server-side streaming calls.
//
// The default configuration of the interceptor is to not retry *at all*. This behaviour can be
// changed through options (e.g. withMax) on creation of the interceptor or per call (through grpc.CallOptions).
//
// Retry logic is available *only for ServerStreams*, i.e. 1:n streams, as the internal logic needs
// to buffer the messages sent by the client. If retry is enabled on any other kind of stream
// (ClientStreams, BidiStreams), the retry interceptor will fail the call.
func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
	intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		ctx = withVersion(ctx)
		// getToken automatically
		// TODO(cfc4n): keep this code block; remove the getToken code in client.go after PR #12165 is merged.
		if c.authTokenBundle != nil {
			// equivalent to c.Username != "" && c.Password != ""
			err := c.getToken(ctx)
			if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
				c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
				return nil, err
			}
		}
		grpcOpts, retryOpts := filterCallOptions(opts)
		callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
		// short-circuit for simplicity, and to avoid allocations.
		if callOpts.max == 0 {
			return streamer(ctx, desc, cc, method, grpcOpts...)
		}
		if desc.ClientStreams {
			return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
		}
		newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
		if err != nil {
			c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
			return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		retryingStreamer := &serverStreamingRetryingStream{
			client:       c,
			ClientStream: newStreamer,
			callOpts:     callOpts,
			ctx:          ctx,
			streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
				return streamer(ctx, desc, cc, method, grpcOpts...)
			},
		}
		return retryingStreamer, nil
	}
}
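
// Note (illustrative sketch): because retryOption implements grpc.CallOption,
// retry behaviour can also be tuned per call. Since the max == 0 short circuit
// above runs before the ClientStreams check, a client-side or bidirectional
// stream can bypass the retry machinery entirely with withMax(0). The stream
// client name below is an assumption for the example:
//
//	stream, err := watchGRPCClient.Watch(ctx, withMax(0))
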
// serverStreamingRetryingStream is an implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
type serverStreamingRetryingStream struct {
	grpc.ClientStream
	client        *Client
	bufferedSends []interface{} // single message that the client can send
	receivedGood  bool          // indicates whether any prior receives were successful
	wasClosedSend bool          // indicates that CloseSend was called
	ctx           context.Context
	callOpts      *options
	streamerCall  func(ctx context.Context) (grpc.ClientStream, error)
	mu            sync.RWMutex
}

func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
	s.mu.Lock()
	s.ClientStream = clientStream
	s.mu.Unlock()
}

func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.ClientStream
}

func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
	s.mu.Lock()
	s.bufferedSends = append(s.bufferedSends, m)
	s.mu.Unlock()
	return s.getStream().SendMsg(m)
}

func (s *serverStreamingRetryingStream) CloseSend() error {
	s.mu.Lock()
	s.wasClosedSend = true
	s.mu.Unlock()
	return s.getStream().CloseSend()
}

func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
	return s.getStream().Header()
}

func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
	return s.getStream().Trailer()
}

func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
	attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
	if !attemptRetry {
		return lastErr // success or hard failure
	}

	// We start from attempt 1, because the zeroth attempt was already made by the initial streamer call.
	for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
		if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
			return err
		}
		newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
		if err != nil {
			s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
			return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
		}
		s.setStream(newStream)

		s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
		attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
		if !attemptRetry {
			return lastErr
		}
	}
	return lastErr
}
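
// Illustrative sketch: callers consume the retrying stream exactly like a plain
// grpc.ClientStream; reestablishment after an early failure is transparent.
// The names stream and msg are assumptions for the example:
//
//	for {
//		if err := stream.RecvMsg(msg); err != nil {
//			if err == io.EOF {
//				break // clean end of stream
//			}
//			return err // retries exhausted, or a non-retryable failure
//		}
//		// process msg
//	}
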
func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
	s.mu.RLock()
	wasGood := s.receivedGood
	s.mu.RUnlock()
	err := s.getStream().RecvMsg(m)
	if err == nil || err == io.EOF {
		s.mu.Lock()
		s.receivedGood = true
		s.mu.Unlock()
		return false, err
	} else if wasGood {
		// a previous RecvMsg in the stream succeeded, so no retry logic should interfere
		return false, err
	}
	if isContextError(err) {
		if s.ctx.Err() != nil {
			return false, err
		}
		// it's the callCtx deadline or cancellation; try again.
		return true, err
	}
	if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
		// clear the auth token to avoid a failure when calling getToken
		s.client.authTokenBundle.UpdateAuthToken("")

		gterr := s.client.getToken(s.ctx)
		if gterr != nil {
			s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
			return false, err // return the original error for simplicity
		}
		return true, err
	}
	return isSafeRetry(s.client.lg, err, s.callOpts), err
}

func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
	s.mu.RLock()
	bufferedSends := s.bufferedSends
	s.mu.RUnlock()
	newStream, err := s.streamerCall(callCtx)
	if err != nil {
		return nil, err
	}
	for _, msg := range bufferedSends {
		if err := newStream.SendMsg(msg); err != nil {
			return nil, err
		}
	}
	if err := newStream.CloseSend(); err != nil {
		return nil, err
	}
	return newStream, nil
}

func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
	waitTime := time.Duration(0)
	if attempt > 0 {
		waitTime = callOpts.backoffFunc(attempt)
	}
	if waitTime > 0 {
		timer := time.NewTimer(waitTime)
		select {
		case <-ctx.Done():
			timer.Stop()
			return contextErrToGrpcErr(ctx.Err())
		case <-timer.C:
		}
	}
	return nil
}

// isSafeRetry returns true if the request is safe to retry after receiving the given error.
func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
	if isContextError(err) {
		return false
	}
	switch callOpts.retryPolicy {
	case repeatable:
		return isSafeRetryImmutableRPC(err)
	case nonRepeatable:
		return isSafeRetryMutableRPC(err)
	default:
		lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
		return false
	}
}

func isContextError(err error) bool {
	return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
}

func contextErrToGrpcErr(err error) error {
	switch err {
	case context.DeadlineExceeded:
		return status.Errorf(codes.DeadlineExceeded, err.Error())
	case context.Canceled:
		return status.Errorf(codes.Canceled, err.Error())
	default:
		return status.Errorf(codes.Unknown, err.Error())
	}
}

var (
	defaultOptions = &options{
		retryPolicy: nonRepeatable,
		max:         0, // disable
		backoffFunc: backoffLinearWithJitter(50*time.Millisecond /*jitter*/, 0.10),
		retryAuth:   true,
	}
)

// backoffFunc denotes a family of functions that control the backoff duration between call retries.
//
// They are called with an identifier of the attempt, and should return the duration the client should
// hold off for. If the duration returned is longer than the `context.Context.Deadline` of the request,
// the deadline of the request takes precedence and the wait will be interrupted before proceeding
// with the next iteration.
type backoffFunc func(attempt uint) time.Duration
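
// Illustrative sketch (not part of the original file): a custom backoffFunc can
// implement other policies, e.g. capped exponential backoff; the names
// backoffExponential, base, and maxWait are assumptions for the example:
//
//	func backoffExponential(base, maxWait time.Duration) backoffFunc {
//		return func(attempt uint) time.Duration {
//			d := base << attempt // double the wait on every attempt
//			if d <= 0 || d > maxWait {
//				return maxWait // clamp, and guard against shift overflow
//			}
//			return d
//		}
//	}
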
// withRetryPolicy sets the retry policy of this call.
func withRetryPolicy(rp retryPolicy) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.retryPolicy = rp
	}}
}

// withMax sets the maximum number of retries on this call, or this interceptor.
func withMax(maxRetries uint) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.max = maxRetries
	}}
}

// withBackoff sets the backoffFunc used to control the time between retries.
func withBackoff(bf backoffFunc) retryOption {
	return retryOption{applyFunc: func(o *options) {
		o.backoffFunc = bf
	}}
}

type options struct {
	retryPolicy retryPolicy
	max         uint
	backoffFunc backoffFunc
	retryAuth   bool
}

// retryOption is a grpc.CallOption that is local to clientv3's retry interceptor.
type retryOption struct {
	grpc.EmptyCallOption // make sure we implement the private after() and before() methods so we don't panic.
	applyFunc            func(opt *options)
}

func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
	if len(retryOptions) == 0 {
		return opt
	}
	optCopy := &options{}
	*optCopy = *opt
	for _, f := range retryOptions {
		f.applyFunc(optCopy)
	}
	return optCopy
}

func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
	for _, opt := range callOptions {
		if co, ok := opt.(retryOption); ok {
			retryOptions = append(retryOptions, co)
		} else {
			grpcOptions = append(grpcOptions, opt)
		}
	}
	return grpcOptions, retryOptions
}

// backoffLinearWithJitter waits a set period of time, allowing for jitter (fractional adjustment).
//
// For example, waitBetween=1s and jitter=0.10 can generate waits between 900ms and 1100ms.
func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
	return func(attempt uint) time.Duration {
		return jitterUp(waitBetween, jitterFraction)
	}
}
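
// Worked example (illustrative): with waitBetween=50ms and jitterFraction=0.10,
// as configured in defaultOptions, every retry waits a uniformly jittered
// duration in [45ms, 55ms], independent of the attempt number:
//
//	bf := backoffLinearWithJitter(50*time.Millisecond, 0.10)
//	wait := bf(3) // the attempt argument is ignored; the backoff is linear, not exponential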