1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package clientv3 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "strconv" 22 "strings" 23 "sync" 24 "time" 25 26 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" 27 "go.etcd.io/etcd/client/v3/credentials" 28 "go.etcd.io/etcd/client/v3/internal/endpoint" 29 "go.etcd.io/etcd/client/v3/internal/resolver" 30 "go.uber.org/zap" 31 "google.golang.org/grpc" 32 "google.golang.org/grpc/codes" 33 grpccredentials "google.golang.org/grpc/credentials" 34 "google.golang.org/grpc/keepalive" 35 "google.golang.org/grpc/status" 36) 37 38var ( 39 ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") 40 ErrOldCluster = errors.New("etcdclient: old cluster version") 41) 42 43// Client provides and manages an etcd v3 client session. 44type Client struct { 45 Cluster 46 KV 47 Lease 48 Watcher 49 Auth 50 Maintenance 51 52 conn *grpc.ClientConn 53 54 cfg Config 55 creds grpccredentials.TransportCredentials 56 resolver *resolver.EtcdManualResolver 57 mu *sync.RWMutex 58 59 ctx context.Context 60 cancel context.CancelFunc 61 62 // Username is a user name for authentication. 63 Username string 64 // Password is a password for authentication. 65 Password string 66 authTokenBundle credentials.Bundle 67 68 callOpts []grpc.CallOption 69 70 lgMu *sync.RWMutex 71 lg *zap.Logger 72} 73 74// New creates a new etcdv3 client from a given configuration. 75func New(cfg Config) (*Client, error) { 76 if len(cfg.Endpoints) == 0 { 77 return nil, ErrNoAvailableEndpoints 78 } 79 80 return newClient(&cfg) 81} 82 83// NewCtxClient creates a client with a context but no underlying grpc 84// connection. This is useful for embedded cases that override the 85// service interface implementations and do not need connection management. 86func NewCtxClient(ctx context.Context, opts ...Option) *Client { 87 cctx, cancel := context.WithCancel(ctx) 88 c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex)} 89 for _, opt := range opts { 90 opt(c) 91 } 92 if c.lg == nil { 93 c.lg = zap.NewNop() 94 } 95 return c 96} 97 98// Option is a function type that can be passed as argument to NewCtxClient to configure client 99type Option func(*Client) 100 101// NewFromURL creates a new etcdv3 client from a URL. 102func NewFromURL(url string) (*Client, error) { 103 return New(Config{Endpoints: []string{url}}) 104} 105 106// NewFromURLs creates a new etcdv3 client from URLs. 107func NewFromURLs(urls []string) (*Client, error) { 108 return New(Config{Endpoints: urls}) 109} 110 111// WithZapLogger is a NewCtxClient option that overrides the logger 112func WithZapLogger(lg *zap.Logger) Option { 113 return func(c *Client) { 114 c.lg = lg 115 } 116} 117 118// WithLogger overrides the logger. 119// 120// Deprecated: Please use WithZapLogger or Logger field in clientv3.Config 121// 122// Does not changes grpcLogger, that can be explicitly configured 123// using grpc_zap.ReplaceGrpcLoggerV2(..) method. 124func (c *Client) WithLogger(lg *zap.Logger) *Client { 125 c.lgMu.Lock() 126 c.lg = lg 127 c.lgMu.Unlock() 128 return c 129} 130 131// GetLogger gets the logger. 132// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger. 133func (c *Client) GetLogger() *zap.Logger { 134 c.lgMu.RLock() 135 l := c.lg 136 c.lgMu.RUnlock() 137 return l 138} 139 140// Close shuts down the client's etcd connections. 141func (c *Client) Close() error { 142 c.cancel() 143 if c.Watcher != nil { 144 c.Watcher.Close() 145 } 146 if c.Lease != nil { 147 c.Lease.Close() 148 } 149 if c.conn != nil { 150 return toErr(c.ctx, c.conn.Close()) 151 } 152 return c.ctx.Err() 153} 154 155// Ctx is a context for "out of band" messages (e.g., for sending 156// "clean up" message when another context is canceled). It is 157// canceled on client Close(). 158func (c *Client) Ctx() context.Context { return c.ctx } 159 160// Endpoints lists the registered endpoints for the client. 161func (c *Client) Endpoints() []string { 162 // copy the slice; protect original endpoints from being changed 163 c.mu.RLock() 164 defer c.mu.RUnlock() 165 eps := make([]string, len(c.cfg.Endpoints)) 166 copy(eps, c.cfg.Endpoints) 167 return eps 168} 169 170// SetEndpoints updates client's endpoints. 171func (c *Client) SetEndpoints(eps ...string) { 172 c.mu.Lock() 173 defer c.mu.Unlock() 174 c.cfg.Endpoints = eps 175 176 c.resolver.SetEndpoints(eps) 177} 178 179// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. 180func (c *Client) Sync(ctx context.Context) error { 181 mresp, err := c.MemberList(ctx) 182 if err != nil { 183 return err 184 } 185 var eps []string 186 for _, m := range mresp.Members { 187 eps = append(eps, m.ClientURLs...) 188 } 189 c.SetEndpoints(eps...) 190 return nil 191} 192 193func (c *Client) autoSync() { 194 if c.cfg.AutoSyncInterval == time.Duration(0) { 195 return 196 } 197 198 for { 199 select { 200 case <-c.ctx.Done(): 201 return 202 case <-time.After(c.cfg.AutoSyncInterval): 203 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) 204 err := c.Sync(ctx) 205 cancel() 206 if err != nil && err != c.ctx.Err() { 207 c.lg.Info("Auto sync endpoints failed.", zap.Error(err)) 208 } 209 } 210 } 211} 212 213// dialSetupOpts gives the dial opts prior to any authentication. 214func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) { 215 if c.cfg.DialKeepAliveTime > 0 { 216 params := keepalive.ClientParameters{ 217 Time: c.cfg.DialKeepAliveTime, 218 Timeout: c.cfg.DialKeepAliveTimeout, 219 PermitWithoutStream: c.cfg.PermitWithoutStream, 220 } 221 opts = append(opts, grpc.WithKeepaliveParams(params)) 222 } 223 opts = append(opts, dopts...) 224 225 if creds != nil { 226 opts = append(opts, grpc.WithTransportCredentials(creds)) 227 } else { 228 opts = append(opts, grpc.WithInsecure()) 229 } 230 231 // Interceptor retry and backoff. 232 // TODO: Replace all of clientv3/retry.go with RetryPolicy: 233 // https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130 234 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) 235 opts = append(opts, 236 // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. 237 // Streams that are safe to retry are enabled individually. 238 grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)), 239 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)), 240 ) 241 242 return opts, nil 243} 244 245// Dial connects to a single endpoint using the client's config. 246func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { 247 creds := c.credentialsForEndpoint(ep) 248 249 // Using ad-hoc created resolver, to guarantee only explicitly given 250 // endpoint is used. 251 return c.dial(creds, grpc.WithResolvers(resolver.New(ep))) 252} 253 254func (c *Client) getToken(ctx context.Context) error { 255 var err error // return last error in a case of fail 256 257 if c.Username == "" || c.Password == "" { 258 return nil 259 } 260 261 resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password) 262 if err != nil { 263 if err == rpctypes.ErrAuthNotEnabled { 264 return nil 265 } 266 return err 267 } 268 c.authTokenBundle.UpdateAuthToken(resp.Token) 269 return nil 270} 271 272// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host 273// of the provided endpoint determines the scheme used for all endpoints of the client connection. 274func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) { 275 creds := c.credentialsForEndpoint(c.Endpoints()[0]) 276 opts := append(dopts, grpc.WithResolvers(c.resolver)) 277 return c.dial(creds, opts...) 278} 279 280// dial configures and dials any grpc balancer target. 281func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { 282 opts, err := c.dialSetupOpts(creds, dopts...) 283 if err != nil { 284 return nil, fmt.Errorf("failed to configure dialer: %v", err) 285 } 286 if c.Username != "" && c.Password != "" { 287 c.authTokenBundle = credentials.NewBundle(credentials.Config{}) 288 opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials())) 289 } 290 291 opts = append(opts, c.cfg.DialOptions...) 292 293 dctx := c.ctx 294 if c.cfg.DialTimeout > 0 { 295 var cancel context.CancelFunc 296 dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout) 297 defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? 298 } 299 300 initialEndpoints := strings.Join(c.cfg.Endpoints, ";") 301 target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints) 302 conn, err := grpc.DialContext(dctx, target, opts...) 303 if err != nil { 304 return nil, err 305 } 306 return conn, nil 307} 308 309func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials { 310 r := endpoint.RequiresCredentials(ep) 311 switch r { 312 case endpoint.CREDS_DROP: 313 return nil 314 case endpoint.CREDS_OPTIONAL: 315 return c.creds 316 case endpoint.CREDS_REQUIRE: 317 if c.creds != nil { 318 return c.creds 319 } 320 return credentials.NewBundle(credentials.Config{}).TransportCredentials() 321 default: 322 panic(fmt.Errorf("unsupported CredsRequirement: %v", r)) 323 } 324} 325 326func newClient(cfg *Config) (*Client, error) { 327 if cfg == nil { 328 cfg = &Config{} 329 } 330 var creds grpccredentials.TransportCredentials 331 if cfg.TLS != nil { 332 creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials() 333 } 334 335 // use a temporary skeleton client to bootstrap first connection 336 baseCtx := context.TODO() 337 if cfg.Context != nil { 338 baseCtx = cfg.Context 339 } 340 341 ctx, cancel := context.WithCancel(baseCtx) 342 client := &Client{ 343 conn: nil, 344 cfg: *cfg, 345 creds: creds, 346 ctx: ctx, 347 cancel: cancel, 348 mu: new(sync.RWMutex), 349 callOpts: defaultCallOpts, 350 lgMu: new(sync.RWMutex), 351 } 352 353 var err error 354 if cfg.Logger != nil { 355 client.lg = cfg.Logger 356 } else if cfg.LogConfig != nil { 357 client.lg, err = cfg.LogConfig.Build() 358 } else { 359 client.lg, err = CreateDefaultZapLogger() 360 } 361 if err != nil { 362 return nil, err 363 } 364 365 if cfg.Username != "" && cfg.Password != "" { 366 client.Username = cfg.Username 367 client.Password = cfg.Password 368 } 369 if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { 370 if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { 371 return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize) 372 } 373 callOpts := []grpc.CallOption{ 374 defaultWaitForReady, 375 defaultMaxCallSendMsgSize, 376 defaultMaxCallRecvMsgSize, 377 } 378 if cfg.MaxCallSendMsgSize > 0 { 379 callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize) 380 } 381 if cfg.MaxCallRecvMsgSize > 0 { 382 callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize) 383 } 384 client.callOpts = callOpts 385 } 386 387 client.resolver = resolver.New(cfg.Endpoints...) 388 389 if len(cfg.Endpoints) < 1 { 390 client.cancel() 391 return nil, fmt.Errorf("at least one Endpoint is required in client config") 392 } 393 // Use a provided endpoint target so that for https:// without any tls config given, then 394 // grpc will assume the certificate server name is the endpoint host. 395 conn, err := client.dialWithBalancer() 396 if err != nil { 397 client.cancel() 398 client.resolver.Close() 399 // TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot. 400 return nil, err 401 } 402 client.conn = conn 403 404 client.Cluster = NewCluster(client) 405 client.KV = NewKV(client) 406 client.Lease = NewLease(client) 407 client.Watcher = NewWatcher(client) 408 client.Auth = NewAuth(client) 409 client.Maintenance = NewMaintenance(client) 410 411 //get token with established connection 412 ctx, cancel = client.ctx, func() {} 413 if client.cfg.DialTimeout > 0 { 414 ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout) 415 } 416 err = client.getToken(ctx) 417 if err != nil { 418 client.Close() 419 cancel() 420 //TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err) 421 return nil, err 422 } 423 cancel() 424 425 if cfg.RejectOldCluster { 426 if err := client.checkVersion(); err != nil { 427 client.Close() 428 return nil, err 429 } 430 } 431 432 go client.autoSync() 433 return client, nil 434} 435 436// roundRobinQuorumBackoff retries against quorum between each backoff. 437// This is intended for use with a round robin load balancer. 438func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc { 439 return func(attempt uint) time.Duration { 440 // after each round robin across quorum, backoff for our wait between duration 441 n := uint(len(c.Endpoints())) 442 quorum := (n/2 + 1) 443 if attempt%quorum == 0 { 444 c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) 445 return jitterUp(waitBetween, jitterFraction) 446 } 447 c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) 448 return 0 449 } 450} 451 452func (c *Client) checkVersion() (err error) { 453 var wg sync.WaitGroup 454 455 eps := c.Endpoints() 456 errc := make(chan error, len(eps)) 457 ctx, cancel := context.WithCancel(c.ctx) 458 if c.cfg.DialTimeout > 0 { 459 cancel() 460 ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout) 461 } 462 463 wg.Add(len(eps)) 464 for _, ep := range eps { 465 // if cluster is current, any endpoint gives a recent version 466 go func(e string) { 467 defer wg.Done() 468 resp, rerr := c.Status(ctx, e) 469 if rerr != nil { 470 errc <- rerr 471 return 472 } 473 vs := strings.Split(resp.Version, ".") 474 maj, min := 0, 0 475 if len(vs) >= 2 { 476 var serr error 477 if maj, serr = strconv.Atoi(vs[0]); serr != nil { 478 errc <- serr 479 return 480 } 481 if min, serr = strconv.Atoi(vs[1]); serr != nil { 482 errc <- serr 483 return 484 } 485 } 486 if maj < 3 || (maj == 3 && min < 2) { 487 rerr = ErrOldCluster 488 } 489 errc <- rerr 490 }(ep) 491 } 492 // wait for success 493 for range eps { 494 if err = <-errc; err == nil { 495 break 496 } 497 } 498 cancel() 499 wg.Wait() 500 return err 501} 502 503// ActiveConnection returns the current in-use connection 504func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn } 505 506// isHaltErr returns true if the given error and context indicate no forward 507// progress can be made, even after reconnecting. 508func isHaltErr(ctx context.Context, err error) bool { 509 if ctx != nil && ctx.Err() != nil { 510 return true 511 } 512 if err == nil { 513 return false 514 } 515 ev, _ := status.FromError(err) 516 // Unavailable codes mean the system will be right back. 517 // (e.g., can't connect, lost leader) 518 // Treat Internal codes as if something failed, leaving the 519 // system in an inconsistent state, but retrying could make progress. 520 // (e.g., failed in middle of send, corrupted frame) 521 // TODO: are permanent Internal errors possible from grpc? 522 return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal 523} 524 525// isUnavailableErr returns true if the given error is an unavailable error 526func isUnavailableErr(ctx context.Context, err error) bool { 527 if ctx != nil && ctx.Err() != nil { 528 return false 529 } 530 if err == nil { 531 return false 532 } 533 ev, ok := status.FromError(err) 534 if ok { 535 // Unavailable codes mean the system will be right back. 536 // (e.g., can't connect, lost leader) 537 return ev.Code() == codes.Unavailable 538 } 539 return false 540} 541 542func toErr(ctx context.Context, err error) error { 543 if err == nil { 544 return nil 545 } 546 err = rpctypes.Error(err) 547 if _, ok := err.(rpctypes.EtcdError); ok { 548 return err 549 } 550 if ev, ok := status.FromError(err); ok { 551 code := ev.Code() 552 switch code { 553 case codes.DeadlineExceeded: 554 fallthrough 555 case codes.Canceled: 556 if ctx.Err() != nil { 557 err = ctx.Err() 558 } 559 } 560 } 561 return err 562} 563 564func canceledByCaller(stopCtx context.Context, err error) bool { 565 if stopCtx.Err() == nil || err == nil { 566 return false 567 } 568 569 return err == context.Canceled || err == context.DeadlineExceeded 570} 571 572// IsConnCanceled returns true, if error is from a closed gRPC connection. 573// ref. https://github.com/grpc/grpc-go/pull/1854 574func IsConnCanceled(err error) bool { 575 if err == nil { 576 return false 577 } 578 579 // >= gRPC v1.23.x 580 s, ok := status.FromError(err) 581 if ok { 582 // connection is canceled or server has already closed the connection 583 return s.Code() == codes.Canceled || s.Message() == "transport is closing" 584 } 585 586 // >= gRPC v1.10.x 587 if err == context.Canceled { 588 return true 589 } 590 591 // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")' 592 return strings.Contains(err.Error(), "grpc: the client connection is closing") 593} 594