/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)

const (
	// minConnectTimeout is the default time given to a single connection
	// attempt; it is used by resetTransport unless the minConnectTimeout dial
	// option supplies a different value.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer.
	// It must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection is being drained and does
	// not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix prefixes the JSON parsing error
	// reported when the default service config dial option is invalid.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default per-message size limits for a client.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default I/O buffer
	// sizes (32 KiB each). The previous comment here referred to a former name,
	// http2IOBufSize; presumably these size the transport's write and read
	// buffers — confirm at the use sites.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// If DialContext returns an error, the partially constructed ClientConn has
// already been closed and must not be used.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed retryThrottler with a typed nil: atomic.Value requires every Store
	// to use the same concrete type, and applyServiceConfigAndBalancer later
	// stores *retryThrottler values.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// The ClientConn's own lifetime context is derived from Background, not
	// from the dial ctx; it is canceled by Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Registered first, so it runs last: on any error return (including the
	// ctx-expiry override registered below) tear down the partial ClientConn.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the security configuration: a secure channel needs exactly one
	// of TransportCredentials / CredsBundle; an insecure one may use neither,
	// nor per-RPC credentials that demand transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Install a default dialer (optionally proxy-wrapped) when the user did
	// not supply one.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
			network, addr := parseDialTarget(addr)
			return (&net.Dialer{}).DialContext(ctx, network, addr)
		}
		if cc.dopts.withProxy {
			cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
		}
	}

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Registered after `defer cancel()`, so it runs before cancel: if ctx
	// expired while dialing, fail with ctx.Err() even if setup succeeded.
	// It also runs before the Close-on-error defer above, which then sees the
	// overridden err.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Authority precedence: credentials' ServerName, then the WithAuthority
	// option (insecure only), then the parsed target endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver. Note that `err` here is the named result, not a new
	// variable.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Fail fast only on errors that report themselves as
				// non-temporary; anything else keeps waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one.
// The result replaces cc.dopts.unaryInt; it is nil when there are no
// interceptors at all.
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
355func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 356 if curr == len(interceptors)-1 { 357 return finalInvoker 358 } 359 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 360 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 361 } 362} 363 364// chainStreamClientInterceptors chains all stream client interceptors into one. 365func chainStreamClientInterceptors(cc *ClientConn) { 366 interceptors := cc.dopts.chainStreamInts 367 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 368 // be executed before any other chained interceptors. 369 if cc.dopts.streamInt != nil { 370 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 371 } 372 var chainedInt StreamClientInterceptor 373 if len(interceptors) == 0 { 374 chainedInt = nil 375 } else if len(interceptors) == 1 { 376 chainedInt = interceptors[0] 377 } else { 378 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 379 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 380 } 381 } 382 cc.dopts.streamInt = chainedInt 383} 384 385// getChainStreamer recursively generate the chained client stream constructor. 386func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 387 if curr == len(interceptors)-1 { 388 return finalStreamer 389 } 390 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 391 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 
392 } 393} 394 395// connectivityStateManager keeps the connectivity.State of ClientConn. 396// This struct will eventually be exported so the balancers can access it. 397type connectivityStateManager struct { 398 mu sync.Mutex 399 state connectivity.State 400 notifyChan chan struct{} 401 channelzID int64 402} 403 404// updateState updates the connectivity.State of ClientConn. 405// If there's a change it notifies goroutines waiting on state change to 406// happen. 407func (csm *connectivityStateManager) updateState(state connectivity.State) { 408 csm.mu.Lock() 409 defer csm.mu.Unlock() 410 if csm.state == connectivity.Shutdown { 411 return 412 } 413 if csm.state == state { 414 return 415 } 416 csm.state = state 417 channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state) 418 if csm.notifyChan != nil { 419 // There are other goroutines waiting on this channel. 420 close(csm.notifyChan) 421 csm.notifyChan = nil 422 } 423} 424 425func (csm *connectivityStateManager) getState() connectivity.State { 426 csm.mu.Lock() 427 defer csm.mu.Unlock() 428 return csm.state 429} 430 431func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 432 csm.mu.Lock() 433 defer csm.mu.Unlock() 434 if csm.notifyChan == nil { 435 csm.notifyChan = make(chan struct{}) 436 } 437 return csm.notifyChan 438} 439 440// ClientConnInterface defines the functions clients need to perform unary and 441// streaming RPCs. It is implemented by *ClientConn, and is only intended to 442// be referenced by generated code. 443type ClientConnInterface interface { 444 // Invoke performs a unary RPC and returns after the response is received 445 // into reply. 446 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error 447 // NewStream begins a streaming RPC. 448 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) 449} 450 451// Assert *ClientConn implements ClientConnInterface. 
var _ ClientConnInterface = (*ClientConn)(nil)

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx is canceled by Close; every addrConn's ctx is derived from it.
	ctx    context.Context
	cancel context.CancelFunc

	target       string          // The target string as passed to Dial.
	parsedTarget resolver.Target // Parsed (or default-scheme fallback) target.
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu guards the fields below. conns is set to nil by Close and is used
	// throughout as the "ClientConn is closed" flag.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value // Always holds a *retryThrottler (possibly nil).

	// firstResolveEvent fires after the first call to updateResolverState.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
