/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"

	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
)

const (
	// minConnectTimeout is the minimum time to give a connection attempt to
	// complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer policy. It must match
	// grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the JSON parsing
	// error for the default service config (see DialContext).
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext when the
// security-related dial options are missing or contradictory.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default client-side limits on the size of received and sent messages.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default sizes of the
	// write and read buffers (32 * 1024 each).
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be a no-op. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is deliberately rooted in context.Background(), not in the dial
	// ctx: it lives for the whole lifetime of the ClientConn and is canceled
	// by Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Any error return (including one injected by the ctx check deferred
	// below) tears down the partially built ClientConn.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the security configuration: a secure channel needs exactly one
	// of TransportCredentials / CredsBundle; an insecure one needs neither and
	// must not carry per-RPC credentials that demand transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// The gRPC user agent is always appended after any user-provided one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Deferred functions run LIFO, so this check runs before the cc.Close()
	// defer above. If ctx expired, the results are overwritten with ctx.Err(),
	// which in turn makes that earlier defer close cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = parseTarget(cc.target)
	grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Authority precedence: credentials' ServerName, then an explicit
	// authority (honored only for insecure channels), then the target endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only for errors that report themselves as
				// non-temporary; anything else keeps waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
	next := curr + 1
	if next == len(interceptors) {
		// curr is the last interceptor: its "next" step is the real invoker.
		return finalInvoker
	}
	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
		return interceptors[next](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, next, finalInvoker), opts...)
	}
}

// chainStreamClientInterceptors folds dopts.streamInt and dopts.chainStreamInts
// into a single StreamClientInterceptor, stored back into dopts.streamInt.
// When dopts.streamInt is set it runs first; the chained ones follow in order.
func chainStreamClientInterceptors(cc *ClientConn) {
	chain := cc.dopts.chainStreamInts
	if first := cc.dopts.streamInt; first != nil {
		chain = append([]StreamClientInterceptor{first}, chain...)
	}
	switch len(chain) {
	case 0:
		cc.dopts.streamInt = nil
	case 1:
		cc.dopts.streamInt = chain[0]
	default:
		cc.dopts.streamInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
			return chain[0](ctx, desc, cc, method, getChainStreamer(chain, 0, streamer), opts...)
		}
	}
}

// getChainStreamer returns the Streamer that continues the chain after
// interceptors[curr]: it invokes interceptors[curr+1], or finalStreamer once
// curr is the last index.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
	next := curr + 1
	if next == len(interceptors) {
		return finalStreamer
	}
	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
		return interceptors[next](ctx, desc, cc, method, getChainStreamer(interceptors, next, finalStreamer), opts...)
	}
}

// connectivityStateManager tracks the connectivity.State of a ClientConn and
// lets goroutines wait for that state to change.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	channelzID int64
}

// updateState records a new connectivity.State. Once Shutdown has been
// reached, or when the state is unchanged, the call is a no-op; otherwise any
// goroutines waiting on the notify channel are woken up.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.state == connectivity.Shutdown || csm.state == state {
		return
	}
	csm.state = state
	if channelz.IsOn() {
		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
			Severity: channelz.CtINFO,
		})
	}
	if waiters := csm.notifyChan; waiters != nil {
		// Closing broadcasts to every waiter; the next getNotifyChan call
		// lazily allocates a fresh channel.
		csm.notifyChan = nil
		close(waiters)
	}
}

// getState returns the current connectivity.State.
func (csm *connectivityStateManager) getState() connectivity.State {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	return csm.state
}

// getNotifyChan returns a channel that is closed on the next state change,
// allocating it on first use.
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	ch := csm.notifyChan
	if ch == nil {
		ch = make(chan struct{})
		csm.notifyChan = ch
	}
	return ch
}

// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs. It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Compile-time check that *ClientConn satisfies ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn may hold zero or more actual connections to the endpoint,
// depending on configuration, load, etc. It may also choose which actual
// endpoints to use and change that choice on every RPC, enabling client-side
// load balancing.
//
// A ClientConn encapsulates name resolution, TCP connection establishment
// (with retries and backoff) and TLS handshakes. It also handles errors on
// established connections by re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

