1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package clientv3 16 17import ( 18 "context" 19 "crypto/tls" 20 "errors" 21 "fmt" 22 "net" 23 "net/url" 24 "os" 25 "strconv" 26 "strings" 27 "sync" 28 "time" 29 30 "github.com/google/uuid" 31 "go.etcd.io/etcd/clientv3/balancer" 32 "go.etcd.io/etcd/clientv3/balancer/picker" 33 "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" 34 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" 35 "go.uber.org/zap" 36 "google.golang.org/grpc" 37 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/credentials" 39 "google.golang.org/grpc/keepalive" 40 "google.golang.org/grpc/metadata" 41 "google.golang.org/grpc/status" 42) 43 44var ( 45 ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints") 46 ErrOldCluster = errors.New("etcdclient: old cluster version") 47 48 roundRobinBalancerName = fmt.Sprintf("etcd-%s", picker.RoundrobinBalanced.String()) 49) 50 51func init() { 52 lg := zap.NewNop() 53 if os.Getenv("ETCD_CLIENT_DEBUG") != "" { 54 var err error 55 lg, err = zap.NewProductionConfig().Build() // info level logging 56 if err != nil { 57 panic(err) 58 } 59 } 60 balancer.RegisterBuilder(balancer.Config{ 61 Policy: picker.RoundrobinBalanced, 62 Name: roundRobinBalancerName, 63 Logger: lg, 64 }) 65} 66 67// Client provides and manages an etcd v3 client session. 68type Client struct { 69 Cluster 70 KV 71 Lease 72 Watcher 73 Auth 74 Maintenance 75 76 conn *grpc.ClientConn 77 78 cfg Config 79 creds *credentials.TransportCredentials 80 balancer balancer.Balancer 81 resolverGroup *endpoint.ResolverGroup 82 mu *sync.Mutex 83 84 ctx context.Context 85 cancel context.CancelFunc 86 87 // Username is a user name for authentication. 88 Username string 89 // Password is a password for authentication. 90 Password string 91 // tokenCred is an instance of WithPerRPCCredentials()'s argument 92 tokenCred *authTokenCredential 93 94 callOpts []grpc.CallOption 95 96 lg *zap.Logger 97} 98 99// New creates a new etcdv3 client from a given configuration. 100func New(cfg Config) (*Client, error) { 101 if len(cfg.Endpoints) == 0 { 102 return nil, ErrNoAvailableEndpoints 103 } 104 105 return newClient(&cfg) 106} 107 108// NewCtxClient creates a client with a context but no underlying grpc 109// connection. This is useful for embedded cases that override the 110// service interface implementations and do not need connection management. 111func NewCtxClient(ctx context.Context) *Client { 112 cctx, cancel := context.WithCancel(ctx) 113 return &Client{ctx: cctx, cancel: cancel} 114} 115 116// NewFromURL creates a new etcdv3 client from a URL. 117func NewFromURL(url string) (*Client, error) { 118 return New(Config{Endpoints: []string{url}}) 119} 120 121// NewFromURLs creates a new etcdv3 client from URLs. 122func NewFromURLs(urls []string) (*Client, error) { 123 return New(Config{Endpoints: urls}) 124} 125 126// Close shuts down the client's etcd connections. 127func (c *Client) Close() error { 128 c.cancel() 129 c.Watcher.Close() 130 c.Lease.Close() 131 if c.resolverGroup != nil { 132 c.resolverGroup.Close() 133 } 134 if c.conn != nil { 135 return toErr(c.ctx, c.conn.Close()) 136 } 137 return c.ctx.Err() 138} 139 140// Ctx is a context for "out of band" messages (e.g., for sending 141// "clean up" message when another context is canceled). It is 142// canceled on client Close(). 143func (c *Client) Ctx() context.Context { return c.ctx } 144 145// Endpoints lists the registered endpoints for the client. 146func (c *Client) Endpoints() (eps []string) { 147 // copy the slice; protect original endpoints from being changed 148 eps = make([]string, len(c.cfg.Endpoints)) 149 copy(eps, c.cfg.Endpoints) 150 return 151} 152 153// SetEndpoints updates client's endpoints. 154func (c *Client) SetEndpoints(eps ...string) { 155 c.mu.Lock() 156 defer c.mu.Unlock() 157 c.cfg.Endpoints = eps 158 c.resolverGroup.SetEndpoints(eps) 159} 160 161// Sync synchronizes client's endpoints with the known endpoints from the etcd membership. 162func (c *Client) Sync(ctx context.Context) error { 163 mresp, err := c.MemberList(ctx) 164 if err != nil { 165 return err 166 } 167 var eps []string 168 for _, m := range mresp.Members { 169 eps = append(eps, m.ClientURLs...) 170 } 171 c.SetEndpoints(eps...) 172 return nil 173} 174 175func (c *Client) autoSync() { 176 if c.cfg.AutoSyncInterval == time.Duration(0) { 177 return 178 } 179 180 for { 181 select { 182 case <-c.ctx.Done(): 183 return 184 case <-time.After(c.cfg.AutoSyncInterval): 185 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) 186 err := c.Sync(ctx) 187 cancel() 188 if err != nil && err != c.ctx.Err() { 189 lg.Lvl(4).Infof("Auto sync endpoints failed: %v", err) 190 } 191 } 192 } 193} 194 195type authTokenCredential struct { 196 token string 197 tokenMu *sync.RWMutex 198} 199 200func (cred authTokenCredential) RequireTransportSecurity() bool { 201 return false 202} 203 204func (cred authTokenCredential) GetRequestMetadata(ctx context.Context, s ...string) (map[string]string, error) { 205 cred.tokenMu.RLock() 206 defer cred.tokenMu.RUnlock() 207 return map[string]string{ 208 rpctypes.TokenFieldNameGRPC: cred.token, 209 }, nil 210} 211 212func (c *Client) processCreds(scheme string) (creds *credentials.TransportCredentials) { 213 creds = c.creds 214 switch scheme { 215 case "unix": 216 case "http": 217 creds = nil 218 case "https", "unixs": 219 if creds != nil { 220 break 221 } 222 tlsconfig := &tls.Config{} 223 emptyCreds := credentials.NewTLS(tlsconfig) 224 creds = &emptyCreds 225 default: 226 creds = nil 227 } 228 return creds 229} 230 231// dialSetupOpts gives the dial opts prior to any authentication. 232func (c *Client) dialSetupOpts(creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) { 233 if c.cfg.DialKeepAliveTime > 0 { 234 params := keepalive.ClientParameters{ 235 Time: c.cfg.DialKeepAliveTime, 236 Timeout: c.cfg.DialKeepAliveTimeout, 237 PermitWithoutStream: c.cfg.PermitWithoutStream, 238 } 239 opts = append(opts, grpc.WithKeepaliveParams(params)) 240 } 241 opts = append(opts, dopts...) 242 243 // Provide a net dialer that supports cancelation and timeout. 244 f := func(dialEp string, t time.Duration) (net.Conn, error) { 245 proto, host, _ := endpoint.ParseEndpoint(dialEp) 246 select { 247 case <-c.ctx.Done(): 248 return nil, c.ctx.Err() 249 default: 250 } 251 dialer := &net.Dialer{Timeout: t} 252 return dialer.DialContext(c.ctx, proto, host) 253 } 254 opts = append(opts, grpc.WithDialer(f)) 255 256 if creds != nil { 257 opts = append(opts, grpc.WithTransportCredentials(*creds)) 258 } else { 259 opts = append(opts, grpc.WithInsecure()) 260 } 261 262 // Interceptor retry and backoff. 263 // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with 264 // https://github.com/grpc/proposal/blob/master/A6-client-retries.md#retry-policy 265 // once it is available. 266 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction)) 267 opts = append(opts, 268 // Disable stream retry by default since go-grpc-middleware/retry does not support client streams. 269 // Streams that are safe to retry are enabled individually. 270 grpc.WithStreamInterceptor(c.streamClientInterceptor(c.lg, withMax(0), rrBackoff)), 271 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(c.lg, withMax(defaultUnaryMaxRetries), rrBackoff)), 272 ) 273 274 return opts, nil 275} 276 277// Dial connects to a single endpoint using the client's config. 278func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { 279 creds := c.directDialCreds(ep) 280 // Use the grpc passthrough resolver to directly dial a single endpoint. 281 // This resolver passes through the 'unix' and 'unixs' endpoints schemes used 282 // by etcd without modification, allowing us to directly dial endpoints and 283 // using the same dial functions that we use for load balancer dialing. 284 return c.dial(fmt.Sprintf("passthrough:///%s", ep), creds) 285} 286 287func (c *Client) getToken(ctx context.Context) error { 288 var err error // return last error in a case of fail 289 var auth *authenticator 290 291 for i := 0; i < len(c.cfg.Endpoints); i++ { 292 ep := c.cfg.Endpoints[i] 293 // use dial options without dopts to avoid reusing the client balancer 294 var dOpts []grpc.DialOption 295 _, host, _ := endpoint.ParseEndpoint(ep) 296 target := c.resolverGroup.Target(host) 297 creds := c.dialWithBalancerCreds(ep) 298 dOpts, err = c.dialSetupOpts(creds, c.cfg.DialOptions...) 299 if err != nil { 300 err = fmt.Errorf("failed to configure auth dialer: %v", err) 301 continue 302 } 303 dOpts = append(dOpts, grpc.WithBalancerName(roundRobinBalancerName)) 304 auth, err = newAuthenticator(ctx, target, dOpts, c) 305 if err != nil { 306 continue 307 } 308 defer auth.close() 309 310 var resp *AuthenticateResponse 311 resp, err = auth.authenticate(ctx, c.Username, c.Password) 312 if err != nil { 313 continue 314 } 315 316 c.tokenCred.tokenMu.Lock() 317 c.tokenCred.token = resp.Token 318 c.tokenCred.tokenMu.Unlock() 319 320 return nil 321 } 322 323 return err 324} 325 326// dialWithBalancer dials the client's current load balanced resolver group. The scheme of the host 327// of the provided endpoint determines the scheme used for all endpoints of the client connection. 328func (c *Client) dialWithBalancer(ep string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { 329 _, host, _ := endpoint.ParseEndpoint(ep) 330 target := c.resolverGroup.Target(host) 331 creds := c.dialWithBalancerCreds(ep) 332 return c.dial(target, creds, dopts...) 333} 334 335// dial configures and dials any grpc balancer target. 336func (c *Client) dial(target string, creds *credentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) { 337 opts, err := c.dialSetupOpts(creds, dopts...) 338 if err != nil { 339 return nil, fmt.Errorf("failed to configure dialer: %v", err) 340 } 341 342 if c.Username != "" && c.Password != "" { 343 c.tokenCred = &authTokenCredential{ 344 tokenMu: &sync.RWMutex{}, 345 } 346 347 ctx, cancel := c.ctx, func() {} 348 if c.cfg.DialTimeout > 0 { 349 ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout) 350 } 351 352 err = c.getToken(ctx) 353 if err != nil { 354 if toErr(ctx, err) != rpctypes.ErrAuthNotEnabled { 355 if err == ctx.Err() && ctx.Err() != c.ctx.Err() { 356 err = context.DeadlineExceeded 357 } 358 cancel() 359 return nil, err 360 } 361 } else { 362 opts = append(opts, grpc.WithPerRPCCredentials(c.tokenCred)) 363 } 364 cancel() 365 } 366 367 opts = append(opts, c.cfg.DialOptions...) 368 369 dctx := c.ctx 370 if c.cfg.DialTimeout > 0 { 371 var cancel context.CancelFunc 372 dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout) 373 defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options? 374 } 375 376 conn, err := grpc.DialContext(dctx, target, opts...) 377 if err != nil { 378 return nil, err 379 } 380 return conn, nil 381} 382 383func (c *Client) directDialCreds(ep string) *credentials.TransportCredentials { 384 _, hostPort, scheme := endpoint.ParseEndpoint(ep) 385 creds := c.creds 386 if len(scheme) != 0 { 387 creds = c.processCreds(scheme) 388 if creds != nil { 389 c := *creds 390 clone := c.Clone() 391 // Set the server name must to the endpoint hostname without port since grpc 392 // otherwise attempts to check if x509 cert is valid for the full endpoint 393 // including the scheme and port, which fails. 394 host, _ := endpoint.ParseHostPort(hostPort) 395 clone.OverrideServerName(host) 396 creds = &clone 397 } 398 } 399 return creds 400} 401 402func (c *Client) dialWithBalancerCreds(ep string) *credentials.TransportCredentials { 403 _, _, scheme := endpoint.ParseEndpoint(ep) 404 creds := c.creds 405 if len(scheme) != 0 { 406 creds = c.processCreds(scheme) 407 } 408 return creds 409} 410 411// WithRequireLeader requires client requests to only succeed 412// when the cluster has a leader. 413func WithRequireLeader(ctx context.Context) context.Context { 414 md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) 415 return metadata.NewOutgoingContext(ctx, md) 416} 417 418func newClient(cfg *Config) (*Client, error) { 419 if cfg == nil { 420 cfg = &Config{} 421 } 422 var creds *credentials.TransportCredentials 423 if cfg.TLS != nil { 424 c := credentials.NewTLS(cfg.TLS) 425 creds = &c 426 } 427 428 // use a temporary skeleton client to bootstrap first connection 429 baseCtx := context.TODO() 430 if cfg.Context != nil { 431 baseCtx = cfg.Context 432 } 433 434 ctx, cancel := context.WithCancel(baseCtx) 435 client := &Client{ 436 conn: nil, 437 cfg: *cfg, 438 creds: creds, 439 ctx: ctx, 440 cancel: cancel, 441 mu: new(sync.Mutex), 442 callOpts: defaultCallOpts, 443 } 444 445 lcfg := DefaultLogConfig 446 if cfg.LogConfig != nil { 447 lcfg = *cfg.LogConfig 448 } 449 var err error 450 client.lg, err = lcfg.Build() 451 if err != nil { 452 return nil, err 453 } 454 455 if cfg.Username != "" && cfg.Password != "" { 456 client.Username = cfg.Username 457 client.Password = cfg.Password 458 } 459 if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { 460 if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { 461 return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize) 462 } 463 callOpts := []grpc.CallOption{ 464 defaultFailFast, 465 defaultMaxCallSendMsgSize, 466 defaultMaxCallRecvMsgSize, 467 } 468 if cfg.MaxCallSendMsgSize > 0 { 469 callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize) 470 } 471 if cfg.MaxCallRecvMsgSize > 0 { 472 callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize) 473 } 474 client.callOpts = callOpts 475 } 476 477 // Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass 478 // to dial so the client knows to use this resolver. 479 client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String())) 480 if err != nil { 481 client.cancel() 482 return nil, err 483 } 484 client.resolverGroup.SetEndpoints(cfg.Endpoints) 485 486 if len(cfg.Endpoints) < 1 { 487 return nil, fmt.Errorf("at least one Endpoint must is required in client config") 488 } 489 dialEndpoint := cfg.Endpoints[0] 490 491 // Use a provided endpoint target so that for https:// without any tls config given, then 492 // grpc will assume the certificate server name is the endpoint host. 493 conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName)) 494 if err != nil { 495 client.cancel() 496 client.resolverGroup.Close() 497 return nil, err 498 } 499 // TODO: With the old grpc balancer interface, we waited until the dial timeout 500 // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface? 501 client.conn = conn 502 503 client.Cluster = NewCluster(client) 504 client.KV = NewKV(client) 505 client.Lease = NewLease(client) 506 client.Watcher = NewWatcher(client) 507 client.Auth = NewAuth(client) 508 client.Maintenance = NewMaintenance(client) 509 510 if cfg.RejectOldCluster { 511 if err := client.checkVersion(); err != nil { 512 client.Close() 513 return nil, err 514 } 515 } 516 517 go client.autoSync() 518 return client, nil 519} 520 521// roundRobinQuorumBackoff retries against quorum between each backoff. 522// This is intended for use with a round robin load balancer. 523func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc { 524 return func(attempt uint) time.Duration { 525 // after each round robin across quorum, backoff for our wait between duration 526 n := uint(len(c.Endpoints())) 527 quorum := (n/2 + 1) 528 if attempt%quorum == 0 { 529 c.lg.Info("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction)) 530 return jitterUp(waitBetween, jitterFraction) 531 } 532 c.lg.Info("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum)) 533 return 0 534 } 535} 536 537func (c *Client) checkVersion() (err error) { 538 var wg sync.WaitGroup 539 errc := make(chan error, len(c.cfg.Endpoints)) 540 ctx, cancel := context.WithCancel(c.ctx) 541 if c.cfg.DialTimeout > 0 { 542 ctx, cancel = context.WithTimeout(ctx, c.cfg.DialTimeout) 543 } 544 wg.Add(len(c.cfg.Endpoints)) 545 for _, ep := range c.cfg.Endpoints { 546 // if cluster is current, any endpoint gives a recent version 547 go func(e string) { 548 defer wg.Done() 549 resp, rerr := c.Status(ctx, e) 550 if rerr != nil { 551 errc <- rerr 552 return 553 } 554 vs := strings.Split(resp.Version, ".") 555 maj, min := 0, 0 556 if len(vs) >= 2 { 557 maj, _ = strconv.Atoi(vs[0]) 558 min, rerr = strconv.Atoi(vs[1]) 559 } 560 if maj < 3 || (maj == 3 && min < 2) { 561 rerr = ErrOldCluster 562 } 563 errc <- rerr 564 }(ep) 565 } 566 // wait for success 567 for i := 0; i < len(c.cfg.Endpoints); i++ { 568 if err = <-errc; err == nil { 569 break 570 } 571 } 572 cancel() 573 wg.Wait() 574 return err 575} 576 577// ActiveConnection returns the current in-use connection 578func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn } 579 580// isHaltErr returns true if the given error and context indicate no forward 581// progress can be made, even after reconnecting. 582func isHaltErr(ctx context.Context, err error) bool { 583 if ctx != nil && ctx.Err() != nil { 584 return true 585 } 586 if err == nil { 587 return false 588 } 589 ev, _ := status.FromError(err) 590 // Unavailable codes mean the system will be right back. 591 // (e.g., can't connect, lost leader) 592 // Treat Internal codes as if something failed, leaving the 593 // system in an inconsistent state, but retrying could make progress. 594 // (e.g., failed in middle of send, corrupted frame) 595 // TODO: are permanent Internal errors possible from grpc? 596 return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal 597} 598 599// isUnavailableErr returns true if the given error is an unavailable error 600func isUnavailableErr(ctx context.Context, err error) bool { 601 if ctx != nil && ctx.Err() != nil { 602 return false 603 } 604 if err == nil { 605 return false 606 } 607 ev, _ := status.FromError(err) 608 // Unavailable codes mean the system will be right back. 609 // (e.g., can't connect, lost leader) 610 return ev.Code() == codes.Unavailable 611} 612 613func toErr(ctx context.Context, err error) error { 614 if err == nil { 615 return nil 616 } 617 err = rpctypes.Error(err) 618 if _, ok := err.(rpctypes.EtcdError); ok { 619 return err 620 } 621 if ev, ok := status.FromError(err); ok { 622 code := ev.Code() 623 switch code { 624 case codes.DeadlineExceeded: 625 fallthrough 626 case codes.Canceled: 627 if ctx.Err() != nil { 628 err = ctx.Err() 629 } 630 case codes.Unavailable: 631 case codes.FailedPrecondition: 632 err = grpc.ErrClientConnClosing 633 } 634 } 635 return err 636} 637 638func canceledByCaller(stopCtx context.Context, err error) bool { 639 if stopCtx.Err() == nil || err == nil { 640 return false 641 } 642 643 return err == context.Canceled || err == context.DeadlineExceeded 644} 645 646// IsConnCanceled returns true, if error is from a closed gRPC connection. 647// ref. https://github.com/grpc/grpc-go/pull/1854 648func IsConnCanceled(err error) bool { 649 if err == nil { 650 return false 651 } 652 // >= gRPC v1.10.x 653 s, ok := status.FromError(err) 654 if ok { 655 // connection is canceled or server has already closed the connection 656 return s.Code() == codes.Canceled || s.Message() == "transport is closing" 657 } 658 // >= gRPC v1.10.x 659 if err == context.Canceled { 660 return true 661 } 662 // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")' 663 return strings.Contains(err.Error(), "grpc: the client connection is closing") 664} 665 666func getHost(ep string) string { 667 url, uerr := url.Parse(ep) 668 if uerr != nil || !strings.Contains(ep, "://") { 669 return ep 670 } 671 return url.Host 672} 673