/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)

const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is math.MaxInt32.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are 32 KiB each. The
	// previous comment here referred to an "http2IOBufSize" for sending
	// frames; NOTE(review): these are presumably the transport's default
	// write/read buffer sizes - confirm at the use site.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It calls DialContext
// with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so the atomic.Value always holds a *retryThrottler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is derived from context.Background(), not from the dial ctx; it
	// is cancelled by cc.Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Registered first, so it runs last: on any error return (including an
	// error set by the ctx.Done defer below), close the partially built
	// ClientConn.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credential options. A secure dial needs exactly one of
	// TransportCredentials or CredsBundle; an insecure dial must have neither
	// and no per-RPC credentials that require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		// The type-assertion result is deliberately discarded; if the parsed
		// config is not a *ServiceConfig, defaultServiceConfig stays nil.
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// No custom dialer: wrap a plain net.Dialer with newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append grpcUA to any user-supplied user agent.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Registered last, so it runs first on return: if ctx is already done,
	// the result is replaced with (nil, ctx.Err()), which in turn makes the
	// first deferred function close cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: the credentials' ServerName, then the authority
	// dial option (insecure dials only), then the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// scWatcher exits when scChan is closed or cc.ctx is done.
		go cc.scWatcher()
	}

	// The balancer build options get a clone of the transport credentials
	// rather than the original object.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Fail fast only when the connection error reports itself as
				// non-temporary; errors without a Temporary method keep
				// waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// Run the first interceptor with an invoker that continues down the
		// chain and finally calls the real invoker.
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	// The last interceptor in the chain is handed the real invoker.
	if curr == len(interceptors)-1 {
		return finalInvoker
	}
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
	}
}

// chainStreamClientInterceptors chains all stream client interceptors into one
// and stores the result in cc.dopts.streamInt (nil when there are none).
func chainStreamClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainStreamInts
	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
	// be executed before any other chained interceptors.
	if cc.dopts.streamInt != nil {
		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
	}
	var chainedInt StreamClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
		}
	}
	cc.dopts.streamInt = chainedInt
}

// getChainStreamer recursively generates the chained client stream constructor.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
	// The last interceptor in the chain is handed the real streamer.
	if curr == len(interceptors)-1 {
		return finalStreamer
	}
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
	}
}

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu    sync.Mutex
	state connectivity.State
	// notifyChan is created lazily by getNotifyChan and closed (then reset to
	// nil) by updateState when the state changes.
	notifyChan chan struct{}
	channelzID int64
}

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	// Shutdown is terminal: once reached, the state is never changed again.
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	if channelz.IsOn() {
		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
			Severity: channelz.CtINFO,
		})
	}
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}

// getState returns the current state under csm.mu.
func (csm *connectivityStateManager) getState() connectivity.State {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	return csm.state
}

// getNotifyChan returns a channel that updateState closes on the next state
// change, creating the channel if none is pending.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.notifyChan == nil {
		csm.notifyChan = make(chan struct{})
	}
	return csm.notifyChan
}

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is set to nil by Close; methods in this file treat a nil conns
	// as "the ClientConn is closed".
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
	// Fetch the channel before comparing the state, so a change that happens
	// between the two calls still closes ch.
	ch := cc.csMgr.getNotifyChan()
	if cc.csMgr.getState() != sourceState {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}

// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}

// scWatcher stores every service config received on cc.dopts.scChan into
// cc.sc. It returns when the channel is closed or cc.ctx is done.
func (cc *ClientConn) scWatcher() {
	for {
		select {
		case sc, ok := <-cc.dopts.scChan:
			if !ok {
				return
			}
			cc.mu.Lock()
			// TODO: load balance policy runtime change is ignored.
			// We may revisit this decision in the future.
			cc.sc = &sc
			cc.mu.Unlock()
		case <-cc.ctx.Done():
			return
		}
	}
}