// WaitForStateChange blocks until the ClientConn's connectivity.State differs
// from sourceState (returning true) or ctx is done (returning false).
// This is an EXPERIMENTAL API.
505func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 506 ch := cc.csMgr.getNotifyChan() 507 if cc.csMgr.getState() != sourceState { 508 return true 509 } 510 select { 511 case <-ctx.Done(): 512 return false 513 case <-ch: 514 return true 515 } 516} 517 518// GetState returns the connectivity.State of ClientConn. 519// This is an EXPERIMENTAL API. 520func (cc *ClientConn) GetState() connectivity.State { 521 return cc.csMgr.getState() 522} 523 524func (cc *ClientConn) scWatcher() { 525 for { 526 select { 527 case sc, ok := <-cc.dopts.scChan: 528 if !ok { 529 return 530 } 531 cc.mu.Lock() 532 // TODO: load balance policy runtime change is ignored. 533 // We may revisit this decision in the future. 534 cc.sc = &sc 535 cc.mu.Unlock() 536 case <-cc.ctx.Done(): 537 return 538 } 539 } 540} 541 542// waitForResolvedAddrs blocks until the resolver has provided addresses or the 543// context expires. Returns nil unless the context expires first; otherwise 544// returns a status error based on the context. 545func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 546 // This is on the RPC path, so we use a fast path to avoid the 547 // more-expensive "select" below after the resolver has returned once. 
548 if cc.firstResolveEvent.HasFired() { 549 return nil 550 } 551 select { 552 case <-cc.firstResolveEvent.Done(): 553 return nil 554 case <-ctx.Done(): 555 return status.FromContextError(ctx.Err()).Err() 556 case <-cc.ctx.Done(): 557 return ErrClientConnClosing 558 } 559} 560 561var emptyServiceConfig *ServiceConfig 562 563func init() { 564 cfg := parseServiceConfig("{}") 565 if cfg.Err != nil { 566 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err)) 567 } 568 emptyServiceConfig = cfg.Config.(*ServiceConfig) 569} 570 571func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { 572 if cc.sc != nil { 573 cc.applyServiceConfigAndBalancer(cc.sc, addrs) 574 return 575 } 576 if cc.dopts.defaultServiceConfig != nil { 577 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs) 578 } else { 579 cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs) 580 } 581} 582 583func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { 584 defer cc.firstResolveEvent.Fire() 585 cc.mu.Lock() 586 // Check if the ClientConn is already closed. Some fields (e.g. 587 // balancerWrapper) are set to nil when closing the ClientConn, and could 588 // cause nil pointer panic if we don't have this check. 589 if cc.conns == nil { 590 cc.mu.Unlock() 591 return nil 592 } 593 594 if err != nil { 595 // May need to apply the initial service config in case the resolver 596 // doesn't support service configs, or doesn't provide a service config 597 // with the new addresses. 598 cc.maybeApplyDefaultServiceConfig(nil) 599 600 if cc.balancerWrapper != nil { 601 cc.balancerWrapper.resolverError(err) 602 } 603 604 // No addresses are valid with err set; return early. 
605 cc.mu.Unlock() 606 return balancer.ErrBadResolverState 607 } 608 609 var ret error 610 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 611 cc.maybeApplyDefaultServiceConfig(s.Addresses) 612 // TODO: do we need to apply a failing LB policy if there is no 613 // default, per the error handling design? 614 } else { 615 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { 616 cc.applyServiceConfigAndBalancer(sc, s.Addresses) 617 } else { 618 ret = balancer.ErrBadResolverState 619 if cc.balancerWrapper == nil { 620 var err error 621 if s.ServiceConfig.Err != nil { 622 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) 623 } else { 624 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) 625 } 626 cc.blockingpicker.updatePicker(base.NewErrPicker(err)) 627 cc.csMgr.updateState(connectivity.TransientFailure) 628 cc.mu.Unlock() 629 return ret 630 } 631 } 632 } 633 634 var balCfg serviceconfig.LoadBalancingConfig 635 if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { 636 balCfg = cc.sc.lbConfig.cfg 637 } 638 639 cbn := cc.curBalancerName 640 bw := cc.balancerWrapper 641 cc.mu.Unlock() 642 if cbn != grpclbName { 643 // Filter any grpclb addresses since we don't have the grpclb balancer. 644 for i := 0; i < len(s.Addresses); { 645 if s.Addresses[i].Type == resolver.GRPCLB { 646 copy(s.Addresses[i:], s.Addresses[i+1:]) 647 s.Addresses = s.Addresses[:len(s.Addresses)-1] 648 continue 649 } 650 i++ 651 } 652 } 653 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 654 if ret == nil { 655 ret = uccsErr // prefer ErrBadResolver state since any other error is 656 // currently meaningless to the caller. 657 } 658 return ret 659} 660 661// switchBalancer starts the switching from current balancer to the balancer 662// with the given name. 
//
// The current address list is NOT forwarded to the new balancer; callers that
// need the new balancer to see it must send it themselves after this function
// returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
	if strings.EqualFold(cc.curBalancerName, name) {
		return
	}

	grpclog.Infof("ClientConn switching balancer to %q", name)
	if cc.dopts.balancerBuilder != nil {
		// An explicit Balancer DialOption always wins over name-based selection.
		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
		return
	}
	if old := cc.balancerWrapper; old != nil {
		old.close()
	}

	builder := balancer.Get(name)
	fallback := builder == nil
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{Severity: channelz.CtINFO}
		if fallback {
			ted.Desc = fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
			ted.Severity = channelz.CtWarning
		} else {
			ted.Desc = fmt.Sprintf("Channel switches to new LB policy %q", name)
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
	}
	if fallback {
		// Unknown policy name: degrade to pick_first rather than failing.
		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
		builder = newPickfirstBuilder()
	}

	cc.curBalancerName = builder.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

