/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)

const (
	// minConnectTimeout is the minimum time to give a connection attempt to
	// complete. A DialOption may override it (see addrConn.resetTransport).
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer; it must match grpclbName
	// in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is a plain string (not an error)
	// that DialContext prepends to the JSON parsing error of the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// defaultClientMaxReceiveMessageSize is the default limit (4 MiB) on the
	// size of a message the client will receive.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the default limit on the size of a
	// message the client will send (effectively unlimited).
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the default buffer size, in bytes, for
	// sending frames; defaultReadBufSize is presumably the matching default
	// for the read side (confirm against dialoptions.go).
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target.
// It is equivalent to DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// The ClientConn's own lifetime context is detached from the caller's ctx;
	// it is canceled only by Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Any error return (including the ctx-expiry rewrite installed further
	// below, which runs first because defers are LIFO) closes the ClientConn.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the security configuration: exactly one of TransportCredentials
	// or CredsBundle when secure; neither (and no per-RPC creds that demand
	// transport security) when insecure.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done by the time DialContext returns, report the context error
	// instead of whatever result was about to be returned.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Authority precedence: the transport credentials' ServerName, then an
	// explicit authority (insecure only), then the parsed target endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only for errors that explicitly report
				// themselves as non-temporary.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
354func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 355 if curr == len(interceptors)-1 { 356 return finalInvoker 357 } 358 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 359 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 360 } 361} 362 363// chainStreamClientInterceptors chains all stream client interceptors into one. 364func chainStreamClientInterceptors(cc *ClientConn) { 365 interceptors := cc.dopts.chainStreamInts 366 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 367 // be executed before any other chained interceptors. 368 if cc.dopts.streamInt != nil { 369 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 370 } 371 var chainedInt StreamClientInterceptor 372 if len(interceptors) == 0 { 373 chainedInt = nil 374 } else if len(interceptors) == 1 { 375 chainedInt = interceptors[0] 376 } else { 377 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 378 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 379 } 380 } 381 cc.dopts.streamInt = chainedInt 382} 383 384// getChainStreamer recursively generate the chained client stream constructor. 385func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 386 if curr == len(interceptors)-1 { 387 return finalStreamer 388 } 389 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 390 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 
391 } 392} 393 394// connectivityStateManager keeps the connectivity.State of ClientConn. 395// This struct will eventually be exported so the balancers can access it. 396type connectivityStateManager struct { 397 mu sync.Mutex 398 state connectivity.State 399 notifyChan chan struct{} 400 channelzID int64 401} 402 403// updateState updates the connectivity.State of ClientConn. 404// If there's a change it notifies goroutines waiting on state change to 405// happen. 406func (csm *connectivityStateManager) updateState(state connectivity.State) { 407 csm.mu.Lock() 408 defer csm.mu.Unlock() 409 if csm.state == connectivity.Shutdown { 410 return 411 } 412 if csm.state == state { 413 return 414 } 415 csm.state = state 416 channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state) 417 if csm.notifyChan != nil { 418 // There are other goroutines waiting on this channel. 419 close(csm.notifyChan) 420 csm.notifyChan = nil 421 } 422} 423 424func (csm *connectivityStateManager) getState() connectivity.State { 425 csm.mu.Lock() 426 defer csm.mu.Unlock() 427 return csm.state 428} 429 430func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 431 csm.mu.Lock() 432 defer csm.mu.Unlock() 433 if csm.notifyChan == nil { 434 csm.notifyChan = make(chan struct{}) 435 } 436 return csm.notifyChan 437} 438 439// ClientConnInterface defines the functions clients need to perform unary and 440// streaming RPCs. It is implemented by *ClientConn, and is only intended to 441// be referenced by generated code. 442type ClientConnInterface interface { 443 // Invoke performs a unary RPC and returns after the response is received 444 // into reply. 445 Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error 446 // NewStream begins a streaming RPC. 447 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) 448} 449 450// Assert *ClientConn implements ClientConnInterface. 
var _ ClientConnInterface = (*ClientConn)(nil)

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx is canceled by Close. Every addrConn context is derived from it, and
	// background goroutines such as scWatcher stop when it is done.
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu guards the fields below, as accessed after construction by the code
	// in this file (resolverWrapper, sc, conns, mkp, curBalancerName and
	// balancerWrapper). retryThrottler is an atomic.Value.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is set to nil by Close; a nil map means the ClientConn is closed.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	// firstResolveEvent fires after the first resolver update has been
	// handled; waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