// waitForResolvedAddrs blocks until the resolver has provided addresses or the
// context expires. Returns nil unless the context expires first; otherwise
// returns a status error based on the context. If the ClientConn's own context
// is done first, it returns ErrClientConnClosing.
func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
	// This is on the RPC path, so we use a fast path to avoid the
	// more-expensive "select" below after the resolver has returned once.
	if cc.firstResolveEvent.HasFired() {
		return nil
	}
	select {
	case <-cc.firstResolveEvent.Done():
		return nil
	case <-ctx.Done():
		return status.FromContextError(ctx.Err()).Err()
	case <-cc.ctx.Done():
		return ErrClientConnClosing
	}
}

// emptyServiceConfig is the parsed form of "{}". maybeApplyDefaultServiceConfig
// applies it when neither cc.sc nor a default service config is set.
var emptyServiceConfig *ServiceConfig

// init parses the empty service config once; a parse failure panics.
func init() {
	cfg := parseServiceConfig("{}")
	if cfg.Err != nil {
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
	}
	emptyServiceConfig = cfg.Config.(*ServiceConfig)
}

// maybeApplyDefaultServiceConfig applies, in order of preference, the current
// cc.sc, the default service config dial option, or emptyServiceConfig.
//
// It is called from updateResolverState with cc.mu held.
func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
	if cc.sc != nil {
		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
		return
	}
	if cc.dopts.defaultServiceConfig != nil {
		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
	} else {
		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
	}
}

