1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "math" 26 "reflect" 27 "strings" 28 "sync" 29 "sync/atomic" 30 "time" 31 32 "google.golang.org/grpc/balancer" 33 "google.golang.org/grpc/balancer/base" 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/connectivity" 36 "google.golang.org/grpc/credentials" 37 "google.golang.org/grpc/internal/backoff" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/internal/grpcsync" 40 "google.golang.org/grpc/internal/grpcutil" 41 iresolver "google.golang.org/grpc/internal/resolver" 42 "google.golang.org/grpc/internal/transport" 43 "google.golang.org/grpc/keepalive" 44 "google.golang.org/grpc/resolver" 45 "google.golang.org/grpc/serviceconfig" 46 "google.golang.org/grpc/status" 47 48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. 49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. 50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. 51 _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. 
)

const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default message size limits: 4 MiB receive, effectively unlimited send.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// http2IOBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// defaultConfigSelector is a ConfigSelector that always returns the method
// config looked up in the wrapped *ServiceConfig (which may be nil).
type defaultConfigSelector struct {
	sc *ServiceConfig
}

// SelectConfig implements iresolver.ConfigSelector by returning the RPC's
// method config from the wrapped service config, with the context unchanged.
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
	return &iresolver.RPCConfig{
		Context:      rpcInfo.Context,
		MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
	}, nil
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
	// Note: the ClientConn's lifetime context is derived from Background, not
	// from the dial ctx — the dial ctx only governs dialing (see doc above).
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Any error returned past this point tears down the partially constructed
	// ClientConn (err is the named return).
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate security configuration: a secure conn needs exactly one of
	// TransportCredentials/CredsBundle; an insecure conn may have neither,
	// and no per-RPC credentials that require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If the dial ctx expired, rewrite the returned conn/err so the caller
	// sees the context error (possibly combined with the last dial error).
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config (non-blocking).
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Determine the authority, in precedence order: credentials ServerName,
	// WithAuthority (insecure only), "localhost" for unix targets, then the
	// parsed endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else if strings.HasPrefix(cc.target, "unix:") || strings.HasPrefix(cc.target, "unix-abstract:") {
		cc.authority = "localhost"
	} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
		cc.authority = "localhost" + cc.parsedTarget.Endpoint
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			cc.Connect()
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// More than one: build a closure that invokes the first interceptor
		// with an invoker that recursively chains the remaining ones.
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generate the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	// The last interceptor invokes the real call via finalInvoker.
	if curr == len(interceptors)-1 {
		return finalInvoker
	}
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
	}
}

// chainStreamClientInterceptors chains all stream client interceptors into one.
func chainStreamClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainStreamInts
	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	if cc.dopts.streamInt != nil {
		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
	}
	var chainedInt StreamClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// More than one: mirror the unary case, chaining via getChainStreamer.
		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
		}
	}
	cc.dopts.streamInt = chainedInt
}

// getChainStreamer recursively generate the chained client stream constructor.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
	if curr == len(interceptors)-1 {
		return finalStreamer
	}
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
	}
}

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	channelzID int64
}

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	// Shutdown is terminal: ignore all further transitions.
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}

// getState returns the current connectivity state under the lock.
func (csm *connectivityStateManager) getState() connectivity.State {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	return csm.state
}

// getNotifyChan returns a channel that is closed on the next state change,
// lazily creating it if one is not already pending.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.notifyChan == nil {
		csm.notifyChan = make(chan struct{})
	}
	return csm.notifyChan
}

// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Assert *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
485type ClientConn struct { 486 ctx context.Context 487 cancel context.CancelFunc 488 489 target string 490 parsedTarget resolver.Target 491 authority string 492 dopts dialOptions 493 csMgr *connectivityStateManager 494 495 balancerBuildOpts balancer.BuildOptions 496 blockingpicker *pickerWrapper 497 498 safeConfigSelector iresolver.SafeConfigSelector 499 500 mu sync.RWMutex 501 resolverWrapper *ccResolverWrapper 502 sc *ServiceConfig 503 conns map[*addrConn]struct{} 504 // Keepalive parameter can be updated if a GoAway is received. 505 mkp keepalive.ClientParameters 506 curBalancerName string 507 balancerWrapper *ccBalancerWrapper 508 retryThrottler atomic.Value 509 510 firstResolveEvent *grpcsync.Event 511 512 channelzID int64 // channelz unique identification number 513 czData *channelzData 514 515 lceMu sync.Mutex // protects lastConnectionError 516 lastConnectionError error 517} 518 519// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 520// ctx expires. A true value is returned in former case and false in latter. 521// 522// Experimental 523// 524// Notice: This API is EXPERIMENTAL and may be changed or removed in a 525// later release. 526func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 527 ch := cc.csMgr.getNotifyChan() 528 if cc.csMgr.getState() != sourceState { 529 return true 530 } 531 select { 532 case <-ctx.Done(): 533 return false 534 case <-ch: 535 return true 536 } 537} 538 539// GetState returns the connectivity.State of ClientConn. 540// 541// Experimental 542// 543// Notice: This API is EXPERIMENTAL and may be changed or removed in a later 544// release. 545func (cc *ClientConn) GetState() connectivity.State { 546 return cc.csMgr.getState() 547} 548 549// Connect causes all subchannels in the ClientConn to attempt to connect if 550// the channel is idle. Does not wait for the connection attempts to begin 551// before returning. 
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	// Prefer asking the balancer to exit idle; fall back to connecting every
	// addrConn directly.
	if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
		return
	}
	for ac := range cc.conns {
		go ac.connect()
	}
}

// scWatcher watches the service config channel (set via dial options) and
// applies each update until the channel or the ClientConn is closed.
func (cc *ClientConn) scWatcher() {
	for {
		select {
		case sc, ok := <-cc.dopts.scChan:
			if !ok {
				return
			}
			cc.mu.Lock()
			// TODO: load balance policy runtime change is ignored.
			// We may revisit this decision in the future.
			cc.sc = &sc
			cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			cc.mu.Unlock()
		case <-cc.ctx.Done():
			return
		}
	}
}

// waitForResolvedAddrs blocks until the resolver has provided addresses or the
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	// This is on the RPC path, so we use a fast path to avoid the
	// more-expensive "select" below after the resolver has returned once.
	if cc.firstResolveEvent.HasFired() {
		return nil
	}
	select {
	case <-cc.firstResolveEvent.Done():
		return nil
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	}
}

// emptyServiceConfig is the parsed "{}" service config, used when neither the
// resolver nor the dial options supply one.
var emptyServiceConfig *ServiceConfig

func init() {
	cfg := parseServiceConfig("{}")
	if cfg.Err != nil {
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
	}
	emptyServiceConfig = cfg.Config.(*ServiceConfig)
}

// maybeApplyDefaultServiceConfig applies, in order of preference: the current
// cc.sc, the dial-option default service config, or the empty service config.
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
	if cc.sc != nil {
		cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
		return
	}
	if cc.dopts.defaultServiceConfig != nil {
		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
	} else {
		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
	}
}

// updateResolverState handles a new resolver state (or resolver error):
// it applies the service config, filters grpclb addresses when the grpclb
// balancer is not in use, and forwards the state to the balancer wrapper.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			configSelector := iresolver.GetConfigSelector(s)
			if configSelector != nil {
				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
				}
			} else {
				configSelector = &defaultConfigSelector{sc}
			}
			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
		} else {
			// Invalid service config from the resolver and no balancer yet:
			// fail RPCs with an erroring picker and report TransientFailure.
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		// Don't hold cc.mu while closing the balancers. The balancers may call
		// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
		// would cause a deadlock in that case.
		cc.mu.Unlock()
		cc.balancerWrapper.close()
		cc.mu.Lock()
	}

	builder := balancer.Get(name)
	if builder == nil {
		// Unknown balancer name: fall back to pick_first.
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	} else {
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

// handleSubConnStateChange forwards a SubConn state change to the balancer
// wrapper, unless the ClientConn is already closed.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	// TODO(bar switching) send updates to all balancer wrappers when balancer
	// gracefully switching is supported.
	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
	cc.mu.Unlock()
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtInfo,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtInfo,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}

// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	delete(cc.conns, ac)
	cc.mu.Unlock()
	// tearDown is called outside the lock.
	ac.tearDown(err)
}

// channelzMetric returns a snapshot of this channel's call metrics for
// channelz reporting.
func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
	return &channelz.ChannelInternalMetric{
		State:                    cc.GetState(),
		Target:                   cc.target,
		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
	}
}

// Target returns the target string of the ClientConn.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
833func (cc *ClientConn) Target() string { 834 return cc.target 835} 836 837func (cc *ClientConn) incrCallsStarted() { 838 atomic.AddInt64(&cc.czData.callsStarted, 1) 839 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 840} 841 842func (cc *ClientConn) incrCallsSucceeded() { 843 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 844} 845 846func (cc *ClientConn) incrCallsFailed() { 847 atomic.AddInt64(&cc.czData.callsFailed, 1) 848} 849 850// connect starts creating a transport. 851// It does nothing if the ac is not IDLE. 852// TODO(bar) Move this to the addrConn section. 853func (ac *addrConn) connect() error { 854 ac.mu.Lock() 855 if ac.state == connectivity.Shutdown { 856 ac.mu.Unlock() 857 return errConnClosing 858 } 859 if ac.state != connectivity.Idle { 860 ac.mu.Unlock() 861 return nil 862 } 863 // Update connectivity state within the lock to prevent subsequent or 864 // concurrent calls from resetting the transport more than once. 865 ac.updateConnectivityState(connectivity.Connecting, nil) 866 ac.mu.Unlock() 867 868 ac.resetTransport() 869 return nil 870} 871 872// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 873// 874// If ac is Connecting, it returns false. The caller should tear down the ac and 875// create a new one. Note that the backoff will be reset when this happens. 876// 877// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 878// addresses will be picked up by retry in the next iteration after backoff. 879// 880// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 881// 882// If ac is Ready, it checks whether current connected address of ac is in the 883// new addrs list. 884// - If true, it updates ac.addrs and returns true. The ac will keep using 885// the existing connection. 886// - If false, it does nothing and returns false. 
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		// Not actively connected: adopt the new list; it will be used on the
		// next connection attempt.
		ac.addrs = addrs
		return true
	}

	if ac.state == connectivity.Connecting {
		return false
	}

	// ac.state is Ready, try to find the connected address.
	var curAddrFound bool
	for _, a := range addrs {
		// a.ServerName takes precedent over ClientConn authority, if present.
		// NOTE(review): this modifies only the loop-local copy used for the
		// DeepEqual comparison; the addresses stored in ac.addrs below are the
		// caller-provided ones, unmodified.
		if a.ServerName == "" {
			a.ServerName = ac.cc.authority
		}
		if reflect.DeepEqual(ac.curAddr, a) {
			curAddrFound = true
			break
		}
	}
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
	if curAddrFound {
		ac.addrs = addrs
	}

	return curAddrFound
}

// getMethodConfig looks up the MethodConfig for method in sc, trying the exact
// "/service/method" key first, then the per-service default ("/service/"), and
// finally the global default (""). It returns a zero MethodConfig when sc is
// nil.
func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
	if sc == nil {
		return MethodConfig{}
	}
	if m, ok := sc.Methods[method]; ok {
		// Exact match for /service/method.
		return m
	}
	// Per-service default. When method contains no '/', LastIndex returns -1
	// and method[:0] == "", which degenerates to the global default lookup.
	i := strings.LastIndex(method, "/")
	if m, ok := sc.Methods[method[:i+1]]; ok {
		return m
	}
	return sc.Methods[""]
}

// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the service's default
// config under the service (i.e /service/) and then for the default for all services (empty string).
//
// If there is a default MethodConfig for the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
	// TODO: Avoid the locking here.
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	return getMethodConfig(cc.sc, method)
}

// healthCheckConfig returns the health check configuration from the current
// service config, or nil if no service config has been applied.
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	if cc.sc == nil {
		return nil
	}
	return cc.sc.healthCheckConfig
}

// getTransport picks a ready transport for an RPC via the blocking picker.
// Pick errors are converted to RPC status errors for the caller.
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
		Ctx:            ctx,
		FullMethodName: method,
	})
	if err != nil {
		return nil, nil, toRPCErr(err)
	}
	return t, done, nil
}

// applyServiceConfigAndBalancer applies sc (and optionally configSelector) to
// the ClientConn, updates the retry throttler from the config, and selects or
// builds the load balancer implied by the config, the resolved addresses, and
// the dial options.
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
	if sc == nil {
		// should never reach here.
		return
	}
	cc.sc = sc
	if configSelector != nil {
		cc.safeConfigSelector.UpdateConfigSelector(configSelector)
	}

	if cc.sc.retryThrottling != nil {
		// Fresh throttler: starts with a full token bucket; throttle when the
		// bucket falls to half (thresh) or below.
		newThrottler := &retryThrottler{
			tokens: cc.sc.retryThrottling.MaxTokens,
			max:    cc.sc.retryThrottling.MaxTokens,
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
			ratio:  cc.sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		// Store a typed nil so loads always type-assert successfully.
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
		} else {
			// No LB config: use grpclb if any resolved address is a balancer
			// address, else the service config's LB policy, else pick_first.
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}

// resolveNow asks the name resolver to re-resolve immediately. It is a no-op
// if the resolver wrapper has been cleared (e.g. the ClientConn is closed).
// The call is made on a new goroutine so callers holding locks are not
// blocked.
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
	cc.mu.RLock()
	r := cc.resolverWrapper
	cc.mu.RUnlock()
	if r == nil {
		return
	}
	go r.resolveNow(o)
}

// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func (cc *ClientConn) ResetConnectBackoff() {
	cc.mu.Lock()
	conns := cc.conns
	cc.mu.Unlock()
	// NOTE(review): only the map reference is captured under the lock; the
	// iteration below runs unlocked, so concurrent mutation of cc.conns is
	// presumably prevented elsewhere — verify before changing.
	for ac := range conns {
		ac.resetConnectBackoff()
	}
}

// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
	// Cancel cc.ctx after everything below has been initiated.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		// Already closed.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Setting conns to nil marks the ClientConn closed for all other paths.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the wrappers under the lock; close them after releasing it.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	// Fail pending picks first so no new RPCs grab a transport being torn
	// down.
	cc.blockingpicker.close()

	if bWrapper != nil {
		bWrapper.close()
	}
	if rWrapper != nil {
		rWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtInfo,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtInfo,
			}
		}
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}

// addrConn is a network connection to a given address.
type addrConn struct {
	ctx    context.Context    // parent context for transports and timers
	cancel context.CancelFunc // cancels ctx; called from tearDown

	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu appears to guard the fields below it; confirm before relying on it
	// for new fields.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{} // closed (and replaced) to wake backoff sleepers

	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}

// updateConnectivityState records s as the new state and notifies the
// ClientConn's balancer of the change. No-op if the state is unchanged.
//
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
	if ac.state == s {
		return
	}
	ac.state = s
	channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}

// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
	switch r {
	case transport.GoAwayTooManyPings:
		// Server complained about ping frequency: double the keepalive
		// interval and record the larger value on the ClientConn so future
		// transports inherit it.
		v := 2 * ac.dopts.copts.KeepaliveParams.Time
		ac.cc.mu.Lock()
		if v > ac.cc.mkp.Time {
			ac.cc.mkp.Time = v
		}
		ac.cc.mu.Unlock()
	}
}

// resetTransport attempts to connect to each of ac's addresses in turn,
// entering TRANSIENT_FAILURE and sleeping for the current backoff period when
// every address fails, then moving to IDLE so a later connect() retries.
func (ac *addrConn) resetTransport() {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}

	addrs := ac.addrs
	backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
	// This will be the duration that dial gets to finish.
	dialDuration := minConnectTimeout
	if ac.dopts.minConnectTimeout != nil {
		dialDuration = ac.dopts.minConnectTimeout()
	}

	if dialDuration < backoffFor {
		// Give dial more time as we keep failing to connect.
		dialDuration = backoffFor
	}
	// We can potentially spend all the time trying the first address, and
	// if the server accepts the connection and then hangs, the following
	// addresses will never be tried.
	//
	// The spec doesn't mention what should be done for multiple addresses.
	// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
	connectDeadline := time.Now().Add(dialDuration)

	ac.updateConnectivityState(connectivity.Connecting, nil)
	ac.mu.Unlock()

	if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
		// Kick the resolver in case the address list is stale.
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		// After exhausting all addresses, the addrConn enters
		// TRANSIENT_FAILURE.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure, err)

		// Backoff.
		// Capture the current resetBackoff channel: resetConnectBackoff
		// closes it to wake us early.
		b := ac.resetBackoff
		ac.mu.Unlock()

		timer := time.NewTimer(backoffFor)
		select {
		case <-timer.C:
			// Full backoff elapsed; escalate the next backoff.
			ac.mu.Lock()
			ac.backoffIdx++
			ac.mu.Unlock()
		case <-b:
			// Backoff was externally reset; retry immediately.
			timer.Stop()
		case <-ac.ctx.Done():
			timer.Stop()
			return
		}

		// Go IDLE so the balancer can trigger another connect().
		ac.mu.Lock()
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, err)
		}
		ac.mu.Unlock()
		return
	}
	// Success; reset backoff.
	ac.mu.Lock()
	ac.backoffIdx = 0
	ac.mu.Unlock()
}

// tryAllAddrs tries to creates a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
	var firstConnErr error
	for _, addr := range addrs {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return errConnClosing
		}

		// Pick up keepalive parameters possibly enlarged by adjustParams.
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()

		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)

		err := ac.createTransport(addr, copts, connectDeadline)
		if err == nil {
			return nil
		}
		// Report the first failure; later ones are recorded on the
		// ClientConn as the last connection error.
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return firstConnErr
}