// handleSubConnStateChange forwards a SubConn state change to the current
// balancer wrapper. It does nothing once the ClientConn has been closed
// (Close sets cc.conns to nil).
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cc.conns == nil {
		return
	}
	// TODO(bar switching) send updates to all balancer wrappers when balancer
	// gracefully switching is supported.
	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
}

// newAddrConn creates an addrConn for addrs and registers it in cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	// ac's context is a child of cc.ctx, so closing the ClientConn cancels it.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		// The ClientConn is closing and ac is being discarded: release the
		// child context now rather than leaving it registered on cc.ctx.
		ac.cancel()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}

// removeAddrConn removes the addrConn in the subConn from clientConn.
// It also tears down the ac with the given error. It is a no-op if the
// ClientConn has already been closed.
func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return
	}
	delete(cc.conns, ac)
	cc.mu.Unlock()
	ac.tearDown(err)
}

// channelzMetric returns a snapshot of the channel's channelz metrics; the
// counters are read atomically.
func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
	return &channelz.ChannelInternalMetric{
		State:                    cc.GetState(),
		Target:                   cc.target,
		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
	}
}

// Target returns the target string of the ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	target := cc.target
	return target
}

// incrCallsStarted bumps the started-call counter and records the start time
// (as Unix nanoseconds) of the most recent call.
func (cc *ClientConn) incrCallsStarted() {
	data := cc.czData
	atomic.AddInt64(&data.callsStarted, 1)
	atomic.StoreInt64(&data.lastCallStartedTime, time.Now().UnixNano())
}

// incrCallsSucceeded bumps the succeeded-call counter.
func (cc *ClientConn) incrCallsSucceeded() {
	data := cc.czData
	atomic.AddInt64(&data.callsSucceeded, 1)
}

// incrCallsFailed bumps the failed-call counter.
func (cc *ClientConn) incrCallsFailed() {
	data := cc.czData
	atomic.AddInt64(&data.callsFailed, 1)
}

// connect kicks off transport creation in the background. It returns
// errConnClosing if ac has been shut down, and is a no-op for any state other
// than Idle.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {
	ac.mu.Lock()
	switch ac.state {
	case connectivity.Idle:
		// Move to Connecting while still holding the lock, so concurrent or
		// later callers see a non-Idle state and don't reset the transport a
		// second time.
		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.mu.Unlock()
	case connectivity.Shutdown:
		ac.mu.Unlock()
		return errConnClosing
	default:
		ac.mu.Unlock()
		return nil
	}

	// Dial asynchronously.
	go ac.resetTransport()
	return nil
}

// tryUpdateAddrs attempts to replace ac.addrs with the given address list and
// reports whether ac accepted it.
//
// While Connecting, nothing changes and false is returned; the caller should
// tear down ac and create a new one (which also resets the backoff).
//
// While TransientFailure, ac.addrs is updated and true is returned; the next
// retry after backoff uses the new addresses.
//
// While Shutdown or Idle, ac.addrs is updated and true is returned.
//
// While Ready, the outcome depends on whether the currently connected address
// appears in the new list:
//   - If it does, ac.addrs is updated and true is returned; the existing
//     connection stays in use.
//   - If it does not, nothing changes and false is returned.
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		ac.addrs = addrs
		return true
	}

	if ac.state == connectivity.Connecting {
		return false
	}

	// ac.state is Ready, try to find the connected address.
	var curAddrFound bool
	for _, a := range addrs {
		if reflect.DeepEqual(ac.curAddr, a) {
			curAddrFound = true
			break
		}
	}
	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
	if curAddrFound {
		ac.addrs = addrs
	}

	return curAddrFound
}

// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e /service/). If there is a default MethodConfig for
// the service, we return it.
// Otherwise, we return an empty MethodConfig.
func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
	// TODO: Avoid the locking here.
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	if cc.sc == nil {
		return MethodConfig{}
	}
	m, ok := cc.sc.Methods[method]
	if !ok {
		// Fall back to the service-wide entry, keyed by everything up to and
		// including the last '/'.
		i := strings.LastIndex(method, "/")
		m = cc.sc.Methods[method[:i+1]]
	}
	return m
}

// healthCheckConfig returns the health check config from the current service
// config, or nil when no service config has been applied.
func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
	cc.mu.RLock()
	defer cc.mu.RUnlock()
	if cc.sc == nil {
		return nil
	}
	return cc.sc.healthCheckConfig
}

// getTransport picks a transport for an RPC on method via the blocking picker,
// converting any pick error with toRPCErr.
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
		Ctx:            ctx,
		FullMethodName: method,
	})
	if err != nil {
		return nil, nil, toRPCErr(err)
	}
	return t, done, nil
}

// applyServiceConfigAndBalancer installs sc as the current service config,
// refreshes the retry throttler from it, and switches (or builds) the
// balancer accordingly. It must be called with cc.mu held, since it calls
// switchBalancer, which requires that.
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
	if sc == nil {
		// should never reach here.
		return
	}
	cc.sc = sc

	if sc.retryThrottling != nil {
		newThrottler := &retryThrottler{
			tokens: sc.retryThrottling.MaxTokens,
			max:    sc.retryThrottling.MaxTokens,
			thresh: sc.retryThrottling.MaxTokens / 2,
			ratio:  sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set. sc (== cc.sc) is known to be non-nil here.
		var newBalancerName string
		if sc.lbConfig != nil {
			newBalancerName = sc.lbConfig.name
		} else {
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if sc.LB != nil {
				newBalancerName = *sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}

// resolveNow asks the current resolver wrapper (if any) to re-resolve, in a
// new goroutine.
func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
	cc.mu.RLock()
	r := cc.resolverWrapper
	cc.mu.RUnlock()
	if r == nil {
		return
	}
	go r.resolveNow(o)
}

// ResetConnectBackoff wakes up all subchannels in transient failure and causes
// them to attempt another connection immediately. It also resets the backoff
// times used for subsequent attempts regardless of the current state.
//
// In general, this function should not be used. Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	// Snapshot the addrConns while holding cc.mu. cc.conns is mutated by
	// newAddrConn/removeAddrConn under this lock, so ranging over the live map
	// after unlocking would race with them (a fatal "concurrent map iteration
	// and map write" in Go). The resets themselves still happen outside cc.mu,
	// as before, so we never call into an addrConn while holding the
	// ClientConn lock. After Close, cc.conns is nil and the snapshot is empty.
	cc.mu.Lock()
	conns := make([]*addrConn, 0, len(cc.conns))
	for ac := range cc.conns {
		conns = append(conns, ac)
	}
	cc.mu.Unlock()
	for _, ac := range conns {
		ac.resetConnectBackoff()
	}
}

// Close tears down the ClientConn and all underlying connections.
987func (cc *ClientConn) Close() error { 988 defer cc.cancel() 989 990 cc.mu.Lock() 991 if cc.conns == nil { 992 cc.mu.Unlock() 993 return ErrClientConnClosing 994 } 995 conns := cc.conns 996 cc.conns = nil 997 cc.csMgr.updateState(connectivity.Shutdown) 998 999 rWrapper := cc.resolverWrapper 1000 cc.resolverWrapper = nil 1001 bWrapper := cc.balancerWrapper 1002 cc.balancerWrapper = nil 1003 cc.mu.Unlock() 1004 1005 cc.blockingpicker.close() 1006 1007 if rWrapper != nil { 1008 rWrapper.close() 1009 } 1010 if bWrapper != nil { 1011 bWrapper.close() 1012 } 1013 1014 for ac := range conns { 1015 ac.tearDown(ErrClientConnClosing) 1016 } 1017 if channelz.IsOn() { 1018 ted := &channelz.TraceEventDesc{ 1019 Desc: "Channel Deleted", 1020 Severity: channelz.CtINFO, 1021 } 1022 if cc.dopts.channelzParentID != 0 { 1023 ted.Parent = &channelz.TraceEventDesc{ 1024 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), 1025 Severity: channelz.CtINFO, 1026 } 1027 } 1028 channelz.AddTraceEvent(cc.channelzID, ted) 1029 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1030 // the entity being deleted, and thus prevent it from being deleted right away. 1031 channelz.RemoveEntry(cc.channelzID) 1032 } 1033 return nil 1034} 1035 1036// addrConn is a network connection to a given address. 1037type addrConn struct { 1038 ctx context.Context 1039 cancel context.CancelFunc 1040 1041 cc *ClientConn 1042 dopts dialOptions 1043 acbw balancer.SubConn 1044 scopts balancer.NewSubConnOptions 1045 1046 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 1047 // health checking may require server to report healthy to set ac to READY), and is reset 1048 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 1049 // is received, transport is closed, ac has been torn down). 1050 transport transport.ClientTransport // The current transport. 
1051 1052 mu sync.Mutex 1053 curAddr resolver.Address // The current address. 1054 addrs []resolver.Address // All addresses that the resolver resolved to. 1055 1056 // Use updateConnectivityState for updating addrConn's connectivity state. 1057 state connectivity.State 1058 1059 backoffIdx int // Needs to be stateful for resetConnectBackoff. 1060 resetBackoff chan struct{} 1061 1062 channelzID int64 // channelz unique identification number. 1063 czData *channelzData 1064} 1065 1066// Note: this requires a lock on ac.mu. 1067func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) { 1068 if ac.state == s { 1069 return 1070 } 1071 1072 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s) 1073 ac.state = s 1074 if channelz.IsOn() { 1075 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1076 Desc: updateMsg, 1077 Severity: channelz.CtINFO, 1078 }) 1079 } 1080 ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr) 1081} 1082 1083// adjustParams updates parameters used to create transports upon 1084// receiving a GoAway. 1085func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 1086 switch r { 1087 case transport.GoAwayTooManyPings: 1088 v := 2 * ac.dopts.copts.KeepaliveParams.Time 1089 ac.cc.mu.Lock() 1090 if v > ac.cc.mkp.Time { 1091 ac.cc.mkp.Time = v 1092 } 1093 ac.cc.mu.Unlock() 1094 } 1095} 1096 1097func (ac *addrConn) resetTransport() { 1098 for i := 0; ; i++ { 1099 if i > 0 { 1100 ac.cc.resolveNow(resolver.ResolveNowOptions{}) 1101 } 1102 1103 ac.mu.Lock() 1104 if ac.state == connectivity.Shutdown { 1105 ac.mu.Unlock() 1106 return 1107 } 1108 1109 addrs := ac.addrs 1110 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) 1111 // This will be the duration that dial gets to finish. 
1112 dialDuration := minConnectTimeout 1113 if ac.dopts.minConnectTimeout != nil { 1114 dialDuration = ac.dopts.minConnectTimeout() 1115 } 1116 1117 if dialDuration < backoffFor { 1118 // Give dial more time as we keep failing to connect. 1119 dialDuration = backoffFor 1120 } 1121 // We can potentially spend all the time trying the first address, and 1122 // if the server accepts the connection and then hangs, the following 1123 // addresses will never be tried. 1124 // 1125 // The spec doesn't mention what should be done for multiple addresses. 1126 // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm 1127 connectDeadline := time.Now().Add(dialDuration) 1128 1129 ac.updateConnectivityState(connectivity.Connecting, nil) 1130 ac.transport = nil 1131 ac.mu.Unlock() 1132 1133 newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline) 1134 if err != nil { 1135 // After exhausting all addresses, the addrConn enters 1136 // TRANSIENT_FAILURE. 1137 ac.mu.Lock() 1138 if ac.state == connectivity.Shutdown { 1139 ac.mu.Unlock() 1140 return 1141 } 1142 ac.updateConnectivityState(connectivity.TransientFailure, err) 1143 1144 // Backoff. 1145 b := ac.resetBackoff 1146 ac.mu.Unlock() 1147 1148 timer := time.NewTimer(backoffFor) 1149 select { 1150 case <-timer.C: 1151 ac.mu.Lock() 1152 ac.backoffIdx++ 1153 ac.mu.Unlock() 1154 case <-b: 1155 timer.Stop() 1156 case <-ac.ctx.Done(): 1157 timer.Stop() 1158 return 1159 } 1160 continue 1161 } 1162 1163 ac.mu.Lock() 1164 if ac.state == connectivity.Shutdown { 1165 ac.mu.Unlock() 1166 newTr.Close() 1167 return 1168 } 1169 ac.curAddr = addr 1170 ac.transport = newTr 1171 ac.backoffIdx = 0 1172 1173 hctx, hcancel := context.WithCancel(ac.ctx) 1174 ac.startHealthCheck(hctx) 1175 ac.mu.Unlock() 1176 1177 // Block until the created transport is down. And when this happens, 1178 // we restart from the top of the addr list. 
1179 <-reconnect.Done() 1180 hcancel() 1181 // restart connecting - the top of the loop will set state to 1182 // CONNECTING. This is against the current connectivity semantics doc, 1183 // however it allows for graceful behavior for RPCs not yet dispatched 1184 // - unfortunate timing would otherwise lead to the RPC failing even 1185 // though the TRANSIENT_FAILURE state (called for by the doc) would be 1186 // instantaneous. 1187 // 1188 // Ideally we should transition to Idle here and block until there is 1189 // RPC activity that leads to the balancer requesting a reconnect of 1190 // the associated SubConn. 1191 } 1192} 1193 1194// tryAllAddrs tries to creates a connection to the addresses, and stop when at the 1195// first successful one. It returns the transport, the address and a Event in 1196// the successful case. The Event fires when the returned transport disconnects. 1197func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1198 var firstConnErr error 1199 for _, addr := range addrs { 1200 ac.mu.Lock() 1201 if ac.state == connectivity.Shutdown { 1202 ac.mu.Unlock() 1203 return nil, resolver.Address{}, nil, errConnClosing 1204 } 1205 1206 ac.cc.mu.RLock() 1207 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1208 ac.cc.mu.RUnlock() 1209 1210 copts := ac.dopts.copts 1211 if ac.scopts.CredsBundle != nil { 1212 copts.CredsBundle = ac.scopts.CredsBundle 1213 } 1214 ac.mu.Unlock() 1215 1216 if channelz.IsOn() { 1217 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1218 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), 1219 Severity: channelz.CtINFO, 1220 }) 1221 } 1222 1223 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1224 if err == nil { 1225 return newTr, addr, reconnect, nil 1226 } 1227 if firstConnErr == nil { 1228 firstConnErr = err 1229 } 1230 
ac.cc.blockingpicker.updateConnectionError(err) 1231 } 1232 1233 // Couldn't connect to any address. 1234 return nil, resolver.Address{}, nil, firstConnErr 1235} 1236 1237// createTransport creates a connection to addr. It returns the transport and a 1238// Event in the successful case. The Event fires when the returned transport 1239// disconnects. 1240func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1241 prefaceReceived := make(chan struct{}) 1242 onCloseCalled := make(chan struct{}) 1243 reconnect := grpcsync.NewEvent() 1244 1245 authority := ac.cc.authority 1246 // addr.ServerName takes precedent over ClientConn authority, if present. 1247 if addr.ServerName != "" { 1248 authority = addr.ServerName 1249 } 1250 1251 target := transport.TargetInfo{ 1252 Addr: addr.Addr, 1253 Metadata: addr.Metadata, 1254 Authority: authority, 1255 } 1256 1257 once := sync.Once{} 1258 onGoAway := func(r transport.GoAwayReason) { 1259 ac.mu.Lock() 1260 ac.adjustParams(r) 1261 once.Do(func() { 1262 if ac.state == connectivity.Ready { 1263 // Prevent this SubConn from being used for new RPCs by setting its 1264 // state to Connecting. 1265 // 1266 // TODO: this should be Idle when grpc-go properly supports it. 1267 ac.updateConnectivityState(connectivity.Connecting, nil) 1268 } 1269 }) 1270 ac.mu.Unlock() 1271 reconnect.Fire() 1272 } 1273 1274 onClose := func() { 1275 ac.mu.Lock() 1276 once.Do(func() { 1277 if ac.state == connectivity.Ready { 1278 // Prevent this SubConn from being used for new RPCs by setting its 1279 // state to Connecting. 1280 // 1281 // TODO: this should be Idle when grpc-go properly supports it. 
1282 ac.updateConnectivityState(connectivity.Connecting, nil) 1283 } 1284 }) 1285 ac.mu.Unlock() 1286 close(onCloseCalled) 1287 reconnect.Fire() 1288 } 1289 1290 onPrefaceReceipt := func() { 1291 close(prefaceReceived) 1292 } 1293 1294 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1295 defer cancel() 1296 if channelz.IsOn() { 1297 copts.ChannelzParentID = ac.channelzID 1298 } 1299 1300 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1301 if err != nil { 1302 // newTr is either nil, or closed. 1303 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1304 return nil, nil, err 1305 } 1306 1307 select { 1308 case <-time.After(time.Until(connectDeadline)): 1309 // We didn't get the preface in time. 1310 newTr.Close() 1311 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1312 return nil, nil, errors.New("timed out waiting for server handshake") 1313 case <-prefaceReceived: 1314 // We got the preface - huzzah! things are good. 1315 case <-onCloseCalled: 1316 // The transport has already closed - noop. 1317 return nil, nil, errors.New("connection closed") 1318 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1319 } 1320 return newTr, reconnect, nil 1321} 1322 1323// startHealthCheck starts the health checking stream (RPC) to watch the health 1324// stats of this connection if health checking is requested and configured. 1325// 1326// LB channel health checking is enabled when all requirements below are met: 1327// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1328// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1329// 3. a service config with non-empty healthCheckConfig field is provided 1330// 4. 
the load balancer requests it 1331// 1332// It sets addrConn to READY if the health checking stream is not started. 1333// 1334// Caller must hold ac.mu. 1335func (ac *addrConn) startHealthCheck(ctx context.Context) { 1336 var healthcheckManagingState bool 1337 defer func() { 1338 if !healthcheckManagingState { 1339 ac.updateConnectivityState(connectivity.Ready, nil) 1340 } 1341 }() 1342 1343 if ac.cc.dopts.disableHealthCheck { 1344 return 1345 } 1346 healthCheckConfig := ac.cc.healthCheckConfig() 1347 if healthCheckConfig == nil { 1348 return 1349 } 1350 if !ac.scopts.HealthCheckEnabled { 1351 return 1352 } 1353 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1354 if healthCheckFunc == nil { 1355 // The health package is not imported to set health check function. 1356 // 1357 // TODO: add a link to the health check doc in the error message. 1358 grpclog.Error("Health check is requested but health check function is not set.") 1359 return 1360 } 1361 1362 healthcheckManagingState = true 1363 1364 // Set up the health check helper functions. 1365 currentTr := ac.transport 1366 newStream := func(method string) (interface{}, error) { 1367 ac.mu.Lock() 1368 if ac.transport != currentTr { 1369 ac.mu.Unlock() 1370 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1371 } 1372 ac.mu.Unlock() 1373 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1374 } 1375 setConnectivityState := func(s connectivity.State, lastErr error) { 1376 ac.mu.Lock() 1377 defer ac.mu.Unlock() 1378 if ac.transport != currentTr { 1379 return 1380 } 1381 ac.updateConnectivityState(s, lastErr) 1382 } 1383 // Start the health checking stream. 
1384 go func() { 1385 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1386 if err != nil { 1387 if status.Code(err) == codes.Unimplemented { 1388 if channelz.IsOn() { 1389 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1390 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", 1391 Severity: channelz.CtError, 1392 }) 1393 } 1394 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") 1395 } else { 1396 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) 1397 } 1398 } 1399 }() 1400} 1401 1402func (ac *addrConn) resetConnectBackoff() { 1403 ac.mu.Lock() 1404 close(ac.resetBackoff) 1405 ac.backoffIdx = 0 1406 ac.resetBackoff = make(chan struct{}) 1407 ac.mu.Unlock() 1408} 1409 1410// getReadyTransport returns the transport if ac's state is READY. 1411// Otherwise it returns nil, false. 1412// If ac's state is IDLE, it will trigger ac to connect. 1413func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1414 ac.mu.Lock() 1415 if ac.state == connectivity.Ready && ac.transport != nil { 1416 t := ac.transport 1417 ac.mu.Unlock() 1418 return t, true 1419 } 1420 var idle bool 1421 if ac.state == connectivity.Idle { 1422 idle = true 1423 } 1424 ac.mu.Unlock() 1425 // Trigger idle ac to connect. 1426 if idle { 1427 ac.connect() 1428 } 1429 return nil, false 1430} 1431 1432// tearDown starts to tear down the addrConn. 1433// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1434// some edge cases (e.g., the caller opens and closes many addrConn's in a 1435// tight loop. 1436// tearDown doesn't remove ac from ac.cc.conns. 
1437func (ac *addrConn) tearDown(err error) { 1438 ac.mu.Lock() 1439 if ac.state == connectivity.Shutdown { 1440 ac.mu.Unlock() 1441 return 1442 } 1443 curTr := ac.transport 1444 ac.transport = nil 1445 // We have to set the state to Shutdown before anything else to prevent races 1446 // between setting the state and logic that waits on context cancellation / etc. 1447 ac.updateConnectivityState(connectivity.Shutdown, nil) 1448 ac.cancel() 1449 ac.curAddr = resolver.Address{} 1450 if err == errConnDrain && curTr != nil { 1451 // GracefulClose(...) may be executed multiple times when 1452 // i) receiving multiple GoAway frames from the server; or 1453 // ii) there are concurrent name resolver/Balancer triggered 1454 // address removal and GoAway. 1455 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1456 ac.mu.Unlock() 1457 curTr.GracefulClose() 1458 ac.mu.Lock() 1459 } 1460 if channelz.IsOn() { 1461 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1462 Desc: "Subchannel Deleted", 1463 Severity: channelz.CtINFO, 1464 Parent: &channelz.TraceEventDesc{ 1465 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1466 Severity: channelz.CtINFO, 1467 }, 1468 }) 1469 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1470 // the entity being deleted, and thus prevent it from being deleted right away. 
1471 channelz.RemoveEntry(ac.channelzID) 1472 } 1473 ac.mu.Unlock() 1474} 1475 1476func (ac *addrConn) getState() connectivity.State { 1477 ac.mu.Lock() 1478 defer ac.mu.Unlock() 1479 return ac.state 1480} 1481 1482func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1483 ac.mu.Lock() 1484 addr := ac.curAddr.Addr 1485 ac.mu.Unlock() 1486 return &channelz.ChannelInternalMetric{ 1487 State: ac.getState(), 1488 Target: addr, 1489 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1490 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1491 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1492 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1493 } 1494} 1495 1496func (ac *addrConn) incrCallsStarted() { 1497 atomic.AddInt64(&ac.czData.callsStarted, 1) 1498 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1499} 1500 1501func (ac *addrConn) incrCallsSucceeded() { 1502 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1503} 1504 1505func (ac *addrConn) incrCallsFailed() { 1506 atomic.AddInt64(&ac.czData.callsFailed, 1) 1507} 1508 1509type retryThrottler struct { 1510 max float64 1511 thresh float64 1512 ratio float64 1513 1514 mu sync.Mutex 1515 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1516} 1517 1518// throttle subtracts a retry token from the pool and returns whether a retry 1519// should be throttled (disallowed) based upon the retry throttling policy in 1520// the service config. 
1521func (rt *retryThrottler) throttle() bool { 1522 if rt == nil { 1523 return false 1524 } 1525 rt.mu.Lock() 1526 defer rt.mu.Unlock() 1527 rt.tokens-- 1528 if rt.tokens < 0 { 1529 rt.tokens = 0 1530 } 1531 return rt.tokens <= rt.thresh 1532} 1533 1534func (rt *retryThrottler) successfulRPC() { 1535 if rt == nil { 1536 return 1537 } 1538 rt.mu.Lock() 1539 defer rt.mu.Unlock() 1540 rt.tokens += rt.ratio 1541 if rt.tokens > rt.max { 1542 rt.tokens = rt.max 1543 } 1544} 1545 1546type channelzChannel struct { 1547 cc *ClientConn 1548} 1549 1550func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1551 return c.cc.channelzMetric() 1552} 1553 1554// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1555// underlying connections within the specified timeout. 1556// 1557// Deprecated: This error is never returned by grpc and should not be 1558// referenced by users. 1559var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1560 1561func (cc *ClientConn) getResolver(scheme string) resolver.Builder { 1562 for _, rb := range cc.dopts.resolvers { 1563 if cc.parsedTarget.Scheme == rb.Scheme() { 1564 return rb 1565 } 1566 } 1567 return resolver.Get(cc.parsedTarget.Scheme) 1568} 1569