// updateResolverState handles a new resolver state s, or a resolver error err.
// It applies the service config (or a default), then forwards the state to the
// balancer wrapper. It returns balancer.ErrBadResolverState when err is set or
// the service config in s is unusable; otherwise it returns the balancer
// wrapper's updateClientConnState result. firstResolveEvent is fired on every
// return path.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			ret = balancer.ErrBadResolverState
			// With no balancer yet there is nothing to forward the state to:
			// install an error picker, enter TransientFailure and return.
			// When a balancer already exists, fall through and still send it
			// the addresses below.
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// The service config's LB config is only passed on when no balancer
	// builder dial option is set.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Capture the balancer name and wrapper under the lock; the balancer is
	// updated after cc.mu is released.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// The removal is done in place on s.Addresses.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	// Balancer names are compared case-insensitively.
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	grpclog.Infof("ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if cc.balancerWrapper != nil {
		cc.balancerWrapper.close()
	}

	builder := balancer.Get(name)
	if channelz.IsOn() {
		if builder == nil {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
				Severity: channelz.CtWarning,
			})
		} else {
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
				Severity: channelz.CtINFO,
			})
		}
	}
	// An unregistered balancer name falls back to pick_first.
	if builder == nil {
		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

// handleSubConnStateChange forwards a SubConn state change to the current
// balancer wrapper. It does nothing once the ClientConn is closed.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	// TODO(bar switching) send updates to all balancer wrappers when balancer
	// gracefully switching is supported.
	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
	cc.mu.Unlock()
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns. It
// returns ErrClientConnClosing if the ClientConn is already closed.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	// The addrConn's context is a child of cc.ctx.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}

// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error.
740func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 741 cc.mu.Lock() 742 if cc.conns == nil { 743 cc.mu.Unlock() 744 return 745 } 746 delete(cc.conns, ac) 747 cc.mu.Unlock() 748 ac.tearDown(err) 749} 750 751func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 752 return &channelz.ChannelInternalMetric{ 753 State: cc.GetState(), 754 Target: cc.target, 755 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 756 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 757 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 758 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 759 } 760} 761 762// Target returns the target string of the ClientConn. 763// This is an EXPERIMENTAL API. 764func (cc *ClientConn) Target() string { 765 return cc.target 766} 767 768func (cc *ClientConn) incrCallsStarted() { 769 atomic.AddInt64(&cc.czData.callsStarted, 1) 770 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 771} 772 773func (cc *ClientConn) incrCallsSucceeded() { 774 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 775} 776 777func (cc *ClientConn) incrCallsFailed() { 778 atomic.AddInt64(&cc.czData.callsFailed, 1) 779} 780 781// connect starts creating a transport. 782// It does nothing if the ac is not IDLE. 783// TODO(bar) Move this to the addrConn section. 784func (ac *addrConn) connect() error { 785 ac.mu.Lock() 786 if ac.state == connectivity.Shutdown { 787 ac.mu.Unlock() 788 return errConnClosing 789 } 790 if ac.state != connectivity.Idle { 791 ac.mu.Unlock() 792 return nil 793 } 794 // Update connectivity state within the lock to prevent subsequent or 795 // concurrent calls from resetting the transport more than once. 796 ac.updateConnectivityState(connectivity.Connecting, nil) 797 ac.mu.Unlock() 798 799 // Start a goroutine connecting to the server asynchronously. 
800 go ac.resetTransport() 801 return nil 802} 803 804// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 805// 806// If ac is Connecting, it returns false. The caller should tear down the ac and 807// create a new one. Note that the backoff will be reset when this happens. 808// 809// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 810// addresses will be picked up by retry in the next iteration after backoff. 811// 812// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 813// 814// If ac is Ready, it checks whether current connected address of ac is in the 815// new addrs list. 816// - If true, it updates ac.addrs and returns true. The ac will keep using 817// the existing connection. 818// - If false, it does nothing and returns false. 819func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 820 ac.mu.Lock() 821 defer ac.mu.Unlock() 822 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 823 if ac.state == connectivity.Shutdown || 824 ac.state == connectivity.TransientFailure || 825 ac.state == connectivity.Idle { 826 ac.addrs = addrs 827 return true 828 } 829 830 if ac.state == connectivity.Connecting { 831 return false 832 } 833 834 // ac.state is Ready, try to find the connected address. 835 var curAddrFound bool 836 for _, a := range addrs { 837 if reflect.DeepEqual(ac.curAddr, a) { 838 curAddrFound = true 839 break 840 } 841 } 842 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 843 if curAddrFound { 844 ac.addrs = addrs 845 } 846 847 return curAddrFound 848} 849 850// GetMethodConfig gets the method config of the input method. 851// If there's an exact match for input method (i.e. /service/method), we return 852// the corresponding MethodConfig. 853// If there isn't an exact match for the input method, we look for the default config 854// under the service (i.e /service/). 
If there is a default MethodConfig for 855// the service, we return it. 856// Otherwise, we return an empty MethodConfig. 857func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 858 // TODO: Avoid the locking here. 859 cc.mu.RLock() 860 defer cc.mu.RUnlock() 861 if cc.sc == nil { 862 return MethodConfig{} 863 } 864 m, ok := cc.sc.Methods[method] 865 if !ok { 866 i := strings.LastIndex(method, "/") 867 m = cc.sc.Methods[method[:i+1]] 868 } 869 return m 870} 871 872func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 873 cc.mu.RLock() 874 defer cc.mu.RUnlock() 875 if cc.sc == nil { 876 return nil 877 } 878 return cc.sc.healthCheckConfig 879} 880 881func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 882 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ 883 Ctx: ctx, 884 FullMethodName: method, 885 }) 886 if err != nil { 887 return nil, nil, toRPCErr(err) 888 } 889 return t, done, nil 890} 891 892func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) { 893 if sc == nil { 894 // should never reach here. 895 return 896 } 897 cc.sc = sc 898 899 if cc.sc.retryThrottling != nil { 900 newThrottler := &retryThrottler{ 901 tokens: cc.sc.retryThrottling.MaxTokens, 902 max: cc.sc.retryThrottling.MaxTokens, 903 thresh: cc.sc.retryThrottling.MaxTokens / 2, 904 ratio: cc.sc.retryThrottling.TokenRatio, 905 } 906 cc.retryThrottler.Store(newThrottler) 907 } else { 908 cc.retryThrottler.Store((*retryThrottler)(nil)) 909 } 910 911 if cc.dopts.balancerBuilder == nil { 912 // Only look at balancer types and switch balancer if balancer dial 913 // option is not set. 
914 var newBalancerName string 915 if cc.sc != nil && cc.sc.lbConfig != nil { 916 newBalancerName = cc.sc.lbConfig.name 917 } else { 918 var isGRPCLB bool 919 for _, a := range addrs { 920 if a.Type == resolver.GRPCLB { 921 isGRPCLB = true 922 break 923 } 924 } 925 if isGRPCLB { 926 newBalancerName = grpclbName 927 } else if cc.sc != nil && cc.sc.LB != nil { 928 newBalancerName = *cc.sc.LB 929 } else { 930 newBalancerName = PickFirstBalancerName 931 } 932 } 933 cc.switchBalancer(newBalancerName) 934 } else if cc.balancerWrapper == nil { 935 // Balancer dial option was set, and this is the first time handling 936 // resolved addresses. Build a balancer with dopts.balancerBuilder. 937 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 938 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 939 } 940} 941 942func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { 943 cc.mu.RLock() 944 r := cc.resolverWrapper 945 cc.mu.RUnlock() 946 if r == nil { 947 return 948 } 949 go r.resolveNow(o) 950} 951 952// ResetConnectBackoff wakes up all subchannels in transient failure and causes 953// them to attempt another connection immediately. It also resets the backoff 954// times used for subsequent attempts regardless of the current state. 955// 956// In general, this function should not be used. Typical service or network 957// outages result in a reasonable client reconnection strategy by default. 958// However, if a previously unavailable network becomes available, this may be 959// used to trigger an immediate reconnect. 960// 961// This API is EXPERIMENTAL. 962func (cc *ClientConn) ResetConnectBackoff() { 963 cc.mu.Lock() 964 conns := cc.conns 965 cc.mu.Unlock() 966 for ac := range conns { 967 ac.resetConnectBackoff() 968 } 969} 970 971// Close tears down the ClientConn and all underlying connections. 
972func (cc *ClientConn) Close() error { 973 defer cc.cancel() 974 975 cc.mu.Lock() 976 if cc.conns == nil { 977 cc.mu.Unlock() 978 return ErrClientConnClosing 979 } 980 conns := cc.conns 981 cc.conns = nil 982 cc.csMgr.updateState(connectivity.Shutdown) 983 984 rWrapper := cc.resolverWrapper 985 cc.resolverWrapper = nil 986 bWrapper := cc.balancerWrapper 987 cc.balancerWrapper = nil 988 cc.mu.Unlock() 989 990 cc.blockingpicker.close() 991 992 if rWrapper != nil { 993 rWrapper.close() 994 } 995 if bWrapper != nil { 996 bWrapper.close() 997 } 998 999 for ac := range conns { 1000 ac.tearDown(ErrClientConnClosing) 1001 } 1002 if channelz.IsOn() { 1003 ted := &channelz.TraceEventDesc{ 1004 Desc: "Channel Deleted", 1005 Severity: channelz.CtINFO, 1006 } 1007 if cc.dopts.channelzParentID != 0 { 1008 ted.Parent = &channelz.TraceEventDesc{ 1009 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1010 Severity: channelz.CtINFO, 1011 } 1012 } 1013 channelz.AddTraceEvent(cc.channelzID, ted) 1014 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1015 // the entity being deleted, and thus prevent it from being deleted right away. 1016 channelz.RemoveEntry(cc.channelzID) 1017 } 1018 return nil 1019} 1020 1021// addrConn is a network connection to a given address. 1022type addrConn struct { 1023 ctx context.Context 1024 cancel context.CancelFunc 1025 1026 cc *ClientConn 1027 dopts dialOptions 1028 acbw balancer.SubConn 1029 scopts balancer.NewSubConnOptions 1030 1031 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1032 // health checking may require server to report healthy to set ac to READY), and is reset 1033 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1034 // is received, transport is closed, ac has been torn down). 1035 transport transport.ClientTransport // The current transport. 
1036 1037 mu sync.Mutex 1038 curAddr resolver.Address // The current address. 1039 addrs []resolver.Address // All addresses that the resolver resolved to. 1040 1041 // Use updateConnectivityState for updating addrConn's connectivity state. 1042 state connectivity.State 1043 1044 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1045 resetBackoff chan struct{} 1046 1047 channelzID int64 // channelz unique identification number. 1048 czData *channelzData 1049} 1050 1051// Note: this requires a lock on ac.mu. 1052func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1053 if ac.state == s { 1054 return 1055 } 1056 1057 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s) 1058 ac.state = s 1059 if channelz.IsOn() { 1060 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1061 Desc: updateMsg, 1062 Severity: channelz.CtINFO, 1063 }) 1064 } 1065 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1066} 1067 1068// adjustParams updates parameters used to create transports upon 1069// receiving a GoAway. 1070func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1071 switch r { 1072 case transport.GoAwayTooManyPings: 1073 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1074 ac.cc.mu.Lock() 1075 if v > ac.cc.mkp.Time { 1076 ac.cc.mkp.Time = v 1077 } 1078 ac.cc.mu.Unlock() 1079 } 1080} 1081 1082func (ac *addrConn) resetTransport() { 1083 for i := 0; ; i++ { 1084 if i > 0 { 1085 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1086 } 1087 1088 ac.mu.Lock() 1089 if ac.state == connectivity.Shutdown { 1090 ac.mu.Unlock() 1091 return 1092 } 1093 1094 addrs := ac.addrs 1095 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1096 // This will be the duration that dial gets to finish. 
1097 dialDuration := minConnectTimeout 1098 if ac.dopts.minConnectTimeout != nil { 1099 dialDuration = ac.dopts.minConnectTimeout() 1100 } 1101 1102 if dialDuration < backoffFor { 1103 // Give dial more time as we keep failing to connect. 1104 dialDuration = backoffFor 1105 } 1106 // We can potentially spend all the time trying the first address, and 1107 // if the server accepts the connection and then hangs, the following 1108 // addresses will never be tried. 1109 // 1110 // The spec doesn't mention what should be done for multiple addresses. 1111 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1112 connectDeadline := time.Now().Add(dialDuration) 1113 1114 ac.updateConnectivityState(connectivity.Connecting, nil) 1115 ac.transport = nil 1116 ac.mu.Unlock() 1117 1118 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1119 if err != nil { 1120 // After exhausting all addresses, the addrConn enters 1121 // TRANSIENT_FAILURE. 1122 ac.mu.Lock() 1123 if ac.state == connectivity.Shutdown { 1124 ac.mu.Unlock() 1125 return 1126 } 1127 ac.updateConnectivityState(connectivity.TransientFailure, err) 1128 1129 // Backoff. 1130 b := ac.resetBackoff 1131 ac.mu.Unlock() 1132 1133 timer := time.NewTimer(backoffFor) 1134 select { 1135 case <-timer.C: 1136 ac.mu.Lock() 1137 ac.backoffIdx++ 1138 ac.mu.Unlock() 1139 case <-b: 1140 timer.Stop() 1141 case <-ac.ctx.Done(): 1142 timer.Stop() 1143 return 1144 } 1145 continue 1146 } 1147 1148 ac.mu.Lock() 1149 if ac.state == connectivity.Shutdown { 1150 ac.mu.Unlock() 1151 newTr.Close() 1152 return 1153 } 1154 ac.curAddr = addr 1155 ac.transport = newTr 1156 ac.backoffIdx = 0 1157 1158 hctx, hcancel := context.WithCancel(ac.ctx) 1159 ac.startHealthCheck(hctx) 1160 ac.mu.Unlock() 1161 1162 // Block until the created transport is down. And when this happens, 1163 // we restart from the top of the addr list. 
1164 <-reconnect.Done() 1165 hcancel() 1166 // restart connecting - the top of the loop will set state to 1167 // CONNECTING. This is against the current connectivity semantics doc, 1168 // however it allows for graceful behavior for RPCs not yet dispatched 1169 // - unfortunate timing would otherwise lead to the RPC failing even 1170 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1171 // instantaneous. 1172 // 1173 // Ideally we should transition to Idle here and block until there is 1174 // RPC activity that leads to the balancer requesting a reconnect of 1175 // the associated SubConn. 1176 } 1177} 1178 1179// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1180// first successful one. It returns the transport, the address and a Event in 1181// the successful case. The Event fires when the returned transport disconnects. 1182func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1183 var firstConnErr error 1184 for _, addr := range addrs { 1185 ac.mu.Lock() 1186 if ac.state == connectivity.Shutdown { 1187 ac.mu.Unlock() 1188 return nil, resolver.Address{}, nil, errConnClosing 1189 } 1190 1191 ac.cc.mu.RLock() 1192 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1193 ac.cc.mu.RUnlock() 1194 1195 copts := ac.dopts.copts 1196 if ac.scopts.CredsBundle != nil { 1197 copts.CredsBundle = ac.scopts.CredsBundle 1198 } 1199 ac.mu.Unlock() 1200 1201 if channelz.IsOn() { 1202 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1203 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), 1204 Severity: channelz.CtINFO, 1205 }) 1206 } 1207 1208 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1209 if err == nil { 1210 return newTr, addr, reconnect, nil 1211 } 1212 if firstConnErr == nil { 1213 firstConnErr = err 1214 } 1215 
ac.cc.blockingpicker.updateConnectionError(err) 1216 } 1217 1218 // Couldn't connect to any address. 1219 return nil, resolver.Address{}, nil, firstConnErr 1220} 1221 1222// createTransport creates a connection to addr. It returns the transport and a 1223// Event in the successful case. The Event fires when the returned transport 1224// disconnects. 1225func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1226 prefaceReceived := make(chan struct{}) 1227 onCloseCalled := make(chan struct{}) 1228 reconnect := grpcsync.NewEvent() 1229 1230 authority := ac.cc.authority 1231 // addr.ServerName takes precedent over ClientConn authority, if present. 1232 if addr.ServerName != "" { 1233 authority = addr.ServerName 1234 } 1235 1236 target := transport.TargetInfo{ 1237 Addr: addr.Addr, 1238 Metadata: addr.Metadata, 1239 Authority: authority, 1240 } 1241 1242 once := sync.Once{} 1243 onGoAway := func(r transport.GoAwayReason) { 1244 ac.mu.Lock() 1245 ac.adjustParams(r) 1246 once.Do(func() { 1247 if ac.state == connectivity.Ready { 1248 // Prevent this SubConn from being used for new RPCs by setting its 1249 // state to Connecting. 1250 // 1251 // TODO: this should be Idle when grpc-go properly supports it. 1252 ac.updateConnectivityState(connectivity.Connecting, nil) 1253 } 1254 }) 1255 ac.mu.Unlock() 1256 reconnect.Fire() 1257 } 1258 1259 onClose := func() { 1260 ac.mu.Lock() 1261 once.Do(func() { 1262 if ac.state == connectivity.Ready { 1263 // Prevent this SubConn from being used for new RPCs by setting its 1264 // state to Connecting. 1265 // 1266 // TODO: this should be Idle when grpc-go properly supports it. 
1267 ac.updateConnectivityState(connectivity.Connecting, nil) 1268 } 1269 }) 1270 ac.mu.Unlock() 1271 close(onCloseCalled) 1272 reconnect.Fire() 1273 } 1274 1275 onPrefaceReceipt := func() { 1276 close(prefaceReceived) 1277 } 1278 1279 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1280 defer cancel() 1281 if channelz.IsOn() { 1282 copts.ChannelzParentID = ac.channelzID 1283 } 1284 1285 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1286 if err != nil { 1287 // newTr is either nil, or closed. 1288 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1289 return nil, nil, err 1290 } 1291 1292 select { 1293 case <-time.After(time.Until(connectDeadline)): 1294 // We didn't get the preface in time. 1295 newTr.Close() 1296 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1297 return nil, nil, errors.New("timed out waiting for server handshake") 1298 case <-prefaceReceived: 1299 // We got the preface - huzzah! things are good. 1300 case <-onCloseCalled: 1301 // The transport has already closed - noop. 1302 return nil, nil, errors.New("connection closed") 1303 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1304 } 1305 return newTr, reconnect, nil 1306} 1307 1308// startHealthCheck starts the health checking stream (RPC) to watch the health 1309// stats of this connection if health checking is requested and configured. 1310// 1311// LB channel health checking is enabled when all requirements below are met: 1312// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1313// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1314// 3. a service config with non-empty healthCheckConfig field is provided 1315// 4. 
the load balancer requests it 1316// 1317// It sets addrConn to READY if the health checking stream is not started. 1318// 1319// Caller must hold ac.mu. 1320func (ac *addrConn) startHealthCheck(ctx context.Context) { 1321 var healthcheckManagingState bool 1322 defer func() { 1323 if !healthcheckManagingState { 1324 ac.updateConnectivityState(connectivity.Ready, nil) 1325 } 1326 }() 1327 1328 if ac.cc.dopts.disableHealthCheck { 1329 return 1330 } 1331 healthCheckConfig := ac.cc.healthCheckConfig() 1332 if healthCheckConfig == nil { 1333 return 1334 } 1335 if !ac.scopts.HealthCheckEnabled { 1336 return 1337 } 1338 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1339 if healthCheckFunc == nil { 1340 // The health package is not imported to set health check function. 1341 // 1342 // TODO: add a link to the health check doc in the error message. 1343 grpclog.Error("Health check is requested but health check function is not set.") 1344 return 1345 } 1346 1347 healthcheckManagingState = true 1348 1349 // Set up the health check helper functions. 1350 currentTr := ac.transport 1351 newStream := func(method string) (interface{}, error) { 1352 ac.mu.Lock() 1353 if ac.transport != currentTr { 1354 ac.mu.Unlock() 1355 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1356 } 1357 ac.mu.Unlock() 1358 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1359 } 1360 setConnectivityState := func(s connectivity.State, lastErr error) { 1361 ac.mu.Lock() 1362 defer ac.mu.Unlock() 1363 if ac.transport != currentTr { 1364 return 1365 } 1366 ac.updateConnectivityState(s, lastErr) 1367 } 1368 // Start the health checking stream. 
1369 go func() { 1370 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1371 if err != nil { 1372 if status.Code(err) == codes.Unimplemented { 1373 if channelz.IsOn() { 1374 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1375 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", 1376 Severity: channelz.CtError, 1377 }) 1378 } 1379 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") 1380 } else { 1381 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) 1382 } 1383 } 1384 }() 1385} 1386 1387func (ac *addrConn) resetConnectBackoff() { 1388 ac.mu.Lock() 1389 close(ac.resetBackoff) 1390 ac.backoffIdx = 0 1391 ac.resetBackoff = make(chan struct{}) 1392 ac.mu.Unlock() 1393} 1394 1395// getReadyTransport returns the transport if ac's state is READY. 1396// Otherwise it returns nil, false. 1397// If ac's state is IDLE, it will trigger ac to connect. 1398func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1399 ac.mu.Lock() 1400 if ac.state == connectivity.Ready && ac.transport != nil { 1401 t := ac.transport 1402 ac.mu.Unlock() 1403 return t, true 1404 } 1405 var idle bool 1406 if ac.state == connectivity.Idle { 1407 idle = true 1408 } 1409 ac.mu.Unlock() 1410 // Trigger idle ac to connect. 1411 if idle { 1412 ac.connect() 1413 } 1414 return nil, false 1415} 1416 1417// tearDown starts to tear down the addrConn. 1418// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1419// some edge cases (e.g., the caller opens and closes many addrConn's in a 1420// tight loop. 1421// tearDown doesn't remove ac from ac.cc.conns. 
1422func (ac *addrConn) tearDown(err error) { 1423 ac.mu.Lock() 1424 if ac.state == connectivity.Shutdown { 1425 ac.mu.Unlock() 1426 return 1427 } 1428 curTr := ac.transport 1429 ac.transport = nil 1430 // We have to set the state to Shutdown before anything else to prevent races 1431 // between setting the state and logic that waits on context cancellation / etc. 1432 ac.updateConnectivityState(connectivity.Shutdown, nil) 1433 ac.cancel() 1434 ac.curAddr = resolver.Address{} 1435 if err == errConnDrain && curTr != nil { 1436 // GracefulClose(...) may be executed multiple times when 1437 // i) receiving multiple GoAway frames from the server; or 1438 // ii) there are concurrent name resolver/Balancer triggered 1439 // address removal and GoAway. 1440 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1441 ac.mu.Unlock() 1442 curTr.GracefulClose() 1443 ac.mu.Lock() 1444 } 1445 if channelz.IsOn() { 1446 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1447 Desc: "Subchannel Deleted", 1448 Severity: channelz.CtINFO, 1449 Parent: &channelz.TraceEventDesc{ 1450 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1451 Severity: channelz.CtINFO, 1452 }, 1453 }) 1454 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1455 // the entity being deleted, and thus prevent it from being deleted right away. 
1456 channelz.RemoveEntry(ac.channelzID) 1457 } 1458 ac.mu.Unlock() 1459} 1460 1461func (ac *addrConn) getState() connectivity.State { 1462 ac.mu.Lock() 1463 defer ac.mu.Unlock() 1464 return ac.state 1465} 1466 1467func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1468 ac.mu.Lock() 1469 addr := ac.curAddr.Addr 1470 ac.mu.Unlock() 1471 return &channelz.ChannelInternalMetric{ 1472 State: ac.getState(), 1473 Target: addr, 1474 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1475 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1476 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1477 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1478 } 1479} 1480 1481func (ac *addrConn) incrCallsStarted() { 1482 atomic.AddInt64(&ac.czData.callsStarted, 1) 1483 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1484} 1485 1486func (ac *addrConn) incrCallsSucceeded() { 1487 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1488} 1489 1490func (ac *addrConn) incrCallsFailed() { 1491 atomic.AddInt64(&ac.czData.callsFailed, 1) 1492} 1493 1494type retryThrottler struct { 1495 max float64 1496 thresh float64 1497 ratio float64 1498 1499 mu sync.Mutex 1500 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1501} 1502 1503// throttle subtracts a retry token from the pool and returns whether a retry 1504// should be throttled (disallowed) based upon the retry throttling policy in 1505// the service config. 
1506func (rt *retryThrottler) throttle() bool { 1507 if rt == nil { 1508 return false 1509 } 1510 rt.mu.Lock() 1511 defer rt.mu.Unlock() 1512 rt.tokens-- 1513 if rt.tokens < 0 { 1514 rt.tokens = 0 1515 } 1516 return rt.tokens <= rt.thresh 1517} 1518 1519func (rt *retryThrottler) successfulRPC() { 1520 if rt == nil { 1521 return 1522 } 1523 rt.mu.Lock() 1524 defer rt.mu.Unlock() 1525 rt.tokens += rt.ratio 1526 if rt.tokens > rt.max { 1527 rt.tokens = rt.max 1528 } 1529} 1530 1531type channelzChannel struct { 1532 cc *ClientConn 1533} 1534 1535func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1536 return c.cc.channelzMetric() 1537} 1538 1539// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1540// underlying connections within the specified timeout. 1541// 1542// Deprecated: This error is never returned by grpc and should not be 1543// referenced by users. 1544var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1545