// createTransport creates a connection to addr. It returns an error if the
// address was not successfully connected, or updates ac appropriately with the
// new transport.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
	// TODO: Delete prefaceReceived and move the logic to wait for it into the
	// transport.
	prefaceReceived := grpcsync.NewEvent()
	connClosed := grpcsync.NewEvent()

	// addr.ServerName takes precedent over ClientConn authority, if present.
	if addr.ServerName == "" {
		addr.ServerName = ac.cc.authority
	}

	// hctx governs the health-check stream started below; hcancel stops it.
	hctx, hcancel := context.WithCancel(ac.ctx)
	hcStarted := false // protected by ac.mu

	// onClose is invoked by the transport when it closes. It must be
	// idempotent with respect to clearing ac.transport (see the hctx.Err()
	// check).
	onClose := func() {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		defer connClosed.Fire()
		if !hcStarted || hctx.Err() != nil {
			// We didn't start the health check or set the state to READY, so
			// no need to do anything else here.
			//
			// OR, we have already cancelled the health check context, meaning
			// we have already called onClose once for this transport. In this
			// case it would be dangerous to clear the transport and update the
			// state, since there may be a new transport in this addrConn.
			return
		}
		hcancel()
		ac.transport = nil
		// Refresh the name resolver
		ac.cc.resolveNow(resolver.ResolveNowOptions{})
		if ac.state != connectivity.Shutdown {
			ac.updateConnectivityState(connectivity.Idle, nil)
		}
	}

	// onGoAway adjusts keepalive parameters and then treats the GoAway like a
	// close of this transport.
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		ac.mu.Unlock()
		onClose()
	}

	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, func() { prefaceReceived.Fire() }, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v", addr, err)
		return err
	}

	// Wait for whichever happens first: deadline, server preface, or the
	// transport closing.
	select {
	case <-connectCtx.Done():
		// We didn't get the preface in time.
		// The error we pass to Close() is immaterial since there are no open
		// streams at this point, so no trailers with error details will be sent
		// out. We just need to pass a non-nil error.
		newTr.Close(transport.ErrConnClosing)
		if connectCtx.Err() == context.DeadlineExceeded {
			err := errors.New("failed to receive server preface within timeout")
			channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: %v", addr, err)
			return err
		}
		// Context was canceled (not deadline); treat as a non-error here.
		return nil
	case <-prefaceReceived.Done():
		// We got the preface - huzzah! things are good.
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if connClosed.HasFired() {
			// onClose called first; go idle but do nothing else.
			if ac.state != connectivity.Shutdown {
				ac.updateConnectivityState(connectivity.Idle, nil)
			}
			return nil
		}
		if ac.state == connectivity.Shutdown {
			// This can happen if the subConn was removed while in `Connecting`
			// state. tearDown() would have set the state to `Shutdown`, but
			// would not have closed the transport since ac.transport would not
			// been set at that point.
			//
			// We run this in a goroutine because newTr.Close() calls onClose()
			// inline, which requires locking ac.mu.
			//
			// The error we pass to Close() is immaterial since there are no open
			// streams at this point, so no trailers with error details will be sent
			// out. We just need to pass a non-nil error.
			go newTr.Close(transport.ErrConnClosing)
			return nil
		}
		ac.curAddr = addr
		ac.transport = newTr
		hcStarted = true
		ac.startHealthCheck(hctx) // Will set state to READY if appropriate.
		return nil
	case <-connClosed.Done():
		// The transport has already closed. If we received the preface, too,
		// this is not an error.
		select {
		case <-prefaceReceived.Done():
			return nil
		default:
			return errors.New("connection closed before server preface received")
		}
	}
}

// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/health package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	// Any early return below means health checking is not managing the
	// state, so the subchannel goes straight to READY.
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// currentTr pins the transport this health check belongs to; both helpers
	// bail out if ac.transport has since been replaced.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}

// resetConnectBackoff wakes any goroutine sleeping in resetTransport's
// backoff (by closing resetBackoff) and resets the backoff index to zero.
func (ac *addrConn) resetConnectBackoff() {
	ac.mu.Lock()
	close(ac.resetBackoff)
	ac.backoffIdx = 0
	ac.resetBackoff = make(chan struct{})
	ac.mu.Unlock()
}

// getReadyTransport returns the transport if ac's state is READY or nil if not.
func (ac *addrConn) getReadyTransport() transport.ClientTransport {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	if ac.state == connectivity.Ready {
		return ac.transport
	}
	return nil
}

// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
1480func (ac *addrConn) tearDown(err error) { 1481 ac.mu.Lock() 1482 if ac.state == connectivity.Shutdown { 1483 ac.mu.Unlock() 1484 return 1485 } 1486 curTr := ac.transport 1487 ac.transport = nil 1488 // We have to set the state to Shutdown before anything else to prevent races 1489 // between setting the state and logic that waits on context cancellation / etc. 1490 ac.updateConnectivityState(connectivity.Shutdown, nil) 1491 ac.cancel() 1492 ac.curAddr = resolver.Address{} 1493 if err == errConnDrain && curTr != nil { 1494 // GracefulClose(...) may be executed multiple times when 1495 // i) receiving multiple GoAway frames from the server; or 1496 // ii) there are concurrent name resolver/Balancer triggered 1497 // address removal and GoAway. 1498 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1499 ac.mu.Unlock() 1500 curTr.GracefulClose() 1501 ac.mu.Lock() 1502 } 1503 if channelz.IsOn() { 1504 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ 1505 Desc: "Subchannel Deleted", 1506 Severity: channelz.CtInfo, 1507 Parent: &channelz.TraceEventDesc{ 1508 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1509 Severity: channelz.CtInfo, 1510 }, 1511 }) 1512 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1513 // the entity being deleted, and thus prevent it from being deleted right away. 
1514 channelz.RemoveEntry(ac.channelzID) 1515 } 1516 ac.mu.Unlock() 1517} 1518 1519func (ac *addrConn) getState() connectivity.State { 1520 ac.mu.Lock() 1521 defer ac.mu.Unlock() 1522 return ac.state 1523} 1524 1525func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1526 ac.mu.Lock() 1527 addr := ac.curAddr.Addr 1528 ac.mu.Unlock() 1529 return &channelz.ChannelInternalMetric{ 1530 State: ac.getState(), 1531 Target: addr, 1532 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1533 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1534 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1535 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1536 } 1537} 1538 1539func (ac *addrConn) incrCallsStarted() { 1540 atomic.AddInt64(&ac.czData.callsStarted, 1) 1541 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1542} 1543 1544func (ac *addrConn) incrCallsSucceeded() { 1545 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1546} 1547 1548func (ac *addrConn) incrCallsFailed() { 1549 atomic.AddInt64(&ac.czData.callsFailed, 1) 1550} 1551 1552type retryThrottler struct { 1553 max float64 1554 thresh float64 1555 ratio float64 1556 1557 mu sync.Mutex 1558 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1559} 1560 1561// throttle subtracts a retry token from the pool and returns whether a retry 1562// should be throttled (disallowed) based upon the retry throttling policy in 1563// the service config. 
1564func (rt *retryThrottler) throttle() bool { 1565 if rt == nil { 1566 return false 1567 } 1568 rt.mu.Lock() 1569 defer rt.mu.Unlock() 1570 rt.tokens-- 1571 if rt.tokens < 0 { 1572 rt.tokens = 0 1573 } 1574 return rt.tokens <= rt.thresh 1575} 1576 1577func (rt *retryThrottler) successfulRPC() { 1578 if rt == nil { 1579 return 1580 } 1581 rt.mu.Lock() 1582 defer rt.mu.Unlock() 1583 rt.tokens += rt.ratio 1584 if rt.tokens > rt.max { 1585 rt.tokens = rt.max 1586 } 1587} 1588 1589type channelzChannel struct { 1590 cc *ClientConn 1591} 1592 1593func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1594 return c.cc.channelzMetric() 1595} 1596 1597// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1598// underlying connections within the specified timeout. 1599// 1600// Deprecated: This error is never returned by grpc and should not be 1601// referenced by users. 1602var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1603 1604func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1605 for _, rb := range cc.dopts.resolvers { 1606 if scheme == rb.Scheme() { 1607 return rb 1608 } 1609 } 1610 return resolver.Get(scheme) 1611} 1612 1613func (cc *ClientConn) updateConnectionError(err error) { 1614 cc.lceMu.Lock() 1615 cc.lastConnectionError = err 1616 cc.lceMu.Unlock() 1617} 1618 1619func (cc *ClientConn) connectionError() error { 1620 cc.lceMu.Lock() 1621 defer cc.lceMu.Unlock() 1622 return cc.lastConnectionError 1623} 1624