/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	iresolver "google.golang.org/grpc/internal/resolver"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
	_ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver.
)

const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default client-side message size limits, in bytes: 4 MiB for received
	// messages and math.MaxInt32 for sent messages.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// Default write and read buffer sizes (32 KiB each); the write buffer
	// size is the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// defaultConfigSelector is a config selector backed by a single, fixed
// ServiceConfig.
type defaultConfigSelector struct {
	sc *ServiceConfig
}

// SelectConfig returns an RPCConfig that carries the RPC's own context
// unchanged, together with the method config that getMethodConfig finds for
// rpcInfo.Method in the selector's service config.
func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) *iresolver.RPCConfig {
	return &iresolver.RPCConfig{
		Context:      rpcInfo.Context,
		MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
	}
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// The ClientConn's own context is rooted at Background, not at ctx: ctx
	// only governs the dial itself.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply the caller's options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Tear down the partially-built ClientConn whenever dialing fails. This
	// reads the named result err, so it sees the final error value.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			// A parent ID was supplied: register as a nested channel and
			// record the creation on both this channel and its parent.
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credential options: a secure dial needs exactly one of
	// TransportCredentials or CredsBundle; an insecure dial must have neither,
	// and none of its per-RPC credentials may require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config JSON, if one was provided, and fail
	// the dial if it is invalid.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Append the library's user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// A dial timeout option further bounds ctx for the rest of the dial.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// On return, if ctx is already done, never hand back a connection and
	// make sure the context error is reported: err is kept when it already
	// equals ctx.Err(); it is replaced by ctx.Err() when there is no other
	// error or returnLastError is off; otherwise both are combined. Deferred
	// functions run last-in-first-out, so this runs before the cancel and
	// cc.Close defers registered above.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Pick the authority, in order of precedence: the server name from the
	// transport credentials, the authority dial option (insecure dials only),
	// "localhost" for unix targets, "localhost"+endpoint when the endpoint
	// starts with ":", and finally the endpoint itself.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else if strings.HasPrefix(cc.target, "unix:") {
		cc.authority = "localhost"
	} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
		cc.authority = "localhost" + cc.parsedTarget.Endpoint
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep consuming service config updates from the channel in the
		// background.
		go cc.scWatcher()
	}

	// The balancer build options get a clone of the transport credentials
	// rather than the original.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up immediately on a connection error that reports
				// itself as non-temporary.
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// Two or more: call the first interceptor with an invoker that runs
		// the remaining ones in order before reaching the real invoker.
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
372func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 373 if curr == len(interceptors)-1 { 374 return finalInvoker 375 } 376 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 377 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 378 } 379} 380 381// chainStreamClientInterceptors chains all stream client interceptors into one. 382func chainStreamClientInterceptors(cc *ClientConn) { 383 interceptors := cc.dopts.chainStreamInts 384 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 385 // be executed before any other chained interceptors. 386 if cc.dopts.streamInt != nil { 387 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 388 } 389 var chainedInt StreamClientInterceptor 390 if len(interceptors) == 0 { 391 chainedInt = nil 392 } else if len(interceptors) == 1 { 393 chainedInt = interceptors[0] 394 } else { 395 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 396 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 397 } 398 } 399 cc.dopts.streamInt = chainedInt 400} 401 402// getChainStreamer recursively generate the chained client stream constructor. 403func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 404 if curr == len(interceptors)-1 { 405 return finalStreamer 406 } 407 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 408 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 
409 } 410} 411 412// connectivityStateManager keeps the connectivity.State of ClientConn. 413// This struct will eventually be exported so the balancers can access it. 414type connectivityStateManager struct { 415 mu sync.Mutex 416 state connectivity.State 417 notifyChan chan struct{} 418 channelzID int64 419} 420 421// updateState updates the connectivity.State of ClientConn. 422// If there's a change it notifies goroutines waiting on state change to 423// happen. 424func (csm *connectivityStateManager) updateState(state connectivity.State) { 425 csm.mu.Lock() 426 defer csm.mu.Unlock() 427 if csm.state == connectivity.Shutdown { 428 return 429 } 430 if csm.state == state { 431 return 432 } 433 csm.state = state 434 channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state) 435 if csm.notifyChan != nil { 436 // There are other goroutines waiting on this channel. 437 close(csm.notifyChan) 438 csm.notifyChan = nil 439 } 440} 441 442func (csm *connectivityStateManager) getState() connectivity.State { 443 csm.mu.Lock() 444 defer csm.mu.Unlock() 445 return csm.state 446} 447 448func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 449 csm.mu.Lock() 450 defer csm.mu.Unlock() 451 if csm.notifyChan == nil { 452 csm.notifyChan = make(chan struct{}) 453 } 454 return csm.notifyChan 455} 456 457// ClientConnInterface defines the functions clients need to perform unary and 458// streaming RPCs. It is implemented by *ClientConn, and is only intended to 459// be referenced by generated code. 460type ClientConnInterface interface { 461 // Invoke performs a unary RPC and returns after the response is received 462 // into reply. 463 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error 464 // NewStream begins a streaming RPC. 
465 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) 466} 467 468// Assert *ClientConn implements ClientConnInterface. 469var _ ClientConnInterface = (*ClientConn)(nil) 470 471// ClientConn represents a virtual connection to a conceptual endpoint, to 472// perform RPCs. 473// 474// A ClientConn is free to have zero or more actual connections to the endpoint 475// based on configuration, load, etc. It is also free to determine which actual 476// endpoints to use and may change it every RPC, permitting client-side load 477// balancing. 478// 479// A ClientConn encapsulates a range of functionality including name 480// resolution, TCP connection establishment (with retries and backoff) and TLS 481// handshakes. It also handles errors on established connections by 482// re-resolving the name and reconnecting. 483type ClientConn struct { 484 ctx context.Context 485 cancel context.CancelFunc 486 487 target string 488 parsedTarget resolver.Target 489 authority string 490 dopts dialOptions 491 csMgr *connectivityStateManager 492 493 balancerBuildOpts balancer.BuildOptions 494 blockingpicker *pickerWrapper 495 496 safeConfigSelector iresolver.SafeConfigSelector 497 498 mu sync.RWMutex 499 resolverWrapper *ccResolverWrapper 500 sc *ServiceConfig 501 conns map[*addrConn]struct{} 502 // Keepalive parameter can be updated if a GoAway is received. 503 mkp keepalive.ClientParameters 504 curBalancerName string 505 balancerWrapper *ccBalancerWrapper 506 retryThrottler atomic.Value 507 508 firstResolveEvent *grpcsync.Event 509 510 channelzID int64 // channelz unique identification number 511 czData *channelzData 512 513 lceMu sync.Mutex // protects lastConnectionError 514 lastConnectionError error 515} 516 517// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 518// ctx expires. A true value is returned in former case and false in latter. 
519// 520// Experimental 521// 522// Notice: This API is EXPERIMENTAL and may be changed or removed in a 523// later release. 524func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 525 ch := cc.csMgr.getNotifyChan() 526 if cc.csMgr.getState() != sourceState { 527 return true 528 } 529 select { 530 case <-ctx.Done(): 531 return false 532 case <-ch: 533 return true 534 } 535} 536 537// GetState returns the connectivity.State of ClientConn. 538// 539// Experimental 540// 541// Notice: This API is EXPERIMENTAL and may be changed or removed in a 542// later release. 543func (cc *ClientConn) GetState() connectivity.State { 544 return cc.csMgr.getState() 545} 546 547func (cc *ClientConn) scWatcher() { 548 for { 549 select { 550 case sc, ok := <-cc.dopts.scChan: 551 if !ok { 552 return 553 } 554 cc.mu.Lock() 555 // TODO: load balance policy runtime change is ignored. 556 // We may revisit this decision in the future. 557 cc.sc = &sc 558 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc}) 559 cc.mu.Unlock() 560 case <-cc.ctx.Done(): 561 return 562 } 563 } 564} 565 566// waitForResolvedAddrs blocks until the resolver has provided addresses or the 567// context expires. Returns nil unless the context expires first; otherwise 568// returns a status error based on the context. 569func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 570 // This is on the RPC path, so we use a fast path to avoid the 571 // more-expensive "select" below after the resolver has returned once. 
572 if cc.firstResolveEvent.HasFired() { 573 return nil 574 } 575 select { 576 case <-cc.firstResolveEvent.Done(): 577 return nil 578 case <-ctx.Done(): 579 return status.FromContextError(ctx.Err()).Err() 580 case <-cc.ctx.Done(): 581 return ErrClientConnClosing 582 } 583} 584 585var emptyServiceConfig *ServiceConfig 586 587func init() { 588 cfg := parseServiceConfig("{}") 589 if cfg.Err != nil { 590 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 591 } 592 emptyServiceConfig = cfg.Config.(*ServiceConfig) 593} 594 595func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 596 if cc.sc != nil { 597 cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs) 598 return 599 } 600 if cc.dopts.defaultServiceConfig != nil { 601 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs) 602 } else { 603 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs) 604 } 605} 606 607func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 608 defer cc.firstResolveEvent.Fire() 609 cc.mu.Lock() 610 // Check if the ClientConn is already closed. Some fields (e.g. 611 // balancerWrapper) are set to nil when closing the ClientConn, and could 612 // cause nil pointer panic if we don't have this check. 613 if cc.conns == nil { 614 cc.mu.Unlock() 615 return nil 616 } 617 618 if err != nil { 619 // May need to apply the initial service config in case the resolver 620 // doesn't support service configs, or doesn't provide a service config 621 // with the new addresses. 622 cc.maybeApplyDefaultServiceConfig(nil) 623 624 if cc.balancerWrapper != nil { 625 cc.balancerWrapper.resolverError(err) 626 } 627 628 // No addresses are valid with err set; return early. 
629 cc.mu.Unlock() 630 return balancer.ErrBadResolverState 631 } 632 633 var ret error 634 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 635 cc.maybeApplyDefaultServiceConfig(s.Addresses) 636 // TODO: do we need to apply a failing LB policy if there is no 637 // default, per the error handling design? 638 } else { 639 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 640 configSelector := iresolver.GetConfigSelector(s) 641 if configSelector != nil { 642 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { 643 channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") 644 } 645 } else { 646 configSelector = &defaultConfigSelector{sc} 647 } 648 cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) 649 } else { 650 ret = balancer.ErrBadResolverState 651 if cc.balancerWrapper == nil { 652 var err error 653 if s.ServiceConfig.Err != nil { 654 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) 655 } else { 656 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) 657 } 658 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc}) 659 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 660 cc.csMgr.updateState(connectivity.TransientFailure) 661 cc.mu.Unlock() 662 return ret 663 } 664 } 665 } 666 667 var balCfg serviceconfig.LoadBalancingConfig 668 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { 669 balCfg = cc.sc.lbConfig.cfg 670 } 671 672 cbn := cc.curBalancerName 673 bw := cc.balancerWrapper 674 cc.mu.Unlock() 675 if cbn != grpclbName { 676 // Filter any grpclb addresses since we don't have the grpclb balancer. 
677 for i := 0; i < len(s.Addresses); { 678 if s.Addresses[i].Type == resolver.GRPCLB { 679 copy(s.Addresses[i:], s.Addresses[i+1:]) 680 s.Addresses = s.Addresses[:len(s.Addresses)-1] 681 continue 682 } 683 i++ 684 } 685 } 686 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 687 if ret == nil { 688 ret = uccsErr // prefer ErrBadResolver state since any other error is 689 // currently meaningless to the caller. 690 } 691 return ret 692} 693 694// switchBalancer starts the switching from current balancer to the balancer 695// with the given name. 696// 697// It will NOT send the current address list to the new balancer. If needed, 698// caller of this function should send address list to the new balancer after 699// this function returns. 700// 701// Caller must hold cc.mu. 702func (cc *ClientConn) switchBalancer(name string) { 703 if strings.EqualFold(cc.curBalancerName, name) { 704 return 705 } 706 707 channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name) 708 if cc.dopts.balancerBuilder != nil { 709 channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead") 710 return 711 } 712 if cc.balancerWrapper != nil { 713 cc.balancerWrapper.close() 714 } 715 716 builder := balancer.Get(name) 717 if builder == nil { 718 channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) 719 channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) 720 builder = newPickfirstBuilder() 721 } else { 722 channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name) 723 } 724 725 cc.curBalancerName = builder.Name() 726 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 727} 728 729func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err 
error) { 730 cc.mu.Lock() 731 if cc.conns == nil { 732 cc.mu.Unlock() 733 return 734 } 735 // TODO(bar switching) send updates to all balancer wrappers when balancer 736 // gracefully switching is supported. 737 cc.balancerWrapper.handleSubConnStateChange(sc, s, err) 738 cc.mu.Unlock() 739} 740 741// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 742// 743// Caller needs to make sure len(addrs) > 0. 744func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 745 ac := &addrConn{ 746 state: connectivity.Idle, 747 cc: cc, 748 addrs: addrs, 749 scopts: opts, 750 dopts: cc.dopts, 751 czData: new(channelzData), 752 resetBackoff: make(chan struct{}), 753 } 754 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 755 // Track ac in cc. This needs to be done before any getTransport(...) is called. 756 cc.mu.Lock() 757 if cc.conns == nil { 758 cc.mu.Unlock() 759 return nil, ErrClientConnClosing 760 } 761 if channelz.IsOn() { 762 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 763 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ 764 Desc: "Subchannel Created", 765 Severity: channelz.CtInfo, 766 Parent: &channelz.TraceEventDesc{ 767 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 768 Severity: channelz.CtInfo, 769 }, 770 }) 771 } 772 cc.conns[ac] = struct{}{} 773 cc.mu.Unlock() 774 return ac, nil 775} 776 777// removeAddrConn removes the addrConn in the subConn from clientConn. 778// It also tears down the ac with the given error. 
779func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 780 cc.mu.Lock() 781 if cc.conns == nil { 782 cc.mu.Unlock() 783 return 784 } 785 delete(cc.conns, ac) 786 cc.mu.Unlock() 787 ac.tearDown(err) 788} 789 790func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 791 return &channelz.ChannelInternalMetric{ 792 State: cc.GetState(), 793 Target: cc.target, 794 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 795 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 796 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 797 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 798 } 799} 800 801// Target returns the target string of the ClientConn. 802// 803// Experimental 804// 805// Notice: This API is EXPERIMENTAL and may be changed or removed in a 806// later release. 807func (cc *ClientConn) Target() string { 808 return cc.target 809} 810 811func (cc *ClientConn) incrCallsStarted() { 812 atomic.AddInt64(&cc.czData.callsStarted, 1) 813 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 814} 815 816func (cc *ClientConn) incrCallsSucceeded() { 817 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 818} 819 820func (cc *ClientConn) incrCallsFailed() { 821 atomic.AddInt64(&cc.czData.callsFailed, 1) 822} 823 824// connect starts creating a transport. 825// It does nothing if the ac is not IDLE. 826// TODO(bar) Move this to the addrConn section. 827func (ac *addrConn) connect() error { 828 ac.mu.Lock() 829 if ac.state == connectivity.Shutdown { 830 ac.mu.Unlock() 831 return errConnClosing 832 } 833 if ac.state != connectivity.Idle { 834 ac.mu.Unlock() 835 return nil 836 } 837 // Update connectivity state within the lock to prevent subsequent or 838 // concurrent calls from resetting the transport more than once. 839 ac.updateConnectivityState(connectivity.Connecting, nil) 840 ac.mu.Unlock() 841 842 // Start a goroutine connecting to the server asynchronously. 
843 go ac.resetTransport() 844 return nil 845} 846 847// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 848// 849// If ac is Connecting, it returns false. The caller should tear down the ac and 850// create a new one. Note that the backoff will be reset when this happens. 851// 852// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 853// addresses will be picked up by retry in the next iteration after backoff. 854// 855// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 856// 857// If ac is Ready, it checks whether current connected address of ac is in the 858// new addrs list. 859// - If true, it updates ac.addrs and returns true. The ac will keep using 860// the existing connection. 861// - If false, it does nothing and returns false. 862func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 863 ac.mu.Lock() 864 defer ac.mu.Unlock() 865 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 866 if ac.state == connectivity.Shutdown || 867 ac.state == connectivity.TransientFailure || 868 ac.state == connectivity.Idle { 869 ac.addrs = addrs 870 return true 871 } 872 873 if ac.state == connectivity.Connecting { 874 return false 875 } 876 877 // ac.state is Ready, try to find the connected address. 
878 var curAddrFound bool 879 for _, a := range addrs { 880 if reflect.DeepEqual(ac.curAddr, a) { 881 curAddrFound = true 882 break 883 } 884 } 885 channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 886 if curAddrFound { 887 ac.addrs = addrs 888 } 889 890 return curAddrFound 891} 892 893func getMethodConfig(sc *ServiceConfig, method string) MethodConfig { 894 if sc == nil { 895 return MethodConfig{} 896 } 897 if m, ok := sc.Methods[method]; ok { 898 return m 899 } 900 i := strings.LastIndex(method, "/") 901 if m, ok := sc.Methods[method[:i+1]]; ok { 902 return m 903 } 904 return sc.Methods[""] 905} 906 907// GetMethodConfig gets the method config of the input method. 908// If there's an exact match for input method (i.e. /service/method), we return 909// the corresponding MethodConfig. 910// If there isn't an exact match for the input method, we look for the service's default 911// config under the service (i.e /service/) and then for the default for all services (empty string). 912// 913// If there is a default MethodConfig for the service, we return it. 914// Otherwise, we return an empty MethodConfig. 915func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 916 // TODO: Avoid the locking here. 
917 cc.mu.RLock() 918 defer cc.mu.RUnlock() 919 return getMethodConfig(cc.sc, method) 920} 921 922func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 923 cc.mu.RLock() 924 defer cc.mu.RUnlock() 925 if cc.sc == nil { 926 return nil 927 } 928 return cc.sc.healthCheckConfig 929} 930 931func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 932 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ 933 Ctx: ctx, 934 FullMethodName: method, 935 }) 936 if err != nil { 937 return nil, nil, toRPCErr(err) 938 } 939 return t, done, nil 940} 941 942func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { 943 if sc == nil { 944 // should never reach here. 945 return 946 } 947 cc.sc = sc 948 if configSelector != nil { 949 cc.safeConfigSelector.UpdateConfigSelector(configSelector) 950 } 951 952 if cc.sc.retryThrottling != nil { 953 newThrottler := &retryThrottler{ 954 tokens: cc.sc.retryThrottling.MaxTokens, 955 max: cc.sc.retryThrottling.MaxTokens, 956 thresh: cc.sc.retryThrottling.MaxTokens / 2, 957 ratio: cc.sc.retryThrottling.TokenRatio, 958 } 959 cc.retryThrottler.Store(newThrottler) 960 } else { 961 cc.retryThrottler.Store((*retryThrottler)(nil)) 962 } 963 964 if cc.dopts.balancerBuilder == nil { 965 // Only look at balancer types and switch balancer if balancer dial 966 // option is not set. 
967 var newBalancerName string 968 if cc.sc != nil && cc.sc.lbConfig != nil { 969 newBalancerName = cc.sc.lbConfig.name 970 } else { 971 var isGRPCLB bool 972 for _, a := range addrs { 973 if a.Type == resolver.GRPCLB { 974 isGRPCLB = true 975 break 976 } 977 } 978 if isGRPCLB { 979 newBalancerName = grpclbName 980 } else if cc.sc != nil && cc.sc.LB != nil { 981 newBalancerName = *cc.sc.LB 982 } else { 983 newBalancerName = PickFirstBalancerName 984 } 985 } 986 cc.switchBalancer(newBalancerName) 987 } else if cc.balancerWrapper == nil { 988 // Balancer dial option was set, and this is the first time handling 989 // resolved addresses. Build a balancer with dopts.balancerBuilder. 990 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 991 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 992 } 993} 994 995func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { 996 cc.mu.RLock() 997 r := cc.resolverWrapper 998 cc.mu.RUnlock() 999 if r == nil { 1000 return 1001 } 1002 go r.resolveNow(o) 1003} 1004 1005// ResetConnectBackoff wakes up all subchannels in transient failure and causes 1006// them to attempt another connection immediately. It also resets the backoff 1007// times used for subsequent attempts regardless of the current state. 1008// 1009// In general, this function should not be used. Typical service or network 1010// outages result in a reasonable client reconnection strategy by default. 1011// However, if a previously unavailable network becomes available, this may be 1012// used to trigger an immediate reconnect. 1013// 1014// Experimental 1015// 1016// Notice: This API is EXPERIMENTAL and may be changed or removed in a 1017// later release. 1018func (cc *ClientConn) ResetConnectBackoff() { 1019 cc.mu.Lock() 1020 conns := cc.conns 1021 cc.mu.Unlock() 1022 for ac := range conns { 1023 ac.resetConnectBackoff() 1024 } 1025} 1026 1027// Close tears down the ClientConn and all underlying connections. 
1028func (cc *ClientConn) Close() error { 1029 defer cc.cancel() 1030 1031 cc.mu.Lock() 1032 if cc.conns == nil { 1033 cc.mu.Unlock() 1034 return ErrClientConnClosing 1035 } 1036 conns := cc.conns 1037 cc.conns = nil 1038 cc.csMgr.updateState(connectivity.Shutdown) 1039 1040 rWrapper := cc.resolverWrapper 1041 cc.resolverWrapper = nil 1042 bWrapper := cc.balancerWrapper 1043 cc.balancerWrapper = nil 1044 cc.mu.Unlock() 1045 1046 cc.blockingpicker.close() 1047 1048 if rWrapper != nil { 1049 rWrapper.close() 1050 } 1051 if bWrapper != nil { 1052 bWrapper.close() 1053 } 1054 1055 for ac := range conns { 1056 ac.tearDown(ErrClientConnClosing) 1057 } 1058 if channelz.IsOn() { 1059 ted := &channelz.TraceEventDesc{ 1060 Desc: "Channel Deleted", 1061 Severity: channelz.CtInfo, 1062 } 1063 if cc.dopts.channelzParentID != 0 { 1064 ted.Parent = &channelz.TraceEventDesc{ 1065 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1066 Severity: channelz.CtInfo, 1067 } 1068 } 1069 channelz.AddTraceEvent(logger, cc.channelzID, 0, ted) 1070 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1071 // the entity being deleted, and thus prevent it from being deleted right away. 1072 channelz.RemoveEntry(cc.channelzID) 1073 } 1074 return nil 1075} 1076 1077// addrConn is a network connection to a given address. 1078type addrConn struct { 1079 ctx context.Context 1080 cancel context.CancelFunc 1081 1082 cc *ClientConn 1083 dopts dialOptions 1084 acbw balancer.SubConn 1085 scopts balancer.NewSubConnOptions 1086 1087 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1088 // health checking may require server to report healthy to set ac to READY), and is reset 1089 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1090 // is received, transport is closed, ac has been torn down). 
1091 transport transport.ClientTransport // The current transport. 1092 1093 mu sync.Mutex 1094 curAddr resolver.Address // The current address. 1095 addrs []resolver.Address // All addresses that the resolver resolved to. 1096 1097 // Use updateConnectivityState for updating addrConn's connectivity state. 1098 state connectivity.State 1099 1100 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1101 resetBackoff chan struct{} 1102 1103 channelzID int64 // channelz unique identification number. 1104 czData *channelzData 1105} 1106 1107// Note: this requires a lock on ac.mu. 1108func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1109 if ac.state == s { 1110 return 1111 } 1112 ac.state = s 1113 channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s) 1114 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1115} 1116 1117// adjustParams updates parameters used to create transports upon 1118// receiving a GoAway. 1119func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1120 switch r { 1121 case transport.GoAwayTooManyPings: 1122 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1123 ac.cc.mu.Lock() 1124 if v > ac.cc.mkp.Time { 1125 ac.cc.mkp.Time = v 1126 } 1127 ac.cc.mu.Unlock() 1128 } 1129} 1130 1131func (ac *addrConn) resetTransport() { 1132 for i := 0; ; i++ { 1133 if i > 0 { 1134 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1135 } 1136 1137 ac.mu.Lock() 1138 if ac.state == connectivity.Shutdown { 1139 ac.mu.Unlock() 1140 return 1141 } 1142 1143 addrs := ac.addrs 1144 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1145 // This will be the duration that dial gets to finish. 1146 dialDuration := minConnectTimeout 1147 if ac.dopts.minConnectTimeout != nil { 1148 dialDuration = ac.dopts.minConnectTimeout() 1149 } 1150 1151 if dialDuration < backoffFor { 1152 // Give dial more time as we keep failing to connect. 
1153 dialDuration = backoffFor 1154 } 1155 // We can potentially spend all the time trying the first address, and 1156 // if the server accepts the connection and then hangs, the following 1157 // addresses will never be tried. 1158 // 1159 // The spec doesn't mention what should be done for multiple addresses. 1160 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1161 connectDeadline := time.Now().Add(dialDuration) 1162 1163 ac.updateConnectivityState(connectivity.Connecting, nil) 1164 ac.transport = nil 1165 ac.mu.Unlock() 1166 1167 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1168 if err != nil { 1169 // After exhausting all addresses, the addrConn enters 1170 // TRANSIENT_FAILURE. 1171 ac.mu.Lock() 1172 if ac.state == connectivity.Shutdown { 1173 ac.mu.Unlock() 1174 return 1175 } 1176 ac.updateConnectivityState(connectivity.TransientFailure, err) 1177 1178 // Backoff. 1179 b := ac.resetBackoff 1180 ac.mu.Unlock() 1181 1182 timer := time.NewTimer(backoffFor) 1183 select { 1184 case <-timer.C: 1185 ac.mu.Lock() 1186 ac.backoffIdx++ 1187 ac.mu.Unlock() 1188 case <-b: 1189 timer.Stop() 1190 case <-ac.ctx.Done(): 1191 timer.Stop() 1192 return 1193 } 1194 continue 1195 } 1196 1197 ac.mu.Lock() 1198 if ac.state == connectivity.Shutdown { 1199 ac.mu.Unlock() 1200 newTr.Close() 1201 return 1202 } 1203 ac.curAddr = addr 1204 ac.transport = newTr 1205 ac.backoffIdx = 0 1206 1207 hctx, hcancel := context.WithCancel(ac.ctx) 1208 ac.startHealthCheck(hctx) 1209 ac.mu.Unlock() 1210 1211 // Block until the created transport is down. And when this happens, 1212 // we restart from the top of the addr list. 1213 <-reconnect.Done() 1214 hcancel() 1215 // restart connecting - the top of the loop will set state to 1216 // CONNECTING. 
This is against the current connectivity semantics doc, 1217 // however it allows for graceful behavior for RPCs not yet dispatched 1218 // - unfortunate timing would otherwise lead to the RPC failing even 1219 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1220 // instantaneous. 1221 // 1222 // Ideally we should transition to Idle here and block until there is 1223 // RPC activity that leads to the balancer requesting a reconnect of 1224 // the associated SubConn. 1225 } 1226} 1227 1228// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1229// first successful one. It returns the transport, the address and a Event in 1230// the successful case. The Event fires when the returned transport disconnects. 1231func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1232 var firstConnErr error 1233 for _, addr := range addrs { 1234 ac.mu.Lock() 1235 if ac.state == connectivity.Shutdown { 1236 ac.mu.Unlock() 1237 return nil, resolver.Address{}, nil, errConnClosing 1238 } 1239 1240 ac.cc.mu.RLock() 1241 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1242 ac.cc.mu.RUnlock() 1243 1244 copts := ac.dopts.copts 1245 if ac.scopts.CredsBundle != nil { 1246 copts.CredsBundle = ac.scopts.CredsBundle 1247 } 1248 ac.mu.Unlock() 1249 1250 channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) 1251 1252 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1253 if err == nil { 1254 return newTr, addr, reconnect, nil 1255 } 1256 if firstConnErr == nil { 1257 firstConnErr = err 1258 } 1259 ac.cc.updateConnectionError(err) 1260 } 1261 1262 // Couldn't connect to any address. 1263 return nil, resolver.Address{}, nil, firstConnErr 1264} 1265 1266// createTransport creates a connection to addr. It returns the transport and a 1267// Event in the successful case. 
The Event fires when the returned transport 1268// disconnects. 1269func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1270 prefaceReceived := make(chan struct{}) 1271 onCloseCalled := make(chan struct{}) 1272 reconnect := grpcsync.NewEvent() 1273 1274 // addr.ServerName takes precedent over ClientConn authority, if present. 1275 if addr.ServerName == "" { 1276 addr.ServerName = ac.cc.authority 1277 } 1278 1279 once := sync.Once{} 1280 onGoAway := func(r transport.GoAwayReason) { 1281 ac.mu.Lock() 1282 ac.adjustParams(r) 1283 once.Do(func() { 1284 if ac.state == connectivity.Ready { 1285 // Prevent this SubConn from being used for new RPCs by setting its 1286 // state to Connecting. 1287 // 1288 // TODO: this should be Idle when grpc-go properly supports it. 1289 ac.updateConnectivityState(connectivity.Connecting, nil) 1290 } 1291 }) 1292 ac.mu.Unlock() 1293 reconnect.Fire() 1294 } 1295 1296 onClose := func() { 1297 ac.mu.Lock() 1298 once.Do(func() { 1299 if ac.state == connectivity.Ready { 1300 // Prevent this SubConn from being used for new RPCs by setting its 1301 // state to Connecting. 1302 // 1303 // TODO: this should be Idle when grpc-go properly supports it. 1304 ac.updateConnectivityState(connectivity.Connecting, nil) 1305 } 1306 }) 1307 ac.mu.Unlock() 1308 close(onCloseCalled) 1309 reconnect.Fire() 1310 } 1311 1312 onPrefaceReceipt := func() { 1313 close(prefaceReceived) 1314 } 1315 1316 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1317 defer cancel() 1318 if channelz.IsOn() { 1319 copts.ChannelzParentID = ac.channelzID 1320 } 1321 1322 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose) 1323 if err != nil { 1324 // newTr is either nil, or closed. 1325 channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. 
Err: %v. Reconnecting...", addr, err) 1326 return nil, nil, err 1327 } 1328 1329 select { 1330 case <-time.After(time.Until(connectDeadline)): 1331 // We didn't get the preface in time. 1332 newTr.Close() 1333 channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1334 return nil, nil, errors.New("timed out waiting for server handshake") 1335 case <-prefaceReceived: 1336 // We got the preface - huzzah! things are good. 1337 case <-onCloseCalled: 1338 // The transport has already closed - noop. 1339 return nil, nil, errors.New("connection closed") 1340 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1341 } 1342 return newTr, reconnect, nil 1343} 1344 1345// startHealthCheck starts the health checking stream (RPC) to watch the health 1346// stats of this connection if health checking is requested and configured. 1347// 1348// LB channel health checking is enabled when all requirements below are met: 1349// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1350// 2. internal.HealthCheckFunc is set by importing the grpc/health package 1351// 3. a service config with non-empty healthCheckConfig field is provided 1352// 4. the load balancer requests it 1353// 1354// It sets addrConn to READY if the health checking stream is not started. 1355// 1356// Caller must hold ac.mu. 
1357func (ac *addrConn) startHealthCheck(ctx context.Context) { 1358 var healthcheckManagingState bool 1359 defer func() { 1360 if !healthcheckManagingState { 1361 ac.updateConnectivityState(connectivity.Ready, nil) 1362 } 1363 }() 1364 1365 if ac.cc.dopts.disableHealthCheck { 1366 return 1367 } 1368 healthCheckConfig := ac.cc.healthCheckConfig() 1369 if healthCheckConfig == nil { 1370 return 1371 } 1372 if !ac.scopts.HealthCheckEnabled { 1373 return 1374 } 1375 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1376 if healthCheckFunc == nil { 1377 // The health package is not imported to set health check function. 1378 // 1379 // TODO: add a link to the health check doc in the error message. 1380 channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.") 1381 return 1382 } 1383 1384 healthcheckManagingState = true 1385 1386 // Set up the health check helper functions. 1387 currentTr := ac.transport 1388 newStream := func(method string) (interface{}, error) { 1389 ac.mu.Lock() 1390 if ac.transport != currentTr { 1391 ac.mu.Unlock() 1392 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1393 } 1394 ac.mu.Unlock() 1395 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1396 } 1397 setConnectivityState := func(s connectivity.State, lastErr error) { 1398 ac.mu.Lock() 1399 defer ac.mu.Unlock() 1400 if ac.transport != currentTr { 1401 return 1402 } 1403 ac.updateConnectivityState(s, lastErr) 1404 } 1405 // Start the health checking stream. 
1406 go func() { 1407 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1408 if err != nil { 1409 if status.Code(err) == codes.Unimplemented { 1410 channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled") 1411 } else { 1412 channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err) 1413 } 1414 } 1415 }() 1416} 1417 1418func (ac *addrConn) resetConnectBackoff() { 1419 ac.mu.Lock() 1420 close(ac.resetBackoff) 1421 ac.backoffIdx = 0 1422 ac.resetBackoff = make(chan struct{}) 1423 ac.mu.Unlock() 1424} 1425 1426// getReadyTransport returns the transport if ac's state is READY. 1427// Otherwise it returns nil, false. 1428// If ac's state is IDLE, it will trigger ac to connect. 1429func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1430 ac.mu.Lock() 1431 if ac.state == connectivity.Ready && ac.transport != nil { 1432 t := ac.transport 1433 ac.mu.Unlock() 1434 return t, true 1435 } 1436 var idle bool 1437 if ac.state == connectivity.Idle { 1438 idle = true 1439 } 1440 ac.mu.Unlock() 1441 // Trigger idle ac to connect. 1442 if idle { 1443 ac.connect() 1444 } 1445 return nil, false 1446} 1447 1448// tearDown starts to tear down the addrConn. 1449// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1450// some edge cases (e.g., the caller opens and closes many addrConn's in a 1451// tight loop. 1452// tearDown doesn't remove ac from ac.cc.conns. 1453func (ac *addrConn) tearDown(err error) { 1454 ac.mu.Lock() 1455 if ac.state == connectivity.Shutdown { 1456 ac.mu.Unlock() 1457 return 1458 } 1459 curTr := ac.transport 1460 ac.transport = nil 1461 // We have to set the state to Shutdown before anything else to prevent races 1462 // between setting the state and logic that waits on context cancellation / etc. 
1463 ac.updateConnectivityState(connectivity.Shutdown, nil) 1464 ac.cancel() 1465 ac.curAddr = resolver.Address{} 1466 if err == errConnDrain && curTr != nil { 1467 // GracefulClose(...) may be executed multiple times when 1468 // i) receiving multiple GoAway frames from the server; or 1469 // ii) there are concurrent name resolver/Balancer triggered 1470 // address removal and GoAway. 1471 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1472 ac.mu.Unlock() 1473 curTr.GracefulClose() 1474 ac.mu.Lock() 1475 } 1476 if channelz.IsOn() { 1477 channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ 1478 Desc: "Subchannel Deleted", 1479 Severity: channelz.CtInfo, 1480 Parent: &channelz.TraceEventDesc{ 1481 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1482 Severity: channelz.CtInfo, 1483 }, 1484 }) 1485 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1486 // the entity being deleted, and thus prevent it from being deleted right away. 
1487 channelz.RemoveEntry(ac.channelzID) 1488 } 1489 ac.mu.Unlock() 1490} 1491 1492func (ac *addrConn) getState() connectivity.State { 1493 ac.mu.Lock() 1494 defer ac.mu.Unlock() 1495 return ac.state 1496} 1497 1498func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1499 ac.mu.Lock() 1500 addr := ac.curAddr.Addr 1501 ac.mu.Unlock() 1502 return &channelz.ChannelInternalMetric{ 1503 State: ac.getState(), 1504 Target: addr, 1505 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1506 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1507 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1508 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1509 } 1510} 1511 1512func (ac *addrConn) incrCallsStarted() { 1513 atomic.AddInt64(&ac.czData.callsStarted, 1) 1514 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1515} 1516 1517func (ac *addrConn) incrCallsSucceeded() { 1518 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1519} 1520 1521func (ac *addrConn) incrCallsFailed() { 1522 atomic.AddInt64(&ac.czData.callsFailed, 1) 1523} 1524 1525type retryThrottler struct { 1526 max float64 1527 thresh float64 1528 ratio float64 1529 1530 mu sync.Mutex 1531 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1532} 1533 1534// throttle subtracts a retry token from the pool and returns whether a retry 1535// should be throttled (disallowed) based upon the retry throttling policy in 1536// the service config. 
1537func (rt *retryThrottler) throttle() bool { 1538 if rt == nil { 1539 return false 1540 } 1541 rt.mu.Lock() 1542 defer rt.mu.Unlock() 1543 rt.tokens-- 1544 if rt.tokens < 0 { 1545 rt.tokens = 0 1546 } 1547 return rt.tokens <= rt.thresh 1548} 1549 1550func (rt *retryThrottler) successfulRPC() { 1551 if rt == nil { 1552 return 1553 } 1554 rt.mu.Lock() 1555 defer rt.mu.Unlock() 1556 rt.tokens += rt.ratio 1557 if rt.tokens > rt.max { 1558 rt.tokens = rt.max 1559 } 1560} 1561 1562type channelzChannel struct { 1563 cc *ClientConn 1564} 1565 1566func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1567 return c.cc.channelzMetric() 1568} 1569 1570// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1571// underlying connections within the specified timeout. 1572// 1573// Deprecated: This error is never returned by grpc and should not be 1574// referenced by users. 1575var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1576 1577func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1578 for _, rb := range cc.dopts.resolvers { 1579 if scheme == rb.Scheme() { 1580 return rb 1581 } 1582 } 1583 return resolver.Get(scheme) 1584} 1585 1586func (cc *ClientConn) updateConnectionError(err error) { 1587 cc.lceMu.Lock() 1588 cc.lastConnectionError = err 1589 cc.lceMu.Unlock() 1590} 1591 1592func (cc *ClientConn) connectionError() error { 1593 cc.lceMu.Lock() 1594 defer cc.lceMu.Unlock() 1595 return cc.lastConnectionError 1596} 1597