497func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 498 ch := cc.csMgr.getNotifyChan() 499 if cc.csMgr.getState() != sourceState { 500 return true 501 } 502 select { 503 case <-ctx.Done(): 504 return false 505 case <-ch: 506 return true 507 } 508} 509 510// GetState returns the connectivity.State of ClientConn. 511// This is an EXPERIMENTAL API. 512func (cc *ClientConn) GetState() connectivity.State { 513 return cc.csMgr.getState() 514} 515 516func (cc *ClientConn) scWatcher() { 517 for { 518 select { 519 case sc, ok := <-cc.dopts.scChan: 520 if !ok { 521 return 522 } 523 cc.mu.Lock() 524 // TODO: load balance policy runtime change is ignored. 525 // We may revisit this decision in the future. 526 cc.sc = &sc 527 cc.mu.Unlock() 528 case <-cc.ctx.Done(): 529 return 530 } 531 } 532} 533 534// waitForResolvedAddrs blocks until the resolver has provided addresses or the 535// context expires. Returns nil unless the context expires first; otherwise 536// returns a status error based on the context. 537func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 538 // This is on the RPC path, so we use a fast path to avoid the 539 // more-expensive "select" below after the resolver has returned once. 
540 if cc.firstResolveEvent.HasFired() { 541 return nil 542 } 543 select { 544 case <-cc.firstResolveEvent.Done(): 545 return nil 546 case <-ctx.Done(): 547 return status.FromContextError(ctx.Err()).Err() 548 case <-cc.ctx.Done(): 549 return ErrClientConnClosing 550 } 551} 552 553var emptyServiceConfig *ServiceConfig 554 555func init() { 556 cfg := parseServiceConfig("{}") 557 if cfg.Err != nil { 558 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 559 } 560 emptyServiceConfig = cfg.Config.(*ServiceConfig) 561} 562 563func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 564 if cc.sc != nil { 565 cc.applyServiceConfigAndBalancer(cc.sc, addrs) 566 return 567 } 568 if cc.dopts.defaultServiceConfig != nil { 569 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs) 570 } else { 571 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) 572 } 573} 574 575func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 576 defer cc.firstResolveEvent.Fire() 577 cc.mu.Lock() 578 // Check if the ClientConn is already closed. Some fields (e.g. 579 // balancerWrapper) are set to nil when closing the ClientConn, and could 580 // cause nil pointer panic if we don't have this check. 581 if cc.conns == nil { 582 cc.mu.Unlock() 583 return nil 584 } 585 586 if err != nil { 587 // May need to apply the initial service config in case the resolver 588 // doesn't support service configs, or doesn't provide a service config 589 // with the new addresses. 590 cc.maybeApplyDefaultServiceConfig(nil) 591 592 if cc.balancerWrapper != nil { 593 cc.balancerWrapper.resolverError(err) 594 } 595 596 // No addresses are valid with err set; return early. 
597 cc.mu.Unlock() 598 return balancer.ErrBadResolverState 599 } 600 601 var ret error 602 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 603 cc.maybeApplyDefaultServiceConfig(s.Addresses) 604 // TODO: do we need to apply a failing LB policy if there is no 605 // default, per the error handling design? 606 } else { 607 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 608 cc.applyServiceConfigAndBalancer(sc, s.Addresses) 609 } else { 610 ret = balancer.ErrBadResolverState 611 if cc.balancerWrapper == nil { 612 var err error 613 if s.ServiceConfig.Err != nil { 614 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) 615 } else { 616 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) 617 } 618 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 619 cc.csMgr.updateState(connectivity.TransientFailure) 620 cc.mu.Unlock() 621 return ret 622 } 623 } 624 } 625 626 var balCfg serviceconfig.LoadBalancingConfig 627 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { 628 balCfg = cc.sc.lbConfig.cfg 629 } 630 631 cbn := cc.curBalancerName 632 bw := cc.balancerWrapper 633 cc.mu.Unlock() 634 if cbn != grpclbName { 635 // Filter any grpclb addresses since we don't have the grpclb balancer. 636 for i := 0; i < len(s.Addresses); { 637 if s.Addresses[i].Type == resolver.GRPCLB { 638 copy(s.Addresses[i:], s.Addresses[i+1:]) 639 s.Addresses = s.Addresses[:len(s.Addresses)-1] 640 continue 641 } 642 i++ 643 } 644 } 645 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 646 if ret == nil { 647 ret = uccsErr // prefer ErrBadResolver state since any other error is 648 // currently meaningless to the caller. 649 } 650 return ret 651} 652 653// switchBalancer starts the switching from current balancer to the balancer 654// with the given name. 
655// 656// It will NOT send the current address list to the new balancer. If needed, 657// caller of this function should send address list to the new balancer after 658// this function returns. 659// 660// Caller must hold cc.mu. 661func (cc *ClientConn) switchBalancer(name string) { 662 if strings.EqualFold(cc.curBalancerName, name) { 663 return 664 } 665 666 channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name) 667 if cc.dopts.balancerBuilder != nil { 668 channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead") 669 return 670 } 671 if cc.balancerWrapper != nil { 672 cc.balancerWrapper.close() 673 } 674 675 builder := balancer.Get(name) 676 if builder == nil { 677 channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) 678 channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) 679 builder = newPickfirstBuilder() 680 } else { 681 channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name) 682 } 683 684 cc.curBalancerName = builder.Name() 685 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 686} 687 688func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { 689 cc.mu.Lock() 690 if cc.conns == nil { 691 cc.mu.Unlock() 692 return 693 } 694 // TODO(bar switching) send updates to all balancer wrappers when balancer 695 // gracefully switching is supported. 696 cc.balancerWrapper.handleSubConnStateChange(sc, s, err) 697 cc.mu.Unlock() 698} 699 700// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 701// 702// Caller needs to make sure len(addrs) > 0. 
703func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 704 ac := &addrConn{ 705 state: connectivity.Idle, 706 cc: cc, 707 addrs: addrs, 708 scopts: opts, 709 dopts: cc.dopts, 710 czData: new(channelzData), 711 resetBackoff: make(chan struct{}), 712 } 713 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 714 // Track ac in cc. This needs to be done before any getTransport(...) is called. 715 cc.mu.Lock() 716 if cc.conns == nil { 717 cc.mu.Unlock() 718 return nil, ErrClientConnClosing 719 } 720 if channelz.IsOn() { 721 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 722 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{ 723 Desc: "Subchannel Created", 724 Severity: channelz.CtINFO, 725 Parent: &channelz.TraceEventDesc{ 726 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 727 Severity: channelz.CtINFO, 728 }, 729 }) 730 } 731 cc.conns[ac] = struct{}{} 732 cc.mu.Unlock() 733 return ac, nil 734} 735 736// removeAddrConn removes the addrConn in the subConn from clientConn. 737// It also tears down the ac with the given error. 738func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 739 cc.mu.Lock() 740 if cc.conns == nil { 741 cc.mu.Unlock() 742 return 743 } 744 delete(cc.conns, ac) 745 cc.mu.Unlock() 746 ac.tearDown(err) 747} 748 749func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 750 return &channelz.ChannelInternalMetric{ 751 State: cc.GetState(), 752 Target: cc.target, 753 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 754 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 755 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 756 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 757 } 758} 759 760// Target returns the target string of the ClientConn. 761// This is an EXPERIMENTAL API. 
762func (cc *ClientConn) Target() string { 763 return cc.target 764} 765 766func (cc *ClientConn) incrCallsStarted() { 767 atomic.AddInt64(&cc.czData.callsStarted, 1) 768 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 769} 770 771func (cc *ClientConn) incrCallsSucceeded() { 772 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 773} 774 775func (cc *ClientConn) incrCallsFailed() { 776 atomic.AddInt64(&cc.czData.callsFailed, 1) 777} 778 779// connect starts creating a transport. 780// It does nothing if the ac is not IDLE. 781// TODO(bar) Move this to the addrConn section. 782func (ac *addrConn) connect() error { 783 ac.mu.Lock() 784 if ac.state == connectivity.Shutdown { 785 ac.mu.Unlock() 786 return errConnClosing 787 } 788 if ac.state != connectivity.Idle { 789 ac.mu.Unlock() 790 return nil 791 } 792 // Update connectivity state within the lock to prevent subsequent or 793 // concurrent calls from resetting the transport more than once. 794 ac.updateConnectivityState(connectivity.Connecting, nil) 795 ac.mu.Unlock() 796 797 // Start a goroutine connecting to the server asynchronously. 798 go ac.resetTransport() 799 return nil 800} 801 802// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 803// 804// If ac is Connecting, it returns false. The caller should tear down the ac and 805// create a new one. Note that the backoff will be reset when this happens. 806// 807// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 808// addresses will be picked up by retry in the next iteration after backoff. 809// 810// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 811// 812// If ac is Ready, it checks whether current connected address of ac is in the 813// new addrs list. 814// - If true, it updates ac.addrs and returns true. The ac will keep using 815// the existing connection. 816// - If false, it does nothing and returns false. 
817func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 818 ac.mu.Lock() 819 defer ac.mu.Unlock() 820 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 821 if ac.state == connectivity.Shutdown || 822 ac.state == connectivity.TransientFailure || 823 ac.state == connectivity.Idle { 824 ac.addrs = addrs 825 return true 826 } 827 828 if ac.state == connectivity.Connecting { 829 return false 830 } 831 832 // ac.state is Ready, try to find the connected address. 833 var curAddrFound bool 834 for _, a := range addrs { 835 if reflect.DeepEqual(ac.curAddr, a) { 836 curAddrFound = true 837 break 838 } 839 } 840 channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 841 if curAddrFound { 842 ac.addrs = addrs 843 } 844 845 return curAddrFound 846} 847 848// GetMethodConfig gets the method config of the input method. 849// If there's an exact match for input method (i.e. /service/method), we return 850// the corresponding MethodConfig. 851// If there isn't an exact match for the input method, we look for the default config 852// under the service (i.e /service/). If there is a default MethodConfig for 853// the service, we return it. 854// Otherwise, we return an empty MethodConfig. 855func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 856 // TODO: Avoid the locking here. 
857 cc.mu.RLock() 858 defer cc.mu.RUnlock() 859 if cc.sc == nil { 860 return MethodConfig{} 861 } 862 m, ok := cc.sc.Methods[method] 863 if !ok { 864 i := strings.LastIndex(method, "/") 865 m = cc.sc.Methods[method[:i+1]] 866 } 867 return m 868} 869 870func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 871 cc.mu.RLock() 872 defer cc.mu.RUnlock() 873 if cc.sc == nil { 874 return nil 875 } 876 return cc.sc.healthCheckConfig 877} 878 879func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 880 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ 881 Ctx: ctx, 882 FullMethodName: method, 883 }) 884 if err != nil { 885 return nil, nil, toRPCErr(err) 886 } 887 return t, done, nil 888} 889 890func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) { 891 if sc == nil { 892 // should never reach here. 893 return 894 } 895 cc.sc = sc 896 897 if cc.sc.retryThrottling != nil { 898 newThrottler := &retryThrottler{ 899 tokens: cc.sc.retryThrottling.MaxTokens, 900 max: cc.sc.retryThrottling.MaxTokens, 901 thresh: cc.sc.retryThrottling.MaxTokens / 2, 902 ratio: cc.sc.retryThrottling.TokenRatio, 903 } 904 cc.retryThrottler.Store(newThrottler) 905 } else { 906 cc.retryThrottler.Store((*retryThrottler)(nil)) 907 } 908 909 if cc.dopts.balancerBuilder == nil { 910 // Only look at balancer types and switch balancer if balancer dial 911 // option is not set. 
912 var newBalancerName string 913 if cc.sc != nil && cc.sc.lbConfig != nil { 914 newBalancerName = cc.sc.lbConfig.name 915 } else { 916 var isGRPCLB bool 917 for _, a := range addrs { 918 if a.Type == resolver.GRPCLB { 919 isGRPCLB = true 920 break 921 } 922 } 923 if isGRPCLB { 924 newBalancerName = grpclbName 925 } else if cc.sc != nil && cc.sc.LB != nil { 926 newBalancerName = *cc.sc.LB 927 } else { 928 newBalancerName = PickFirstBalancerName 929 } 930 } 931 cc.switchBalancer(newBalancerName) 932 } else if cc.balancerWrapper == nil { 933 // Balancer dial option was set, and this is the first time handling 934 // resolved addresses. Build a balancer with dopts.balancerBuilder. 935 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 936 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 937 } 938} 939 940func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { 941 cc.mu.RLock() 942 r := cc.resolverWrapper 943 cc.mu.RUnlock() 944 if r == nil { 945 return 946 } 947 go r.resolveNow(o) 948} 949 950// ResetConnectBackoff wakes up all subchannels in transient failure and causes 951// them to attempt another connection immediately. It also resets the backoff 952// times used for subsequent attempts regardless of the current state. 953// 954// In general, this function should not be used. Typical service or network 955// outages result in a reasonable client reconnection strategy by default. 956// However, if a previously unavailable network becomes available, this may be 957// used to trigger an immediate reconnect. 958// 959// This API is EXPERIMENTAL. 960func (cc *ClientConn) ResetConnectBackoff() { 961 cc.mu.Lock() 962 conns := cc.conns 963 cc.mu.Unlock() 964 for ac := range conns { 965 ac.resetConnectBackoff() 966 } 967} 968 969// Close tears down the ClientConn and all underlying connections. 
970func (cc *ClientConn) Close() error { 971 defer cc.cancel() 972 973 cc.mu.Lock() 974 if cc.conns == nil { 975 cc.mu.Unlock() 976 return ErrClientConnClosing 977 } 978 conns := cc.conns 979 cc.conns = nil 980 cc.csMgr.updateState(connectivity.Shutdown) 981 982 rWrapper := cc.resolverWrapper 983 cc.resolverWrapper = nil 984 bWrapper := cc.balancerWrapper 985 cc.balancerWrapper = nil 986 cc.mu.Unlock() 987 988 cc.blockingpicker.close() 989 990 if rWrapper != nil { 991 rWrapper.close() 992 } 993 if bWrapper != nil { 994 bWrapper.close() 995 } 996 997 for ac := range conns { 998 ac.tearDown(ErrClientConnClosing) 999 } 1000 if channelz.IsOn() { 1001 ted := &channelz.TraceEventDesc{ 1002 Desc: "Channel Deleted", 1003 Severity: channelz.CtINFO, 1004 } 1005 if cc.dopts.channelzParentID != 0 { 1006 ted.Parent = &channelz.TraceEventDesc{ 1007 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1008 Severity: channelz.CtINFO, 1009 } 1010 } 1011 channelz.AddTraceEvent(cc.channelzID, 0, ted) 1012 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1013 // the entity being deleted, and thus prevent it from being deleted right away. 1014 channelz.RemoveEntry(cc.channelzID) 1015 } 1016 return nil 1017} 1018 1019// addrConn is a network connection to a given address. 1020type addrConn struct { 1021 ctx context.Context 1022 cancel context.CancelFunc 1023 1024 cc *ClientConn 1025 dopts dialOptions 1026 acbw balancer.SubConn 1027 scopts balancer.NewSubConnOptions 1028 1029 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1030 // health checking may require server to report healthy to set ac to READY), and is reset 1031 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1032 // is received, transport is closed, ac has been torn down). 1033 transport transport.ClientTransport // The current transport. 
1034 1035 mu sync.Mutex 1036 curAddr resolver.Address // The current address. 1037 addrs []resolver.Address // All addresses that the resolver resolved to. 1038 1039 // Use updateConnectivityState for updating addrConn's connectivity state. 1040 state connectivity.State 1041 1042 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1043 resetBackoff chan struct{} 1044 1045 channelzID int64 // channelz unique identification number. 1046 czData *channelzData 1047} 1048 1049// Note: this requires a lock on ac.mu. 1050func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1051 if ac.state == s { 1052 return 1053 } 1054 ac.state = s 1055 channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s) 1056 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1057} 1058 1059// adjustParams updates parameters used to create transports upon 1060// receiving a GoAway. 1061func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1062 switch r { 1063 case transport.GoAwayTooManyPings: 1064 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1065 ac.cc.mu.Lock() 1066 if v > ac.cc.mkp.Time { 1067 ac.cc.mkp.Time = v 1068 } 1069 ac.cc.mu.Unlock() 1070 } 1071} 1072 1073func (ac *addrConn) resetTransport() { 1074 for i := 0; ; i++ { 1075 if i > 0 { 1076 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1077 } 1078 1079 ac.mu.Lock() 1080 if ac.state == connectivity.Shutdown { 1081 ac.mu.Unlock() 1082 return 1083 } 1084 1085 addrs := ac.addrs 1086 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1087 // This will be the duration that dial gets to finish. 1088 dialDuration := minConnectTimeout 1089 if ac.dopts.minConnectTimeout != nil { 1090 dialDuration = ac.dopts.minConnectTimeout() 1091 } 1092 1093 if dialDuration < backoffFor { 1094 // Give dial more time as we keep failing to connect. 
1095 dialDuration = backoffFor 1096 } 1097 // We can potentially spend all the time trying the first address, and 1098 // if the server accepts the connection and then hangs, the following 1099 // addresses will never be tried. 1100 // 1101 // The spec doesn't mention what should be done for multiple addresses. 1102 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1103 connectDeadline := time.Now().Add(dialDuration) 1104 1105 ac.updateConnectivityState(connectivity.Connecting, nil) 1106 ac.transport = nil 1107 ac.mu.Unlock() 1108 1109 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1110 if err != nil { 1111 // After exhausting all addresses, the addrConn enters 1112 // TRANSIENT_FAILURE. 1113 ac.mu.Lock() 1114 if ac.state == connectivity.Shutdown { 1115 ac.mu.Unlock() 1116 return 1117 } 1118 ac.updateConnectivityState(connectivity.TransientFailure, err) 1119 1120 // Backoff. 1121 b := ac.resetBackoff 1122 ac.mu.Unlock() 1123 1124 timer := time.NewTimer(backoffFor) 1125 select { 1126 case <-timer.C: 1127 ac.mu.Lock() 1128 ac.backoffIdx++ 1129 ac.mu.Unlock() 1130 case <-b: 1131 timer.Stop() 1132 case <-ac.ctx.Done(): 1133 timer.Stop() 1134 return 1135 } 1136 continue 1137 } 1138 1139 ac.mu.Lock() 1140 if ac.state == connectivity.Shutdown { 1141 ac.mu.Unlock() 1142 newTr.Close() 1143 return 1144 } 1145 ac.curAddr = addr 1146 ac.transport = newTr 1147 ac.backoffIdx = 0 1148 1149 hctx, hcancel := context.WithCancel(ac.ctx) 1150 ac.startHealthCheck(hctx) 1151 ac.mu.Unlock() 1152 1153 // Block until the created transport is down. And when this happens, 1154 // we restart from the top of the addr list. 1155 <-reconnect.Done() 1156 hcancel() 1157 // restart connecting - the top of the loop will set state to 1158 // CONNECTING. 
This is against the current connectivity semantics doc, 1159 // however it allows for graceful behavior for RPCs not yet dispatched 1160 // - unfortunate timing would otherwise lead to the RPC failing even 1161 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1162 // instantaneous. 1163 // 1164 // Ideally we should transition to Idle here and block until there is 1165 // RPC activity that leads to the balancer requesting a reconnect of 1166 // the associated SubConn. 1167 } 1168} 1169 1170// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1171// first successful one. It returns the transport, the address and a Event in 1172// the successful case. The Event fires when the returned transport disconnects. 1173func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1174 var firstConnErr error 1175 for _, addr := range addrs { 1176 ac.mu.Lock() 1177 if ac.state == connectivity.Shutdown { 1178 ac.mu.Unlock() 1179 return nil, resolver.Address{}, nil, errConnClosing 1180 } 1181 1182 ac.cc.mu.RLock() 1183 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1184 ac.cc.mu.RUnlock() 1185 1186 copts := ac.dopts.copts 1187 if ac.scopts.CredsBundle != nil { 1188 copts.CredsBundle = ac.scopts.CredsBundle 1189 } 1190 ac.mu.Unlock() 1191 1192 channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr) 1193 1194 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1195 if err == nil { 1196 return newTr, addr, reconnect, nil 1197 } 1198 if firstConnErr == nil { 1199 firstConnErr = err 1200 } 1201 ac.cc.blockingpicker.updateConnectionError(err) 1202 } 1203 1204 // Couldn't connect to any address. 1205 return nil, resolver.Address{}, nil, firstConnErr 1206} 1207 1208// createTransport creates a connection to addr. It returns the transport and a 1209// Event in the successful case. 
The Event fires when the returned transport 1210// disconnects. 1211func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1212 prefaceReceived := make(chan struct{}) 1213 onCloseCalled := make(chan struct{}) 1214 reconnect := grpcsync.NewEvent() 1215 1216 authority := ac.cc.authority 1217 // addr.ServerName takes precedent over ClientConn authority, if present. 1218 if addr.ServerName != "" { 1219 authority = addr.ServerName 1220 } 1221 1222 target := transport.TargetInfo{ 1223 Addr: addr.Addr, 1224 Metadata: addr.Metadata, 1225 Authority: authority, 1226 } 1227 1228 once := sync.Once{} 1229 onGoAway := func(r transport.GoAwayReason) { 1230 ac.mu.Lock() 1231 ac.adjustParams(r) 1232 once.Do(func() { 1233 if ac.state == connectivity.Ready { 1234 // Prevent this SubConn from being used for new RPCs by setting its 1235 // state to Connecting. 1236 // 1237 // TODO: this should be Idle when grpc-go properly supports it. 1238 ac.updateConnectivityState(connectivity.Connecting, nil) 1239 } 1240 }) 1241 ac.mu.Unlock() 1242 reconnect.Fire() 1243 } 1244 1245 onClose := func() { 1246 ac.mu.Lock() 1247 once.Do(func() { 1248 if ac.state == connectivity.Ready { 1249 // Prevent this SubConn from being used for new RPCs by setting its 1250 // state to Connecting. 1251 // 1252 // TODO: this should be Idle when grpc-go properly supports it. 
1253 ac.updateConnectivityState(connectivity.Connecting, nil) 1254 } 1255 }) 1256 ac.mu.Unlock() 1257 close(onCloseCalled) 1258 reconnect.Fire() 1259 } 1260 1261 onPrefaceReceipt := func() { 1262 close(prefaceReceived) 1263 } 1264 1265 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1266 defer cancel() 1267 if channelz.IsOn() { 1268 copts.ChannelzParentID = ac.channelzID 1269 } 1270 1271 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1272 if err != nil { 1273 // newTr is either nil, or closed. 1274 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err) 1275 return nil, nil, err 1276 } 1277 1278 select { 1279 case <-time.After(time.Until(connectDeadline)): 1280 // We didn't get the preface in time. 1281 newTr.Close() 1282 channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1283 return nil, nil, errors.New("timed out waiting for server handshake") 1284 case <-prefaceReceived: 1285 // We got the preface - huzzah! things are good. 1286 case <-onCloseCalled: 1287 // The transport has already closed - noop. 1288 return nil, nil, errors.New("connection closed") 1289 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1290 } 1291 return newTr, reconnect, nil 1292} 1293 1294// startHealthCheck starts the health checking stream (RPC) to watch the health 1295// stats of this connection if health checking is requested and configured. 1296// 1297// LB channel health checking is enabled when all requirements below are met: 1298// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1299// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1300// 3. a service config with non-empty healthCheckConfig field is provided 1301// 4. 
the load balancer requests it 1302// 1303// It sets addrConn to READY if the health checking stream is not started. 1304// 1305// Caller must hold ac.mu. 1306func (ac *addrConn) startHealthCheck(ctx context.Context) { 1307 var healthcheckManagingState bool 1308 defer func() { 1309 if !healthcheckManagingState { 1310 ac.updateConnectivityState(connectivity.Ready, nil) 1311 } 1312 }() 1313 1314 if ac.cc.dopts.disableHealthCheck { 1315 return 1316 } 1317 healthCheckConfig := ac.cc.healthCheckConfig() 1318 if healthCheckConfig == nil { 1319 return 1320 } 1321 if !ac.scopts.HealthCheckEnabled { 1322 return 1323 } 1324 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1325 if healthCheckFunc == nil { 1326 // The health package is not imported to set health check function. 1327 // 1328 // TODO: add a link to the health check doc in the error message. 1329 channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.") 1330 return 1331 } 1332 1333 healthcheckManagingState = true 1334 1335 // Set up the health check helper functions. 1336 currentTr := ac.transport 1337 newStream := func(method string) (interface{}, error) { 1338 ac.mu.Lock() 1339 if ac.transport != currentTr { 1340 ac.mu.Unlock() 1341 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1342 } 1343 ac.mu.Unlock() 1344 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1345 } 1346 setConnectivityState := func(s connectivity.State, lastErr error) { 1347 ac.mu.Lock() 1348 defer ac.mu.Unlock() 1349 if ac.transport != currentTr { 1350 return 1351 } 1352 ac.updateConnectivityState(s, lastErr) 1353 } 1354 // Start the health checking stream. 
1355 go func() { 1356 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1357 if err != nil { 1358 if status.Code(err) == codes.Unimplemented { 1359 channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled") 1360 } else { 1361 channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err) 1362 } 1363 } 1364 }() 1365} 1366 1367func (ac *addrConn) resetConnectBackoff() { 1368 ac.mu.Lock() 1369 close(ac.resetBackoff) 1370 ac.backoffIdx = 0 1371 ac.resetBackoff = make(chan struct{}) 1372 ac.mu.Unlock() 1373} 1374 1375// getReadyTransport returns the transport if ac's state is READY. 1376// Otherwise it returns nil, false. 1377// If ac's state is IDLE, it will trigger ac to connect. 1378func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1379 ac.mu.Lock() 1380 if ac.state == connectivity.Ready && ac.transport != nil { 1381 t := ac.transport 1382 ac.mu.Unlock() 1383 return t, true 1384 } 1385 var idle bool 1386 if ac.state == connectivity.Idle { 1387 idle = true 1388 } 1389 ac.mu.Unlock() 1390 // Trigger idle ac to connect. 1391 if idle { 1392 ac.connect() 1393 } 1394 return nil, false 1395} 1396 1397// tearDown starts to tear down the addrConn. 1398// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1399// some edge cases (e.g., the caller opens and closes many addrConn's in a 1400// tight loop. 1401// tearDown doesn't remove ac from ac.cc.conns. 1402func (ac *addrConn) tearDown(err error) { 1403 ac.mu.Lock() 1404 if ac.state == connectivity.Shutdown { 1405 ac.mu.Unlock() 1406 return 1407 } 1408 curTr := ac.transport 1409 ac.transport = nil 1410 // We have to set the state to Shutdown before anything else to prevent races 1411 // between setting the state and logic that waits on context cancellation / etc. 
1412 ac.updateConnectivityState(connectivity.Shutdown, nil) 1413 ac.cancel() 1414 ac.curAddr = resolver.Address{} 1415 if err == errConnDrain && curTr != nil { 1416 // GracefulClose(...) may be executed multiple times when 1417 // i) receiving multiple GoAway frames from the server; or 1418 // ii) there are concurrent name resolver/Balancer triggered 1419 // address removal and GoAway. 1420 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1421 ac.mu.Unlock() 1422 curTr.GracefulClose() 1423 ac.mu.Lock() 1424 } 1425 if channelz.IsOn() { 1426 channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{ 1427 Desc: "Subchannel Deleted", 1428 Severity: channelz.CtINFO, 1429 Parent: &channelz.TraceEventDesc{ 1430 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1431 Severity: channelz.CtINFO, 1432 }, 1433 }) 1434 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1435 // the entity being deleted, and thus prevent it from being deleted right away. 
1436 channelz.RemoveEntry(ac.channelzID) 1437 } 1438 ac.mu.Unlock() 1439} 1440 1441func (ac *addrConn) getState() connectivity.State { 1442 ac.mu.Lock() 1443 defer ac.mu.Unlock() 1444 return ac.state 1445} 1446 1447func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1448 ac.mu.Lock() 1449 addr := ac.curAddr.Addr 1450 ac.mu.Unlock() 1451 return &channelz.ChannelInternalMetric{ 1452 State: ac.getState(), 1453 Target: addr, 1454 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1455 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1456 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1457 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1458 } 1459} 1460 1461func (ac *addrConn) incrCallsStarted() { 1462 atomic.AddInt64(&ac.czData.callsStarted, 1) 1463 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1464} 1465 1466func (ac *addrConn) incrCallsSucceeded() { 1467 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1468} 1469 1470func (ac *addrConn) incrCallsFailed() { 1471 atomic.AddInt64(&ac.czData.callsFailed, 1) 1472} 1473 1474type retryThrottler struct { 1475 max float64 1476 thresh float64 1477 ratio float64 1478 1479 mu sync.Mutex 1480 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1481} 1482 1483// throttle subtracts a retry token from the pool and returns whether a retry 1484// should be throttled (disallowed) based upon the retry throttling policy in 1485// the service config. 
1486func (rt *retryThrottler) throttle() bool { 1487 if rt == nil { 1488 return false 1489 } 1490 rt.mu.Lock() 1491 defer rt.mu.Unlock() 1492 rt.tokens-- 1493 if rt.tokens < 0 { 1494 rt.tokens = 0 1495 } 1496 return rt.tokens <= rt.thresh 1497} 1498 1499func (rt *retryThrottler) successfulRPC() { 1500 if rt == nil { 1501 return 1502 } 1503 rt.mu.Lock() 1504 defer rt.mu.Unlock() 1505 rt.tokens += rt.ratio 1506 if rt.tokens > rt.max { 1507 rt.tokens = rt.max 1508 } 1509} 1510 1511type channelzChannel struct { 1512 cc *ClientConn 1513} 1514 1515func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1516 return c.cc.channelzMetric() 1517} 1518 1519// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1520// underlying connections within the specified timeout. 1521// 1522// Deprecated: This error is never returned by grpc and should not be 1523// referenced by users. 1524var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1525 1526func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1527 for _, rb := range cc.dopts.resolvers { 1528 if cc.parsedTarget.Scheme == rb.Scheme() { 1529 return rb 1530 } 1531 } 1532 return resolver.Get(cc.parsedTarget.Scheme) 1533} 1534