498func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 499 ch := cc.csMgr.getNotifyChan() 500 if cc.csMgr.getState() != sourceState { 501 return true 502 } 503 select { 504 case <-ctx.Done(): 505 return false 506 case <-ch: 507 return true 508 } 509} 510 511// GetState returns the connectivity.State of ClientConn. 512// This is an EXPERIMENTAL API. 513func (cc *ClientConn) GetState() connectivity.State { 514 return cc.csMgr.getState() 515} 516 517func (cc *ClientConn) scWatcher() { 518 for { 519 select { 520 case sc, ok := <-cc.dopts.scChan: 521 if !ok { 522 return 523 } 524 cc.mu.Lock() 525 // TODO: load balance policy runtime change is ignored. 526 // We may revisit this decision in the future. 527 cc.sc = &sc 528 cc.mu.Unlock() 529 case <-cc.ctx.Done(): 530 return 531 } 532 } 533} 534 535// waitForResolvedAddrs blocks until the resolver has provided addresses or the 536// context expires. Returns nil unless the context expires first; otherwise 537// returns a status error based on the context. 538func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 539 // This is on the RPC path, so we use a fast path to avoid the 540 // more-expensive "select" below after the resolver has returned once. 
541 if cc.firstResolveEvent.HasFired() { 542 return nil 543 } 544 select { 545 case <-cc.firstResolveEvent.Done(): 546 return nil 547 case <-ctx.Done(): 548 return status.FromContextError(ctx.Err()).Err() 549 case <-cc.ctx.Done(): 550 return ErrClientConnClosing 551 } 552} 553 554var emptyServiceConfig *ServiceConfig 555 556func init() { 557 cfg := parseServiceConfig("{}") 558 if cfg.Err != nil { 559 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 560 } 561 emptyServiceConfig = cfg.Config.(*ServiceConfig) 562} 563 564func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 565 if cc.sc != nil { 566 cc.applyServiceConfigAndBalancer(cc.sc, addrs) 567 return 568 } 569 if cc.dopts.defaultServiceConfig != nil { 570 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs) 571 } else { 572 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) 573 } 574} 575 576func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 577 defer cc.firstResolveEvent.Fire() 578 cc.mu.Lock() 579 // Check if the ClientConn is already closed. Some fields (e.g. 580 // balancerWrapper) are set to nil when closing the ClientConn, and could 581 // cause nil pointer panic if we don't have this check. 582 if cc.conns == nil { 583 cc.mu.Unlock() 584 return nil 585 } 586 587 if err != nil { 588 // May need to apply the initial service config in case the resolver 589 // doesn't support service configs, or doesn't provide a service config 590 // with the new addresses. 591 cc.maybeApplyDefaultServiceConfig(nil) 592 593 if cc.balancerWrapper != nil { 594 cc.balancerWrapper.resolverError(err) 595 } 596 597 // No addresses are valid with err set; return early. 
598 cc.mu.Unlock() 599 return balancer.ErrBadResolverState 600 } 601 602 var ret error 603 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 604 cc.maybeApplyDefaultServiceConfig(s.Addresses) 605 // TODO: do we need to apply a failing LB policy if there is no 606 // default, per the error handling design? 607 } else { 608 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 609 cc.applyServiceConfigAndBalancer(sc, s.Addresses) 610 } else { 611 ret = balancer.ErrBadResolverState 612 if cc.balancerWrapper == nil { 613 var err error 614 if s.ServiceConfig.Err != nil { 615 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) 616 } else { 617 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) 618 } 619 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 620 cc.csMgr.updateState(connectivity.TransientFailure) 621 cc.mu.Unlock() 622 return ret 623 } 624 } 625 } 626 627 var balCfg serviceconfig.LoadBalancingConfig 628 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { 629 balCfg = cc.sc.lbConfig.cfg 630 } 631 632 cbn := cc.curBalancerName 633 bw := cc.balancerWrapper 634 cc.mu.Unlock() 635 if cbn != grpclbName { 636 // Filter any grpclb addresses since we don't have the grpclb balancer. 637 for i := 0; i < len(s.Addresses); { 638 if s.Addresses[i].Type == resolver.GRPCLB { 639 copy(s.Addresses[i:], s.Addresses[i+1:]) 640 s.Addresses = s.Addresses[:len(s.Addresses)-1] 641 continue 642 } 643 i++ 644 } 645 } 646 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 647 if ret == nil { 648 ret = uccsErr // prefer ErrBadResolver state since any other error is 649 // currently meaningless to the caller. 650 } 651 return ret 652} 653 654// switchBalancer starts the switching from current balancer to the balancer 655// with the given name. 
656// 657// It will NOT send the current address list to the new balancer. If needed, 658// caller of this function should send address list to the new balancer after 659// this function returns. 660// 661// Caller must hold cc.mu. 662func (cc *ClientConn) switchBalancer(name string) { 663 if strings.EqualFold(cc.curBalancerName, name) { 664 return 665 } 666 667 channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name) 668 if cc.dopts.balancerBuilder != nil { 669 channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead") 670 return 671 } 672 if cc.balancerWrapper != nil { 673 cc.balancerWrapper.close() 674 } 675 676 builder := balancer.Get(name) 677 if builder == nil { 678 channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) 679 channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) 680 builder = newPickfirstBuilder() 681 } else { 682 channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name) 683 } 684 685 cc.curBalancerName = builder.Name() 686 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 687} 688 689func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 690 cc.mu.Lock() 691 if cc.conns == nil { 692 cc.mu.Unlock() 693 return 694 } 695 // TODO(bar switching) send updates to all balancer wrappers when balancer 696 // gracefully switching is supported. 697 cc.balancerWrapper.handleSubConnStateChange(sc, s, err) 698 cc.mu.Unlock() 699} 700 701// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 702// 703// Caller needs to make sure len(addrs) > 0. 
704func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 705 ac := &addrConn{ 706 state: connectivity.Idle, 707 cc: cc, 708 addrs: addrs, 709 scopts: opts, 710 dopts: cc.dopts, 711 czData: new(channelzData), 712 resetBackoff: make(chan struct{}), 713 } 714 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 715 // Track ac in cc. This needs to be done before any getTransport(...) is called. 716 cc.mu.Lock() 717 if cc.conns == nil { 718 cc.mu.Unlock() 719 return nil, ErrClientConnClosing 720 } 721 if channelz.IsOn() { 722 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 723 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{ 724 Desc: "Subchannel Created", 725 Severity: channelz.CtINFO, 726 Parent: &channelz.TraceEventDesc{ 727 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 728 Severity: channelz.CtINFO, 729 }, 730 }) 731 } 732 cc.conns[ac] = struct{}{} 733 cc.mu.Unlock() 734 return ac, nil 735} 736 737// removeAddrConn removes the addrConn in the subConn from clientConn. 738// It also tears down the ac with the given error. 739func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 740 cc.mu.Lock() 741 if cc.conns == nil { 742 cc.mu.Unlock() 743 return 744 } 745 delete(cc.conns, ac) 746 cc.mu.Unlock() 747 ac.tearDown(err) 748} 749 750func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 751 return &channelz.ChannelInternalMetric{ 752 State: cc.GetState(), 753 Target: cc.target, 754 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 755 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 756 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 757 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 758 } 759} 760 761// Target returns the target string of the ClientConn. 762// This is an EXPERIMENTAL API. 
763func (cc *ClientConn) Target() string { 764 return cc.target 765} 766 767func (cc *ClientConn) incrCallsStarted() { 768 atomic.AddInt64(&cc.czData.callsStarted, 1) 769 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 770} 771 772func (cc *ClientConn) incrCallsSucceeded() { 773 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 774} 775 776func (cc *ClientConn) incrCallsFailed() { 777 atomic.AddInt64(&cc.czData.callsFailed, 1) 778} 779 780// connect starts creating a transport. 781// It does nothing if the ac is not IDLE. 782// TODO(bar) Move this to the addrConn section. 783func (ac *addrConn) connect() error { 784 ac.mu.Lock() 785 if ac.state == connectivity.Shutdown { 786 ac.mu.Unlock() 787 return errConnClosing 788 } 789 if ac.state != connectivity.Idle { 790 ac.mu.Unlock() 791 return nil 792 } 793 // Update connectivity state within the lock to prevent subsequent or 794 // concurrent calls from resetting the transport more than once. 795 ac.updateConnectivityState(connectivity.Connecting, nil) 796 ac.mu.Unlock() 797 798 // Start a goroutine connecting to the server asynchronously. 799 go ac.resetTransport() 800 return nil 801} 802 803// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 804// 805// If ac is Connecting, it returns false. The caller should tear down the ac and 806// create a new one. Note that the backoff will be reset when this happens. 807// 808// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 809// addresses will be picked up by retry in the next iteration after backoff. 810// 811// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 812// 813// If ac is Ready, it checks whether current connected address of ac is in the 814// new addrs list. 815// - If true, it updates ac.addrs and returns true. The ac will keep using 816// the existing connection. 817// - If false, it does nothing and returns false. 
818func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 819 ac.mu.Lock() 820 defer ac.mu.Unlock() 821 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 822 if ac.state == connectivity.Shutdown || 823 ac.state == connectivity.TransientFailure || 824 ac.state == connectivity.Idle { 825 ac.addrs = addrs 826 return true 827 } 828 829 if ac.state == connectivity.Connecting { 830 return false 831 } 832 833 // ac.state is Ready, try to find the connected address. 834 var curAddrFound bool 835 for _, a := range addrs { 836 if reflect.DeepEqual(ac.curAddr, a) { 837 curAddrFound = true 838 break 839 } 840 } 841 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 842 if curAddrFound { 843 ac.addrs = addrs 844 } 845 846 return curAddrFound 847} 848 849// GetMethodConfig gets the method config of the input method. 850// If there's an exact match for input method (i.e. /service/method), we return 851// the corresponding MethodConfig. 852// If there isn't an exact match for the input method, we look for the default config 853// under the service (i.e /service/). If there is a default MethodConfig for 854// the service, we return it. 855// Otherwise, we return an empty MethodConfig. 856func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 857 // TODO: Avoid the locking here. 
858 cc.mu.RLock() 859 defer cc.mu.RUnlock() 860 if cc.sc == nil { 861 return MethodConfig{} 862 } 863 m, ok := cc.sc.Methods[method] 864 if !ok { 865 i := strings.LastIndex(method, "/") 866 m = cc.sc.Methods[method[:i+1]] 867 } 868 return m 869} 870 871func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 872 cc.mu.RLock() 873 defer cc.mu.RUnlock() 874 if cc.sc == nil { 875 return nil 876 } 877 return cc.sc.healthCheckConfig 878} 879 880func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 881 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ 882 Ctx: ctx, 883 FullMethodName: method, 884 }) 885 if err != nil { 886 return nil, nil, toRPCErr(err) 887 } 888 return t, done, nil 889} 890 891func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) { 892 if sc == nil { 893 // should never reach here. 894 return 895 } 896 cc.sc = sc 897 898 if cc.sc.retryThrottling != nil { 899 newThrottler := &retryThrottler{ 900 tokens: cc.sc.retryThrottling.MaxTokens, 901 max: cc.sc.retryThrottling.MaxTokens, 902 thresh: cc.sc.retryThrottling.MaxTokens / 2, 903 ratio: cc.sc.retryThrottling.TokenRatio, 904 } 905 cc.retryThrottler.Store(newThrottler) 906 } else { 907 cc.retryThrottler.Store((*retryThrottler)(nil)) 908 } 909 910 if cc.dopts.balancerBuilder == nil { 911 // Only look at balancer types and switch balancer if balancer dial 912 // option is not set. 
913 var newBalancerName string 914 if cc.sc != nil && cc.sc.lbConfig != nil { 915 newBalancerName = cc.sc.lbConfig.name 916 } else { 917 var isGRPCLB bool 918 for _, a := range addrs { 919 if a.Type == resolver.GRPCLB { 920 isGRPCLB = true 921 break 922 } 923 } 924 if isGRPCLB { 925 newBalancerName = grpclbName 926 } else if cc.sc != nil && cc.sc.LB != nil { 927 newBalancerName = *cc.sc.LB 928 } else { 929 newBalancerName = PickFirstBalancerName 930 } 931 } 932 cc.switchBalancer(newBalancerName) 933 } else if cc.balancerWrapper == nil { 934 // Balancer dial option was set, and this is the first time handling 935 // resolved addresses. Build a balancer with dopts.balancerBuilder. 936 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 937 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 938 } 939} 940 941func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { 942 cc.mu.RLock() 943 r := cc.resolverWrapper 944 cc.mu.RUnlock() 945 if r == nil { 946 return 947 } 948 go r.resolveNow(o) 949} 950 951// ResetConnectBackoff wakes up all subchannels in transient failure and causes 952// them to attempt another connection immediately. It also resets the backoff 953// times used for subsequent attempts regardless of the current state. 954// 955// In general, this function should not be used. Typical service or network 956// outages result in a reasonable client reconnection strategy by default. 957// However, if a previously unavailable network becomes available, this may be 958// used to trigger an immediate reconnect. 959// 960// This API is EXPERIMENTAL. 961func (cc *ClientConn) ResetConnectBackoff() { 962 cc.mu.Lock() 963 conns := cc.conns 964 cc.mu.Unlock() 965 for ac := range conns { 966 ac.resetConnectBackoff() 967 } 968} 969 970// Close tears down the ClientConn and all underlying connections. 
971func (cc *ClientConn) Close() error { 972 defer cc.cancel() 973 974 cc.mu.Lock() 975 if cc.conns == nil { 976 cc.mu.Unlock() 977 return ErrClientConnClosing 978 } 979 conns := cc.conns 980 cc.conns = nil 981 cc.csMgr.updateState(connectivity.Shutdown) 982 983 rWrapper := cc.resolverWrapper 984 cc.resolverWrapper = nil 985 bWrapper := cc.balancerWrapper 986 cc.balancerWrapper = nil 987 cc.mu.Unlock() 988 989 cc.blockingpicker.close() 990 991 if rWrapper != nil { 992 rWrapper.close() 993 } 994 if bWrapper != nil { 995 bWrapper.close() 996 } 997 998 for ac := range conns { 999 ac.tearDown(ErrClientConnClosing) 1000 } 1001 if channelz.IsOn() { 1002 ted := &channelz.TraceEventDesc{ 1003 Desc: "Channel Deleted", 1004 Severity: channelz.CtINFO, 1005 } 1006 if cc.dopts.channelzParentID != 0 { 1007 ted.Parent = &channelz.TraceEventDesc{ 1008 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1009 Severity: channelz.CtINFO, 1010 } 1011 } 1012 channelz.AddTraceEvent(cc.channelzID, 0, ted) 1013 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1014 // the entity being deleted, and thus prevent it from being deleted right away. 1015 channelz.RemoveEntry(cc.channelzID) 1016 } 1017 return nil 1018} 1019 1020// addrConn is a network connection to a given address. 1021type addrConn struct { 1022 ctx context.Context 1023 cancel context.CancelFunc 1024 1025 cc *ClientConn 1026 dopts dialOptions 1027 acbw balancer.SubConn 1028 scopts balancer.NewSubConnOptions 1029 1030 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1031 // health checking may require server to report healthy to set ac to READY), and is reset 1032 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1033 // is received, transport is closed, ac has been torn down). 1034 transport transport.ClientTransport // The current transport. 
1035 1036 mu sync.Mutex 1037 curAddr resolver.Address // The current address. 1038 addrs []resolver.Address // All addresses that the resolver resolved to. 1039 1040 // Use updateConnectivityState for updating addrConn's connectivity state. 1041 state connectivity.State 1042 1043 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1044 resetBackoff chan struct{} 1045 1046 channelzID int64 // channelz unique identification number. 1047 czData *channelzData 1048} 1049 1050// Note: this requires a lock on ac.mu. 1051func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1052 if ac.state == s { 1053 return 1054 } 1055 ac.state = s 1056 channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s) 1057 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1058} 1059 1060// adjustParams updates parameters used to create transports upon 1061// receiving a GoAway. 1062func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1063 switch r { 1064 case transport.GoAwayTooManyPings: 1065 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1066 ac.cc.mu.Lock() 1067 if v > ac.cc.mkp.Time { 1068 ac.cc.mkp.Time = v 1069 } 1070 ac.cc.mu.Unlock() 1071 } 1072} 1073 1074func (ac *addrConn) resetTransport() { 1075 for i := 0; ; i++ { 1076 if i > 0 { 1077 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1078 } 1079 1080 ac.mu.Lock() 1081 if ac.state == connectivity.Shutdown { 1082 ac.mu.Unlock() 1083 return 1084 } 1085 1086 addrs := ac.addrs 1087 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1088 // This will be the duration that dial gets to finish. 1089 dialDuration := minConnectTimeout 1090 if ac.dopts.minConnectTimeout != nil { 1091 dialDuration = ac.dopts.minConnectTimeout() 1092 } 1093 1094 if dialDuration < backoffFor { 1095 // Give dial more time as we keep failing to connect. 
1096 dialDuration = backoffFor 1097 } 1098 // We can potentially spend all the time trying the first address, and 1099 // if the server accepts the connection and then hangs, the following 1100 // addresses will never be tried. 1101 // 1102 // The spec doesn't mention what should be done for multiple addresses. 1103 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1104 connectDeadline := time.Now().Add(dialDuration) 1105 1106 ac.updateConnectivityState(connectivity.Connecting, nil) 1107 ac.transport = nil 1108 ac.mu.Unlock() 1109 1110 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1111 if err != nil { 1112 // After exhausting all addresses, the addrConn enters 1113 // TRANSIENT_FAILURE. 1114 ac.mu.Lock() 1115 if ac.state == connectivity.Shutdown { 1116 ac.mu.Unlock() 1117 return 1118 } 1119 ac.updateConnectivityState(connectivity.TransientFailure, err) 1120 1121 // Backoff. 1122 b := ac.resetBackoff 1123 ac.mu.Unlock() 1124 1125 timer := time.NewTimer(backoffFor) 1126 select { 1127 case <-timer.C: 1128 ac.mu.Lock() 1129 ac.backoffIdx++ 1130 ac.mu.Unlock() 1131 case <-b: 1132 timer.Stop() 1133 case <-ac.ctx.Done(): 1134 timer.Stop() 1135 return 1136 } 1137 continue 1138 } 1139 1140 ac.mu.Lock() 1141 if ac.state == connectivity.Shutdown { 1142 ac.mu.Unlock() 1143 newTr.Close() 1144 return 1145 } 1146 ac.curAddr = addr 1147 ac.transport = newTr 1148 ac.backoffIdx = 0 1149 1150 hctx, hcancel := context.WithCancel(ac.ctx) 1151 ac.startHealthCheck(hctx) 1152 ac.mu.Unlock() 1153 1154 // Block until the created transport is down. And when this happens, 1155 // we restart from the top of the addr list. 1156 <-reconnect.Done() 1157 hcancel() 1158 // restart connecting - the top of the loop will set state to 1159 // CONNECTING. 
This is against the current connectivity semantics doc, 1160 // however it allows for graceful behavior for RPCs not yet dispatched 1161 // - unfortunate timing would otherwise lead to the RPC failing even 1162 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1163 // instantaneous. 1164 // 1165 // Ideally we should transition to Idle here and block until there is 1166 // RPC activity that leads to the balancer requesting a reconnect of 1167 // the associated SubConn. 1168 } 1169} 1170 1171// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1172// first successful one. It returns the transport, the address and a Event in 1173// the successful case. The Event fires when the returned transport disconnects. 1174func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1175 var firstConnErr error 1176 for _, addr := range addrs { 1177 ac.mu.Lock() 1178 if ac.state == connectivity.Shutdown { 1179 ac.mu.Unlock() 1180 return nil, resolver.Address{}, nil, errConnClosing 1181 } 1182 1183 ac.cc.mu.RLock() 1184 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1185 ac.cc.mu.RUnlock() 1186 1187 copts := ac.dopts.copts 1188 if ac.scopts.CredsBundle != nil { 1189 copts.CredsBundle = ac.scopts.CredsBundle 1190 } 1191 ac.mu.Unlock() 1192 1193 channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) 1194 1195 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1196 if err == nil { 1197 return newTr, addr, reconnect, nil 1198 } 1199 if firstConnErr == nil { 1200 firstConnErr = err 1201 } 1202 ac.cc.blockingpicker.updateConnectionError(err) 1203 } 1204 1205 // Couldn't connect to any address. 1206 return nil, resolver.Address{}, nil, firstConnErr 1207} 1208 1209// createTransport creates a connection to addr. It returns the transport and a 1210// Event in the successful case. 
The Event fires when the returned transport 1211// disconnects. 1212func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1213 prefaceReceived := make(chan struct{}) 1214 onCloseCalled := make(chan struct{}) 1215 reconnect := grpcsync.NewEvent() 1216 1217 authority := ac.cc.authority 1218 // addr.ServerName takes precedent over ClientConn authority, if present. 1219 if addr.ServerName != "" { 1220 authority = addr.ServerName 1221 } 1222 1223 target := transport.TargetInfo{ 1224 Addr: addr.Addr, 1225 Metadata: addr.Metadata, 1226 Authority: authority, 1227 } 1228 1229 once := sync.Once{} 1230 onGoAway := func(r transport.GoAwayReason) { 1231 ac.mu.Lock() 1232 ac.adjustParams(r) 1233 once.Do(func() { 1234 if ac.state == connectivity.Ready { 1235 // Prevent this SubConn from being used for new RPCs by setting its 1236 // state to Connecting. 1237 // 1238 // TODO: this should be Idle when grpc-go properly supports it. 1239 ac.updateConnectivityState(connectivity.Connecting, nil) 1240 } 1241 }) 1242 ac.mu.Unlock() 1243 reconnect.Fire() 1244 } 1245 1246 onClose := func() { 1247 ac.mu.Lock() 1248 once.Do(func() { 1249 if ac.state == connectivity.Ready { 1250 // Prevent this SubConn from being used for new RPCs by setting its 1251 // state to Connecting. 1252 // 1253 // TODO: this should be Idle when grpc-go properly supports it. 
1254 ac.updateConnectivityState(connectivity.Connecting, nil) 1255 } 1256 }) 1257 ac.mu.Unlock() 1258 close(onCloseCalled) 1259 reconnect.Fire() 1260 } 1261 1262 onPrefaceReceipt := func() { 1263 close(prefaceReceived) 1264 } 1265 1266 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1267 defer cancel() 1268 if channelz.IsOn() { 1269 copts.ChannelzParentID = ac.channelzID 1270 } 1271 1272 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1273 if err != nil { 1274 // newTr is either nil, or closed. 1275 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err) 1276 return nil, nil, err 1277 } 1278 1279 select { 1280 case <-time.After(time.Until(connectDeadline)): 1281 // We didn't get the preface in time. 1282 newTr.Close() 1283 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1284 return nil, nil, errors.New("timed out waiting for server handshake") 1285 case <-prefaceReceived: 1286 // We got the preface - huzzah! things are good. 1287 case <-onCloseCalled: 1288 // The transport has already closed - noop. 1289 return nil, nil, errors.New("connection closed") 1290 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1291 } 1292 return newTr, reconnect, nil 1293} 1294 1295// startHealthCheck starts the health checking stream (RPC) to watch the health 1296// stats of this connection if health checking is requested and configured. 1297// 1298// LB channel health checking is enabled when all requirements below are met: 1299// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1300// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1301// 3. a service config with non-empty healthCheckConfig field is provided 1302// 4. 
the load balancer requests it 1303// 1304// It sets addrConn to READY if the health checking stream is not started. 1305// 1306// Caller must hold ac.mu. 1307func (ac *addrConn) startHealthCheck(ctx context.Context) { 1308 var healthcheckManagingState bool 1309 defer func() { 1310 if !healthcheckManagingState { 1311 ac.updateConnectivityState(connectivity.Ready, nil) 1312 } 1313 }() 1314 1315 if ac.cc.dopts.disableHealthCheck { 1316 return 1317 } 1318 healthCheckConfig := ac.cc.healthCheckConfig() 1319 if healthCheckConfig == nil { 1320 return 1321 } 1322 if !ac.scopts.HealthCheckEnabled { 1323 return 1324 } 1325 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1326 if healthCheckFunc == nil { 1327 // The health package is not imported to set health check function. 1328 // 1329 // TODO: add a link to the health check doc in the error message. 1330 channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.") 1331 return 1332 } 1333 1334 healthcheckManagingState = true 1335 1336 // Set up the health check helper functions. 1337 currentTr := ac.transport 1338 newStream := func(method string) (interface{}, error) { 1339 ac.mu.Lock() 1340 if ac.transport != currentTr { 1341 ac.mu.Unlock() 1342 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1343 } 1344 ac.mu.Unlock() 1345 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1346 } 1347 setConnectivityState := func(s connectivity.State, lastErr error) { 1348 ac.mu.Lock() 1349 defer ac.mu.Unlock() 1350 if ac.transport != currentTr { 1351 return 1352 } 1353 ac.updateConnectivityState(s, lastErr) 1354 } 1355 // Start the health checking stream. 
1356 go func() { 1357 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1358 if err != nil { 1359 if status.Code(err) == codes.Unimplemented { 1360 channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled") 1361 } else { 1362 channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err) 1363 } 1364 } 1365 }() 1366} 1367 1368func (ac *addrConn) resetConnectBackoff() { 1369 ac.mu.Lock() 1370 close(ac.resetBackoff) 1371 ac.backoffIdx = 0 1372 ac.resetBackoff = make(chan struct{}) 1373 ac.mu.Unlock() 1374} 1375 1376// getReadyTransport returns the transport if ac's state is READY. 1377// Otherwise it returns nil, false. 1378// If ac's state is IDLE, it will trigger ac to connect. 1379func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1380 ac.mu.Lock() 1381 if ac.state == connectivity.Ready && ac.transport != nil { 1382 t := ac.transport 1383 ac.mu.Unlock() 1384 return t, true 1385 } 1386 var idle bool 1387 if ac.state == connectivity.Idle { 1388 idle = true 1389 } 1390 ac.mu.Unlock() 1391 // Trigger idle ac to connect. 1392 if idle { 1393 ac.connect() 1394 } 1395 return nil, false 1396} 1397 1398// tearDown starts to tear down the addrConn. 1399// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1400// some edge cases (e.g., the caller opens and closes many addrConn's in a 1401// tight loop. 1402// tearDown doesn't remove ac from ac.cc.conns. 1403func (ac *addrConn) tearDown(err error) { 1404 ac.mu.Lock() 1405 if ac.state == connectivity.Shutdown { 1406 ac.mu.Unlock() 1407 return 1408 } 1409 curTr := ac.transport 1410 ac.transport = nil 1411 // We have to set the state to Shutdown before anything else to prevent races 1412 // between setting the state and logic that waits on context cancellation / etc. 
1413 ac.updateConnectivityState(connectivity.Shutdown, nil) 1414 ac.cancel() 1415 ac.curAddr = resolver.Address{} 1416 if err == errConnDrain && curTr != nil { 1417 // GracefulClose(...) may be executed multiple times when 1418 // i) receiving multiple GoAway frames from the server; or 1419 // ii) there are concurrent name resolver/Balancer triggered 1420 // address removal and GoAway. 1421 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1422 ac.mu.Unlock() 1423 curTr.GracefulClose() 1424 ac.mu.Lock() 1425 } 1426 if channelz.IsOn() { 1427 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{ 1428 Desc: "Subchannel Deleted", 1429 Severity: channelz.CtINFO, 1430 Parent: &channelz.TraceEventDesc{ 1431 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1432 Severity: channelz.CtINFO, 1433 }, 1434 }) 1435 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1436 // the entity being deleted, and thus prevent it from being deleted right away. 
1437 channelz.RemoveEntry(ac.channelzID) 1438 } 1439 ac.mu.Unlock() 1440} 1441 1442func (ac *addrConn) getState() connectivity.State { 1443 ac.mu.Lock() 1444 defer ac.mu.Unlock() 1445 return ac.state 1446} 1447 1448func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1449 ac.mu.Lock() 1450 addr := ac.curAddr.Addr 1451 ac.mu.Unlock() 1452 return &channelz.ChannelInternalMetric{ 1453 State: ac.getState(), 1454 Target: addr, 1455 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1456 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1457 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1458 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1459 } 1460} 1461 1462func (ac *addrConn) incrCallsStarted() { 1463 atomic.AddInt64(&ac.czData.callsStarted, 1) 1464 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1465} 1466 1467func (ac *addrConn) incrCallsSucceeded() { 1468 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1469} 1470 1471func (ac *addrConn) incrCallsFailed() { 1472 atomic.AddInt64(&ac.czData.callsFailed, 1) 1473} 1474 1475type retryThrottler struct { 1476 max float64 1477 thresh float64 1478 ratio float64 1479 1480 mu sync.Mutex 1481 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1482} 1483 1484// throttle subtracts a retry token from the pool and returns whether a retry 1485// should be throttled (disallowed) based upon the retry throttling policy in 1486// the service config. 
1487func (rt *retryThrottler) throttle() bool { 1488 if rt == nil { 1489 return false 1490 } 1491 rt.mu.Lock() 1492 defer rt.mu.Unlock() 1493 rt.tokens-- 1494 if rt.tokens < 0 { 1495 rt.tokens = 0 1496 } 1497 return rt.tokens <= rt.thresh 1498} 1499 1500func (rt *retryThrottler) successfulRPC() { 1501 if rt == nil { 1502 return 1503 } 1504 rt.mu.Lock() 1505 defer rt.mu.Unlock() 1506 rt.tokens += rt.ratio 1507 if rt.tokens > rt.max { 1508 rt.tokens = rt.max 1509 } 1510} 1511 1512type channelzChannel struct { 1513 cc *ClientConn 1514} 1515 1516func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1517 return c.cc.channelzMetric() 1518} 1519 1520// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1521// underlying connections within the specified timeout. 1522// 1523// Deprecated: This error is never returned by grpc and should not be 1524// referenced by users. 1525var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1526 1527func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1528 for _, rb := range cc.dopts.resolvers { 1529 if scheme == rb.Scheme() { 1530 return rb 1531 } 1532 } 1533 return resolver.Get(scheme) 1534} 1535