1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "fmt" 25 "math" 26 "net" 27 "reflect" 28 "strings" 29 "sync" 30 "sync/atomic" 31 "time" 32 33 "google.golang.org/grpc/balancer" 34 "google.golang.org/grpc/balancer/base" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/connectivity" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/grpclog" 39 "google.golang.org/grpc/internal/backoff" 40 "google.golang.org/grpc/internal/channelz" 41 "google.golang.org/grpc/internal/grpcsync" 42 "google.golang.org/grpc/internal/transport" 43 "google.golang.org/grpc/keepalive" 44 "google.golang.org/grpc/resolver" 45 "google.golang.org/grpc/serviceconfig" 46 "google.golang.org/grpc/status" 47 48 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. 49 _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. 50 _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. 51) 52 53const ( 54 // minimum time to give a connection to complete 55 minConnectTimeout = 20 * time.Second 56 // must match grpclbName in grpclb/grpclb.go 57 grpclbName = "grpclb" 58) 59 60var ( 61 // ErrClientConnClosing indicates that the operation is illegal because 62 // the ClientConn is closing. 63 // 64 // Deprecated: this error should not be relied upon by users; use the status 65 // code of Canceled instead. 66 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing") 67 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. 68 errConnDrain = errors.New("grpc: the connection is drained") 69 // errConnClosing indicates that the connection is closing. 70 errConnClosing = errors.New("grpc: the connection is closing") 71 // errBalancerClosed indicates that the balancer is closed. 72 errBalancerClosed = errors.New("grpc: balancer is closed") 73 // invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default 74 // service config. 75 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid" 76) 77 78// The following errors are returned from Dial and DialContext 79var ( 80 // errNoTransportSecurity indicates that there is no transport security 81 // being set for ClientConn. Users should either set one or explicitly 82 // call WithInsecure DialOption to disable security. 83 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") 84 // errTransportCredsAndBundle indicates that creds bundle is used together 85 // with other individual Transport Credentials. 86 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials") 87 // errTransportCredentialsMissing indicates that users want to transmit security 88 // information (e.g., OAuth2 token) which requires secure connection on an insecure 89 // connection. 90 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") 91 // errCredentialsConflict indicates that grpc.WithTransportCredentials() 92 // and grpc.WithInsecure() are both called for a connection. 93 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") 94) 95 96const ( 97 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 98 defaultClientMaxSendMessageSize = math.MaxInt32 99 // http2IOBufSize specifies the buffer size for sending frames. 100 defaultWriteBufSize = 32 * 1024 101 defaultReadBufSize = 32 * 1024 102) 103 104// Dial creates a client connection to the given target. 105func Dial(target string, opts ...DialOption) (*ClientConn, error) { 106 return DialContext(context.Background(), target, opts...) 107} 108 109// DialContext creates a client connection to the given target. By default, it's 110// a non-blocking dial (the function won't wait for connections to be 111// established, and connecting happens in the background). To make it a blocking 112// dial, use WithBlock() dial option. 113// 114// In the non-blocking case, the ctx does not act against the connection. It 115// only controls the setup steps. 116// 117// In the blocking case, ctx can be used to cancel or expire the pending 118// connection. Once this function returns, the cancellation and expiration of 119// ctx will be noop. Users should call ClientConn.Close to terminate all the 120// pending operations after this function returns. 121// 122// The target name syntax is defined in 123// https://github.com/grpc/grpc/blob/master/doc/naming.md. 124// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. 125func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 126 cc := &ClientConn{ 127 target: target, 128 csMgr: &connectivityStateManager{}, 129 conns: make(map[*addrConn]struct{}), 130 dopts: defaultDialOptions(), 131 blockingpicker: newPickerWrapper(), 132 czData: new(channelzData), 133 firstResolveEvent: grpcsync.NewEvent(), 134 } 135 cc.retryThrottler.Store((*retryThrottler)(nil)) 136 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 137 138 for _, opt := range opts { 139 opt.apply(&cc.dopts) 140 } 141 142 chainUnaryClientInterceptors(cc) 143 chainStreamClientInterceptors(cc) 144 145 defer func() { 146 if err != nil { 147 cc.Close() 148 } 149 }() 150 151 if channelz.IsOn() { 152 if cc.dopts.channelzParentID != 0 { 153 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) 154 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 155 Desc: "Channel Created", 156 Severity: channelz.CtINFO, 157 Parent: &channelz.TraceEventDesc{ 158 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID), 159 Severity: channelz.CtINFO, 160 }, 161 }) 162 } else { 163 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target) 164 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 165 Desc: "Channel Created", 166 Severity: channelz.CtINFO, 167 }) 168 } 169 cc.csMgr.channelzID = cc.channelzID 170 } 171 172 if !cc.dopts.insecure { 173 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { 174 return nil, errNoTransportSecurity 175 } 176 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { 177 return nil, errTransportCredsAndBundle 178 } 179 } else { 180 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil { 181 return nil, errCredentialsConflict 182 } 183 for _, cd := range cc.dopts.copts.PerRPCCredentials { 184 if cd.RequireTransportSecurity() { 185 return nil, errTransportCredentialsMissing 186 } 187 } 188 } 189 190 if cc.dopts.defaultServiceConfigRawJSON != nil { 191 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON) 192 if scpr.Err != nil { 193 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err) 194 } 195 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig) 196 } 197 cc.mkp = cc.dopts.copts.KeepaliveParams 198 199 if cc.dopts.copts.Dialer == nil { 200 cc.dopts.copts.Dialer = newProxyDialer( 201 func(ctx context.Context, addr string) (net.Conn, error) { 202 network, addr := parseDialTarget(addr) 203 return (&net.Dialer{}).DialContext(ctx, network, addr) 204 }, 205 ) 206 } 207 208 if cc.dopts.copts.UserAgent != "" { 209 cc.dopts.copts.UserAgent += " " + grpcUA 210 } else { 211 cc.dopts.copts.UserAgent = grpcUA 212 } 213 214 if cc.dopts.timeout > 0 { 215 var cancel context.CancelFunc 216 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) 217 defer cancel() 218 } 219 defer func() { 220 select { 221 case <-ctx.Done(): 222 conn, err = nil, ctx.Err() 223 default: 224 } 225 }() 226 227 scSet := false 228 if cc.dopts.scChan != nil { 229 // Try to get an initial service config. 230 select { 231 case sc, ok := <-cc.dopts.scChan: 232 if ok { 233 cc.sc = &sc 234 scSet = true 235 } 236 default: 237 } 238 } 239 if cc.dopts.bs == nil { 240 cc.dopts.bs = backoff.DefaultExponential 241 } 242 if cc.dopts.resolverBuilder == nil { 243 // Only try to parse target when resolver builder is not already set. 244 cc.parsedTarget = parseTarget(cc.target) 245 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme) 246 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 247 if cc.dopts.resolverBuilder == nil { 248 // If resolver builder is still nil, the parsed target's scheme is 249 // not registered. Fallback to default resolver and set Endpoint to 250 // the original target. 251 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) 252 cc.parsedTarget = resolver.Target{ 253 Scheme: resolver.GetDefaultScheme(), 254 Endpoint: target, 255 } 256 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) 257 } 258 } else { 259 cc.parsedTarget = resolver.Target{Endpoint: target} 260 } 261 creds := cc.dopts.copts.TransportCredentials 262 if creds != nil && creds.Info().ServerName != "" { 263 cc.authority = creds.Info().ServerName 264 } else if cc.dopts.insecure && cc.dopts.authority != "" { 265 cc.authority = cc.dopts.authority 266 } else { 267 // Use endpoint from "scheme://authority/endpoint" as the default 268 // authority for ClientConn. 269 cc.authority = cc.parsedTarget.Endpoint 270 } 271 272 if cc.dopts.scChan != nil && !scSet { 273 // Blocking wait for the initial service config. 274 select { 275 case sc, ok := <-cc.dopts.scChan: 276 if ok { 277 cc.sc = &sc 278 } 279 case <-ctx.Done(): 280 return nil, ctx.Err() 281 } 282 } 283 if cc.dopts.scChan != nil { 284 go cc.scWatcher() 285 } 286 287 var credsClone credentials.TransportCredentials 288 if creds := cc.dopts.copts.TransportCredentials; creds != nil { 289 credsClone = creds.Clone() 290 } 291 cc.balancerBuildOpts = balancer.BuildOptions{ 292 DialCreds: credsClone, 293 CredsBundle: cc.dopts.copts.CredsBundle, 294 Dialer: cc.dopts.copts.Dialer, 295 ChannelzParentID: cc.channelzID, 296 Target: cc.parsedTarget, 297 } 298 299 // Build the resolver. 300 rWrapper, err := newCCResolverWrapper(cc) 301 if err != nil { 302 return nil, fmt.Errorf("failed to build resolver: %v", err) 303 } 304 305 cc.mu.Lock() 306 cc.resolverWrapper = rWrapper 307 cc.mu.Unlock() 308 // A blocking dial blocks until the clientConn is ready. 309 if cc.dopts.block { 310 for { 311 s := cc.GetState() 312 if s == connectivity.Ready { 313 break 314 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { 315 if err = cc.blockingpicker.connectionError(); err != nil { 316 terr, ok := err.(interface { 317 Temporary() bool 318 }) 319 if ok && !terr.Temporary() { 320 return nil, err 321 } 322 } 323 } 324 if !cc.WaitForStateChange(ctx, s) { 325 // ctx got timeout or canceled. 326 return nil, ctx.Err() 327 } 328 } 329 } 330 331 return cc, nil 332} 333 334// chainUnaryClientInterceptors chains all unary client interceptors into one. 335func chainUnaryClientInterceptors(cc *ClientConn) { 336 interceptors := cc.dopts.chainUnaryInts 337 // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will 338 // be executed before any other chained interceptors. 339 if cc.dopts.unaryInt != nil { 340 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...) 341 } 342 var chainedInt UnaryClientInterceptor 343 if len(interceptors) == 0 { 344 chainedInt = nil 345 } else if len(interceptors) == 1 { 346 chainedInt = interceptors[0] 347 } else { 348 chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error { 349 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...) 350 } 351 } 352 cc.dopts.unaryInt = chainedInt 353} 354 355// getChainUnaryInvoker recursively generate the chained unary invoker. 356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 357 if curr == len(interceptors)-1 { 358 return finalInvoker 359 } 360 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 361 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 362 } 363} 364 365// chainStreamClientInterceptors chains all stream client interceptors into one. 366func chainStreamClientInterceptors(cc *ClientConn) { 367 interceptors := cc.dopts.chainStreamInts 368 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 369 // be executed before any other chained interceptors. 370 if cc.dopts.streamInt != nil { 371 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 372 } 373 var chainedInt StreamClientInterceptor 374 if len(interceptors) == 0 { 375 chainedInt = nil 376 } else if len(interceptors) == 1 { 377 chainedInt = interceptors[0] 378 } else { 379 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 380 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 381 } 382 } 383 cc.dopts.streamInt = chainedInt 384} 385 386// getChainStreamer recursively generate the chained client stream constructor. 387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 388 if curr == len(interceptors)-1 { 389 return finalStreamer 390 } 391 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 392 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 393 } 394} 395 396// connectivityStateManager keeps the connectivity.State of ClientConn. 397// This struct will eventually be exported so the balancers can access it. 398type connectivityStateManager struct { 399 mu sync.Mutex 400 state connectivity.State 401 notifyChan chan struct{} 402 channelzID int64 403} 404 405// updateState updates the connectivity.State of ClientConn. 406// If there's a change it notifies goroutines waiting on state change to 407// happen. 408func (csm *connectivityStateManager) updateState(state connectivity.State) { 409 csm.mu.Lock() 410 defer csm.mu.Unlock() 411 if csm.state == connectivity.Shutdown { 412 return 413 } 414 if csm.state == state { 415 return 416 } 417 csm.state = state 418 if channelz.IsOn() { 419 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{ 420 Desc: fmt.Sprintf("Channel Connectivity change to %v", state), 421 Severity: channelz.CtINFO, 422 }) 423 } 424 if csm.notifyChan != nil { 425 // There are other goroutines waiting on this channel. 426 close(csm.notifyChan) 427 csm.notifyChan = nil 428 } 429} 430 431func (csm *connectivityStateManager) getState() connectivity.State { 432 csm.mu.Lock() 433 defer csm.mu.Unlock() 434 return csm.state 435} 436 437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 438 csm.mu.Lock() 439 defer csm.mu.Unlock() 440 if csm.notifyChan == nil { 441 csm.notifyChan = make(chan struct{}) 442 } 443 return csm.notifyChan 444} 445 446// ClientConn represents a virtual connection to a conceptual endpoint, to 447// perform RPCs. 448// 449// A ClientConn is free to have zero or more actual connections to the endpoint 450// based on configuration, load, etc. It is also free to determine which actual 451// endpoints to use and may change it every RPC, permitting client-side load 452// balancing. 453// 454// A ClientConn encapsulates a range of functionality including name 455// resolution, TCP connection establishment (with retries and backoff) and TLS 456// handshakes. It also handles errors on established connections by 457// re-resolving the name and reconnecting. 458type ClientConn struct { 459 ctx context.Context 460 cancel context.CancelFunc 461 462 target string 463 parsedTarget resolver.Target 464 authority string 465 dopts dialOptions 466 csMgr *connectivityStateManager 467 468 balancerBuildOpts balancer.BuildOptions 469 blockingpicker *pickerWrapper 470 471 mu sync.RWMutex 472 resolverWrapper *ccResolverWrapper 473 sc *ServiceConfig 474 conns map[*addrConn]struct{} 475 // Keepalive parameter can be updated if a GoAway is received. 476 mkp keepalive.ClientParameters 477 curBalancerName string 478 balancerWrapper *ccBalancerWrapper 479 retryThrottler atomic.Value 480 481 firstResolveEvent *grpcsync.Event 482 483 channelzID int64 // channelz unique identification number 484 czData *channelzData 485} 486 487// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 488// ctx expires. A true value is returned in former case and false in latter. 489// This is an EXPERIMENTAL API. 490func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 491 ch := cc.csMgr.getNotifyChan() 492 if cc.csMgr.getState() != sourceState { 493 return true 494 } 495 select { 496 case <-ctx.Done(): 497 return false 498 case <-ch: 499 return true 500 } 501} 502 503// GetState returns the connectivity.State of ClientConn. 504// This is an EXPERIMENTAL API. 505func (cc *ClientConn) GetState() connectivity.State { 506 return cc.csMgr.getState() 507} 508 509func (cc *ClientConn) scWatcher() { 510 for { 511 select { 512 case sc, ok := <-cc.dopts.scChan: 513 if !ok { 514 return 515 } 516 cc.mu.Lock() 517 // TODO: load balance policy runtime change is ignored. 518 // We may revisit this decision in the future. 519 cc.sc = &sc 520 cc.mu.Unlock() 521 case <-cc.ctx.Done(): 522 return 523 } 524 } 525} 526 527// waitForResolvedAddrs blocks until the resolver has provided addresses or the 528// context expires. Returns nil unless the context expires first; otherwise 529// returns a status error based on the context. 530func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 531 // This is on the RPC path, so we use a fast path to avoid the 532 // more-expensive "select" below after the resolver has returned once. 533 if cc.firstResolveEvent.HasFired() { 534 return nil 535 } 536 select { 537 case <-cc.firstResolveEvent.Done(): 538 return nil 539 case <-ctx.Done(): 540 return status.FromContextError(ctx.Err()).Err() 541 case <-cc.ctx.Done(): 542 return ErrClientConnClosing 543 } 544} 545 546var emptyServiceConfig *ServiceConfig 547 548func init() { 549 cfg := parseServiceConfig("{}") 550 if cfg.Err != nil { 551 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 552 } 553 emptyServiceConfig = cfg.Config.(*ServiceConfig) 554} 555 556func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 557 if cc.sc != nil { 558 cc.applyServiceConfigAndBalancer(cc.sc, addrs) 559 return 560 } 561 if cc.dopts.defaultServiceConfig != nil { 562 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs) 563 } else { 564 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) 565 } 566} 567 568func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 569 defer cc.firstResolveEvent.Fire() 570 cc.mu.Lock() 571 // Check if the ClientConn is already closed. Some fields (e.g. 572 // balancerWrapper) are set to nil when closing the ClientConn, and could 573 // cause nil pointer panic if we don't have this check. 574 if cc.conns == nil { 575 cc.mu.Unlock() 576 return nil 577 } 578 579 if err != nil { 580 // May need to apply the initial service config in case the resolver 581 // doesn't support service configs, or doesn't provide a service config 582 // with the new addresses. 583 cc.maybeApplyDefaultServiceConfig(nil) 584 585 if cc.balancerWrapper != nil { 586 cc.balancerWrapper.resolverError(err) 587 } 588 589 // No addresses are valid with err set; return early. 590 cc.mu.Unlock() 591 return balancer.ErrBadResolverState 592 } 593 594 var ret error 595 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 596 cc.maybeApplyDefaultServiceConfig(s.Addresses) 597 // TODO: do we need to apply a failing LB policy if there is no 598 // default, per the error handling design? 599 } else { 600 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 601 cc.applyServiceConfigAndBalancer(sc, s.Addresses) 602 } else { 603 ret = balancer.ErrBadResolverState 604 if cc.balancerWrapper == nil { 605 var err error 606 if s.ServiceConfig.Err != nil { 607 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) 608 } else { 609 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) 610 } 611 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 612 cc.csMgr.updateState(connectivity.TransientFailure) 613 cc.mu.Unlock() 614 return ret 615 } 616 } 617 } 618 619 var balCfg serviceconfig.LoadBalancingConfig 620 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { 621 balCfg = cc.sc.lbConfig.cfg 622 } 623 624 cbn := cc.curBalancerName 625 bw := cc.balancerWrapper 626 cc.mu.Unlock() 627 if cbn != grpclbName { 628 // Filter any grpclb addresses since we don't have the grpclb balancer. 629 for i := 0; i < len(s.Addresses); { 630 if s.Addresses[i].Type == resolver.GRPCLB { 631 copy(s.Addresses[i:], s.Addresses[i+1:]) 632 s.Addresses = s.Addresses[:len(s.Addresses)-1] 633 continue 634 } 635 i++ 636 } 637 } 638 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 639 if ret == nil { 640 ret = uccsErr // prefer ErrBadResolver state since any other error is 641 // currently meaningless to the caller. 642 } 643 return ret 644} 645 646// switchBalancer starts the switching from current balancer to the balancer 647// with the given name. 648// 649// It will NOT send the current address list to the new balancer. If needed, 650// caller of this function should send address list to the new balancer after 651// this function returns. 652// 653// Caller must hold cc.mu. 654func (cc *ClientConn) switchBalancer(name string) { 655 if strings.EqualFold(cc.curBalancerName, name) { 656 return 657 } 658 659 grpclog.Infof("ClientConn switching balancer to %q", name) 660 if cc.dopts.balancerBuilder != nil { 661 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") 662 return 663 } 664 if cc.balancerWrapper != nil { 665 cc.balancerWrapper.close() 666 } 667 668 builder := balancer.Get(name) 669 if channelz.IsOn() { 670 if builder == nil { 671 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 672 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName), 673 Severity: channelz.CtWarning, 674 }) 675 } else { 676 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 677 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name), 678 Severity: channelz.CtINFO, 679 }) 680 } 681 } 682 if builder == nil { 683 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) 684 builder = newPickfirstBuilder() 685 } 686 687 cc.curBalancerName = builder.Name() 688 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 689} 690 691func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 692 cc.mu.Lock() 693 if cc.conns == nil { 694 cc.mu.Unlock() 695 return 696 } 697 // TODO(bar switching) send updates to all balancer wrappers when balancer 698 // gracefully switching is supported. 699 cc.balancerWrapper.handleSubConnStateChange(sc, s) 700 cc.mu.Unlock() 701} 702 703// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 704// 705// Caller needs to make sure len(addrs) > 0. 706func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 707 ac := &addrConn{ 708 cc: cc, 709 addrs: addrs, 710 scopts: opts, 711 dopts: cc.dopts, 712 czData: new(channelzData), 713 resetBackoff: make(chan struct{}), 714 } 715 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 716 // Track ac in cc. This needs to be done before any getTransport(...) is called. 717 cc.mu.Lock() 718 if cc.conns == nil { 719 cc.mu.Unlock() 720 return nil, ErrClientConnClosing 721 } 722 if channelz.IsOn() { 723 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 724 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 725 Desc: "Subchannel Created", 726 Severity: channelz.CtINFO, 727 Parent: &channelz.TraceEventDesc{ 728 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 729 Severity: channelz.CtINFO, 730 }, 731 }) 732 } 733 cc.conns[ac] = struct{}{} 734 cc.mu.Unlock() 735 return ac, nil 736} 737 738// removeAddrConn removes the addrConn in the subConn from clientConn. 739// It also tears down the ac with the given error. 740func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 741 cc.mu.Lock() 742 if cc.conns == nil { 743 cc.mu.Unlock() 744 return 745 } 746 delete(cc.conns, ac) 747 cc.mu.Unlock() 748 ac.tearDown(err) 749} 750 751func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 752 return &channelz.ChannelInternalMetric{ 753 State: cc.GetState(), 754 Target: cc.target, 755 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 756 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 757 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 758 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 759 } 760} 761 762// Target returns the target string of the ClientConn. 763// This is an EXPERIMENTAL API. 764func (cc *ClientConn) Target() string { 765 return cc.target 766} 767 768func (cc *ClientConn) incrCallsStarted() { 769 atomic.AddInt64(&cc.czData.callsStarted, 1) 770 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 771} 772 773func (cc *ClientConn) incrCallsSucceeded() { 774 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 775} 776 777func (cc *ClientConn) incrCallsFailed() { 778 atomic.AddInt64(&cc.czData.callsFailed, 1) 779} 780 781// connect starts creating a transport. 782// It does nothing if the ac is not IDLE. 783// TODO(bar) Move this to the addrConn section. 784func (ac *addrConn) connect() error { 785 ac.mu.Lock() 786 if ac.state == connectivity.Shutdown { 787 ac.mu.Unlock() 788 return errConnClosing 789 } 790 if ac.state != connectivity.Idle { 791 ac.mu.Unlock() 792 return nil 793 } 794 // Update connectivity state within the lock to prevent subsequent or 795 // concurrent calls from resetting the transport more than once. 796 ac.updateConnectivityState(connectivity.Connecting) 797 ac.mu.Unlock() 798 799 // Start a goroutine connecting to the server asynchronously. 800 go ac.resetTransport() 801 return nil 802} 803 804// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 805// 806// If ac is Connecting, it returns false. The caller should tear down the ac and 807// create a new one. Note that the backoff will be reset when this happens. 808// 809// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 810// addresses will be picked up by retry in the next iteration after backoff. 811// 812// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 813// 814// If ac is Ready, it checks whether current connected address of ac is in the 815// new addrs list. 816// - If true, it updates ac.addrs and returns true. The ac will keep using 817// the existing connection. 818// - If false, it does nothing and returns false. 819func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 820 ac.mu.Lock() 821 defer ac.mu.Unlock() 822 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 823 if ac.state == connectivity.Shutdown || 824 ac.state == connectivity.TransientFailure || 825 ac.state == connectivity.Idle { 826 ac.addrs = addrs 827 return true 828 } 829 830 if ac.state == connectivity.Connecting { 831 return false 832 } 833 834 // ac.state is Ready, try to find the connected address. 835 var curAddrFound bool 836 for _, a := range addrs { 837 if reflect.DeepEqual(ac.curAddr, a) { 838 curAddrFound = true 839 break 840 } 841 } 842 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 843 if curAddrFound { 844 ac.addrs = addrs 845 } 846 847 return curAddrFound 848} 849 850// GetMethodConfig gets the method config of the input method. 851// If there's an exact match for input method (i.e. /service/method), we return 852// the corresponding MethodConfig. 853// If there isn't an exact match for the input method, we look for the default config 854// under the service (i.e /service/). If there is a default MethodConfig for 855// the service, we return it. 856// Otherwise, we return an empty MethodConfig. 857func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 858 // TODO: Avoid the locking here. 859 cc.mu.RLock() 860 defer cc.mu.RUnlock() 861 if cc.sc == nil { 862 return MethodConfig{} 863 } 864 m, ok := cc.sc.Methods[method] 865 if !ok { 866 i := strings.LastIndex(method, "/") 867 m = cc.sc.Methods[method[:i+1]] 868 } 869 return m 870} 871 872func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 873 cc.mu.RLock() 874 defer cc.mu.RUnlock() 875 if cc.sc == nil { 876 return nil 877 } 878 return cc.sc.healthCheckConfig 879} 880 881func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 882 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ 883 FullMethodName: method, 884 }) 885 if err != nil { 886 return nil, nil, toRPCErr(err) 887 } 888 return t, done, nil 889} 890 891func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) { 892 if sc == nil { 893 // should never reach here. 894 return 895 } 896 cc.sc = sc 897 898 if cc.sc.retryThrottling != nil { 899 newThrottler := &retryThrottler{ 900 tokens: cc.sc.retryThrottling.MaxTokens, 901 max: cc.sc.retryThrottling.MaxTokens, 902 thresh: cc.sc.retryThrottling.MaxTokens / 2, 903 ratio: cc.sc.retryThrottling.TokenRatio, 904 } 905 cc.retryThrottler.Store(newThrottler) 906 } else { 907 cc.retryThrottler.Store((*retryThrottler)(nil)) 908 } 909 910 if cc.dopts.balancerBuilder == nil { 911 // Only look at balancer types and switch balancer if balancer dial 912 // option is not set. 913 var newBalancerName string 914 if cc.sc != nil && cc.sc.lbConfig != nil { 915 newBalancerName = cc.sc.lbConfig.name 916 } else { 917 var isGRPCLB bool 918 for _, a := range addrs { 919 if a.Type == resolver.GRPCLB { 920 isGRPCLB = true 921 break 922 } 923 } 924 if isGRPCLB { 925 newBalancerName = grpclbName 926 } else if cc.sc != nil && cc.sc.LB != nil { 927 newBalancerName = *cc.sc.LB 928 } else { 929 newBalancerName = PickFirstBalancerName 930 } 931 } 932 cc.switchBalancer(newBalancerName) 933 } else if cc.balancerWrapper == nil { 934 // Balancer dial option was set, and this is the first time handling 935 // resolved addresses. Build a balancer with dopts.balancerBuilder. 936 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 937 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 938 } 939} 940 941func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { 942 cc.mu.RLock() 943 r := cc.resolverWrapper 944 cc.mu.RUnlock() 945 if r == nil { 946 return 947 } 948 go r.resolveNow(o) 949} 950 951// ResetConnectBackoff wakes up all subchannels in transient failure and causes 952// them to attempt another connection immediately. It also resets the backoff 953// times used for subsequent attempts regardless of the current state. 954// 955// In general, this function should not be used. Typical service or network 956// outages result in a reasonable client reconnection strategy by default. 957// However, if a previously unavailable network becomes available, this may be 958// used to trigger an immediate reconnect. 959// 960// This API is EXPERIMENTAL. 961func (cc *ClientConn) ResetConnectBackoff() { 962 cc.mu.Lock() 963 conns := cc.conns 964 cc.mu.Unlock() 965 for ac := range conns { 966 ac.resetConnectBackoff() 967 } 968} 969 970// Close tears down the ClientConn and all underlying connections. 971func (cc *ClientConn) Close() error { 972 defer cc.cancel() 973 974 cc.mu.Lock() 975 if cc.conns == nil { 976 cc.mu.Unlock() 977 return ErrClientConnClosing 978 } 979 conns := cc.conns 980 cc.conns = nil 981 cc.csMgr.updateState(connectivity.Shutdown) 982 983 rWrapper := cc.resolverWrapper 984 cc.resolverWrapper = nil 985 bWrapper := cc.balancerWrapper 986 cc.balancerWrapper = nil 987 cc.mu.Unlock() 988 989 cc.blockingpicker.close() 990 991 if rWrapper != nil { 992 rWrapper.close() 993 } 994 if bWrapper != nil { 995 bWrapper.close() 996 } 997 998 for ac := range conns { 999 ac.tearDown(ErrClientConnClosing) 1000 } 1001 if channelz.IsOn() { 1002 ted := &channelz.TraceEventDesc{ 1003 Desc: "Channel Deleted", 1004 Severity: channelz.CtINFO, 1005 } 1006 if cc.dopts.channelzParentID != 0 { 1007 ted.Parent = &channelz.TraceEventDesc{ 1008 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1009 Severity: channelz.CtINFO, 1010 } 1011 } 1012 channelz.AddTraceEvent(cc.channelzID, ted) 1013 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1014 // the entity being deleted, and thus prevent it from being deleted right away. 1015 channelz.RemoveEntry(cc.channelzID) 1016 } 1017 return nil 1018} 1019 1020// addrConn is a network connection to a given address. 1021type addrConn struct { 1022 ctx context.Context 1023 cancel context.CancelFunc 1024 1025 cc *ClientConn 1026 dopts dialOptions 1027 acbw balancer.SubConn 1028 scopts balancer.NewSubConnOptions 1029 1030 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1031 // health checking may require server to report healthy to set ac to READY), and is reset 1032 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1033 // is received, transport is closed, ac has been torn down). 1034 transport transport.ClientTransport // The current transport. 1035 1036 mu sync.Mutex 1037 curAddr resolver.Address // The current address. 1038 addrs []resolver.Address // All addresses that the resolver resolved to. 1039 1040 // Use updateConnectivityState for updating addrConn's connectivity state. 1041 state connectivity.State 1042 1043 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1044 resetBackoff chan struct{} 1045 1046 channelzID int64 // channelz unique identification number. 1047 czData *channelzData 1048} 1049 1050// Note: this requires a lock on ac.mu. 1051func (ac *addrConn) updateConnectivityState(s connectivity.State) { 1052 if ac.state == s { 1053 return 1054 } 1055 1056 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s) 1057 ac.state = s 1058 if channelz.IsOn() { 1059 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1060 Desc: updateMsg, 1061 Severity: channelz.CtINFO, 1062 }) 1063 } 1064 ac.cc.handleSubConnStateChange(ac.acbw, s) 1065} 1066 1067// adjustParams updates parameters used to create transports upon 1068// receiving a GoAway. 1069func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1070 switch r { 1071 case transport.GoAwayTooManyPings: 1072 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1073 ac.cc.mu.Lock() 1074 if v > ac.cc.mkp.Time { 1075 ac.cc.mkp.Time = v 1076 } 1077 ac.cc.mu.Unlock() 1078 } 1079} 1080 1081func (ac *addrConn) resetTransport() { 1082 for i := 0; ; i++ { 1083 if i > 0 { 1084 ac.cc.resolveNow(resolver.ResolveNowOption{}) 1085 } 1086 1087 ac.mu.Lock() 1088 if ac.state == connectivity.Shutdown { 1089 ac.mu.Unlock() 1090 return 1091 } 1092 1093 addrs := ac.addrs 1094 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1095 // This will be the duration that dial gets to finish. 1096 dialDuration := minConnectTimeout 1097 if ac.dopts.minConnectTimeout != nil { 1098 dialDuration = ac.dopts.minConnectTimeout() 1099 } 1100 1101 if dialDuration < backoffFor { 1102 // Give dial more time as we keep failing to connect. 1103 dialDuration = backoffFor 1104 } 1105 // We can potentially spend all the time trying the first address, and 1106 // if the server accepts the connection and then hangs, the following 1107 // addresses will never be tried. 1108 // 1109 // The spec doesn't mention what should be done for multiple addresses. 1110 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1111 connectDeadline := time.Now().Add(dialDuration) 1112 1113 ac.updateConnectivityState(connectivity.Connecting) 1114 ac.transport = nil 1115 ac.mu.Unlock() 1116 1117 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1118 if err != nil { 1119 // After exhausting all addresses, the addrConn enters 1120 // TRANSIENT_FAILURE. 1121 ac.mu.Lock() 1122 if ac.state == connectivity.Shutdown { 1123 ac.mu.Unlock() 1124 return 1125 } 1126 ac.updateConnectivityState(connectivity.TransientFailure) 1127 1128 // Backoff. 1129 b := ac.resetBackoff 1130 ac.mu.Unlock() 1131 1132 timer := time.NewTimer(backoffFor) 1133 select { 1134 case <-timer.C: 1135 ac.mu.Lock() 1136 ac.backoffIdx++ 1137 ac.mu.Unlock() 1138 case <-b: 1139 timer.Stop() 1140 case <-ac.ctx.Done(): 1141 timer.Stop() 1142 return 1143 } 1144 continue 1145 } 1146 1147 ac.mu.Lock() 1148 if ac.state == connectivity.Shutdown { 1149 ac.mu.Unlock() 1150 newTr.Close() 1151 return 1152 } 1153 ac.curAddr = addr 1154 ac.transport = newTr 1155 ac.backoffIdx = 0 1156 1157 hctx, hcancel := context.WithCancel(ac.ctx) 1158 ac.startHealthCheck(hctx) 1159 ac.mu.Unlock() 1160 1161 // Block until the created transport is down. And when this happens, 1162 // we restart from the top of the addr list. 1163 <-reconnect.Done() 1164 hcancel() 1165 // restart connecting - the top of the loop will set state to 1166 // CONNECTING. This is against the current connectivity semantics doc, 1167 // however it allows for graceful behavior for RPCs not yet dispatched 1168 // - unfortunate timing would otherwise lead to the RPC failing even 1169 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1170 // instantaneous. 1171 // 1172 // Ideally we should transition to Idle here and block until there is 1173 // RPC activity that leads to the balancer requesting a reconnect of 1174 // the associated SubConn. 1175 } 1176} 1177 1178// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1179// first successful one. It returns the transport, the address and a Event in 1180// the successful case. The Event fires when the returned transport disconnects. 1181func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1182 for _, addr := range addrs { 1183 ac.mu.Lock() 1184 if ac.state == connectivity.Shutdown { 1185 ac.mu.Unlock() 1186 return nil, resolver.Address{}, nil, errConnClosing 1187 } 1188 1189 ac.cc.mu.RLock() 1190 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1191 ac.cc.mu.RUnlock() 1192 1193 copts := ac.dopts.copts 1194 if ac.scopts.CredsBundle != nil { 1195 copts.CredsBundle = ac.scopts.CredsBundle 1196 } 1197 ac.mu.Unlock() 1198 1199 if channelz.IsOn() { 1200 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1201 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), 1202 Severity: channelz.CtINFO, 1203 }) 1204 } 1205 1206 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1207 if err == nil { 1208 return newTr, addr, reconnect, nil 1209 } 1210 ac.cc.blockingpicker.updateConnectionError(err) 1211 } 1212 1213 // Couldn't connect to any address. 1214 return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address") 1215} 1216 1217// createTransport creates a connection to addr. It returns the transport and a 1218// Event in the successful case. The Event fires when the returned transport 1219// disconnects. 1220func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1221 prefaceReceived := make(chan struct{}) 1222 onCloseCalled := make(chan struct{}) 1223 reconnect := grpcsync.NewEvent() 1224 1225 authority := ac.cc.authority 1226 // addr.ServerName takes precedent over ClientConn authority, if present. 1227 if addr.ServerName != "" { 1228 authority = addr.ServerName 1229 } 1230 1231 target := transport.TargetInfo{ 1232 Addr: addr.Addr, 1233 Metadata: addr.Metadata, 1234 Authority: authority, 1235 } 1236 1237 once := sync.Once{} 1238 onGoAway := func(r transport.GoAwayReason) { 1239 ac.mu.Lock() 1240 ac.adjustParams(r) 1241 once.Do(func() { 1242 if ac.state == connectivity.Ready { 1243 // Prevent this SubConn from being used for new RPCs by setting its 1244 // state to Connecting. 1245 // 1246 // TODO: this should be Idle when grpc-go properly supports it. 1247 ac.updateConnectivityState(connectivity.Connecting) 1248 } 1249 }) 1250 ac.mu.Unlock() 1251 reconnect.Fire() 1252 } 1253 1254 onClose := func() { 1255 ac.mu.Lock() 1256 once.Do(func() { 1257 if ac.state == connectivity.Ready { 1258 // Prevent this SubConn from being used for new RPCs by setting its 1259 // state to Connecting. 1260 // 1261 // TODO: this should be Idle when grpc-go properly supports it. 1262 ac.updateConnectivityState(connectivity.Connecting) 1263 } 1264 }) 1265 ac.mu.Unlock() 1266 close(onCloseCalled) 1267 reconnect.Fire() 1268 } 1269 1270 onPrefaceReceipt := func() { 1271 close(prefaceReceived) 1272 } 1273 1274 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1275 defer cancel() 1276 if channelz.IsOn() { 1277 copts.ChannelzParentID = ac.channelzID 1278 } 1279 1280 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1281 if err != nil { 1282 // newTr is either nil, or closed. 1283 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1284 return nil, nil, err 1285 } 1286 1287 select { 1288 case <-time.After(connectDeadline.Sub(time.Now())): 1289 // We didn't get the preface in time. 1290 newTr.Close() 1291 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1292 return nil, nil, errors.New("timed out waiting for server handshake") 1293 case <-prefaceReceived: 1294 // We got the preface - huzzah! things are good. 1295 case <-onCloseCalled: 1296 // The transport has already closed - noop. 1297 return nil, nil, errors.New("connection closed") 1298 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1299 } 1300 return newTr, reconnect, nil 1301} 1302 1303// startHealthCheck starts the health checking stream (RPC) to watch the health 1304// stats of this connection if health checking is requested and configured. 1305// 1306// LB channel health checking is enabled when all requirements below are met: 1307// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1308// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1309// 3. a service config with non-empty healthCheckConfig field is provided 1310// 4. the load balancer requests it 1311// 1312// It sets addrConn to READY if the health checking stream is not started. 1313// 1314// Caller must hold ac.mu. 1315func (ac *addrConn) startHealthCheck(ctx context.Context) { 1316 var healthcheckManagingState bool 1317 defer func() { 1318 if !healthcheckManagingState { 1319 ac.updateConnectivityState(connectivity.Ready) 1320 } 1321 }() 1322 1323 if ac.cc.dopts.disableHealthCheck { 1324 return 1325 } 1326 healthCheckConfig := ac.cc.healthCheckConfig() 1327 if healthCheckConfig == nil { 1328 return 1329 } 1330 if !ac.scopts.HealthCheckEnabled { 1331 return 1332 } 1333 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1334 if healthCheckFunc == nil { 1335 // The health package is not imported to set health check function. 1336 // 1337 // TODO: add a link to the health check doc in the error message. 1338 grpclog.Error("Health check is requested but health check function is not set.") 1339 return 1340 } 1341 1342 healthcheckManagingState = true 1343 1344 // Set up the health check helper functions. 1345 currentTr := ac.transport 1346 newStream := func(method string) (interface{}, error) { 1347 ac.mu.Lock() 1348 if ac.transport != currentTr { 1349 ac.mu.Unlock() 1350 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1351 } 1352 ac.mu.Unlock() 1353 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1354 } 1355 setConnectivityState := func(s connectivity.State) { 1356 ac.mu.Lock() 1357 defer ac.mu.Unlock() 1358 if ac.transport != currentTr { 1359 return 1360 } 1361 ac.updateConnectivityState(s) 1362 } 1363 // Start the health checking stream. 1364 go func() { 1365 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1366 if err != nil { 1367 if status.Code(err) == codes.Unimplemented { 1368 if channelz.IsOn() { 1369 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1370 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", 1371 Severity: channelz.CtError, 1372 }) 1373 } 1374 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") 1375 } else { 1376 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) 1377 } 1378 } 1379 }() 1380} 1381 1382func (ac *addrConn) resetConnectBackoff() { 1383 ac.mu.Lock() 1384 close(ac.resetBackoff) 1385 ac.backoffIdx = 0 1386 ac.resetBackoff = make(chan struct{}) 1387 ac.mu.Unlock() 1388} 1389 1390// getReadyTransport returns the transport if ac's state is READY. 1391// Otherwise it returns nil, false. 1392// If ac's state is IDLE, it will trigger ac to connect. 1393func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1394 ac.mu.Lock() 1395 if ac.state == connectivity.Ready && ac.transport != nil { 1396 t := ac.transport 1397 ac.mu.Unlock() 1398 return t, true 1399 } 1400 var idle bool 1401 if ac.state == connectivity.Idle { 1402 idle = true 1403 } 1404 ac.mu.Unlock() 1405 // Trigger idle ac to connect. 1406 if idle { 1407 ac.connect() 1408 } 1409 return nil, false 1410} 1411 1412// tearDown starts to tear down the addrConn. 1413// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1414// some edge cases (e.g., the caller opens and closes many addrConn's in a 1415// tight loop. 1416// tearDown doesn't remove ac from ac.cc.conns. 1417func (ac *addrConn) tearDown(err error) { 1418 ac.mu.Lock() 1419 if ac.state == connectivity.Shutdown { 1420 ac.mu.Unlock() 1421 return 1422 } 1423 curTr := ac.transport 1424 ac.transport = nil 1425 // We have to set the state to Shutdown before anything else to prevent races 1426 // between setting the state and logic that waits on context cancellation / etc. 1427 ac.updateConnectivityState(connectivity.Shutdown) 1428 ac.cancel() 1429 ac.curAddr = resolver.Address{} 1430 if err == errConnDrain && curTr != nil { 1431 // GracefulClose(...) may be executed multiple times when 1432 // i) receiving multiple GoAway frames from the server; or 1433 // ii) there are concurrent name resolver/Balancer triggered 1434 // address removal and GoAway. 1435 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1436 ac.mu.Unlock() 1437 curTr.GracefulClose() 1438 ac.mu.Lock() 1439 } 1440 if channelz.IsOn() { 1441 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1442 Desc: "Subchannel Deleted", 1443 Severity: channelz.CtINFO, 1444 Parent: &channelz.TraceEventDesc{ 1445 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1446 Severity: channelz.CtINFO, 1447 }, 1448 }) 1449 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1450 // the entity being deleted, and thus prevent it from being deleted right away. 1451 channelz.RemoveEntry(ac.channelzID) 1452 } 1453 ac.mu.Unlock() 1454} 1455 1456func (ac *addrConn) getState() connectivity.State { 1457 ac.mu.Lock() 1458 defer ac.mu.Unlock() 1459 return ac.state 1460} 1461 1462func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1463 ac.mu.Lock() 1464 addr := ac.curAddr.Addr 1465 ac.mu.Unlock() 1466 return &channelz.ChannelInternalMetric{ 1467 State: ac.getState(), 1468 Target: addr, 1469 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1470 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1471 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1472 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1473 } 1474} 1475 1476func (ac *addrConn) incrCallsStarted() { 1477 atomic.AddInt64(&ac.czData.callsStarted, 1) 1478 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1479} 1480 1481func (ac *addrConn) incrCallsSucceeded() { 1482 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1483} 1484 1485func (ac *addrConn) incrCallsFailed() { 1486 atomic.AddInt64(&ac.czData.callsFailed, 1) 1487} 1488 1489type retryThrottler struct { 1490 max float64 1491 thresh float64 1492 ratio float64 1493 1494 mu sync.Mutex 1495 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1496} 1497 1498// throttle subtracts a retry token from the pool and returns whether a retry 1499// should be throttled (disallowed) based upon the retry throttling policy in 1500// the service config. 1501func (rt *retryThrottler) throttle() bool { 1502 if rt == nil { 1503 return false 1504 } 1505 rt.mu.Lock() 1506 defer rt.mu.Unlock() 1507 rt.tokens-- 1508 if rt.tokens < 0 { 1509 rt.tokens = 0 1510 } 1511 return rt.tokens <= rt.thresh 1512} 1513 1514func (rt *retryThrottler) successfulRPC() { 1515 if rt == nil { 1516 return 1517 } 1518 rt.mu.Lock() 1519 defer rt.mu.Unlock() 1520 rt.tokens += rt.ratio 1521 if rt.tokens > rt.max { 1522 rt.tokens = rt.max 1523 } 1524} 1525 1526type channelzChannel struct { 1527 cc *ClientConn 1528} 1529 1530func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1531 return c.cc.channelzMetric() 1532} 1533 1534// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1535// underlying connections within the specified timeout. 1536// 1537// Deprecated: This error is never returned by grpc and should not be 1538// referenced by users. 1539var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1540