/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/envconfig"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
	"google.golang.org/grpc/status"
)

const (
	// minimum time to give a connection to complete
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// defaultClientMaxReceiveMessageSize and defaultClientMaxSendMessageSize
	// are the client's default message size limits (4 * 1024 * 1024 and
	// math.MaxInt32 respectively).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default sizes of the
	// write and read buffers (32 * 1024 each).
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so later loads of retryThrottler always see a
	// *retryThrottler value.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is deliberately not derived from the caller's ctx; it is canceled
	// by cc.Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply user options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// On any error return (read from the named result), tear down whatever
	// has been set up so far.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			// Nested channel: register under the parent and record the
			// creation on the parent's trace as well.
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the combination of security-related options.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		// This err shadows the named result; the failure is returned
		// explicitly below.
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// No custom dialer: fall back to a net.Dialer wrapped by newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the gRPC user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is already done when DialContext returns, report the context
	// error instead of the pending result. Deferred functions run in LIFO
	// order, so this runs before the cleanup defer registered above, which
	// then sees the non-nil err and closes cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: server name from the transport credentials, then
	// the authority dial option (insecure dials only), then the endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep applying service configs that arrive later on scChan.
		go cc.scWatcher()
	}

	// The balancer gets a clone of the transport credentials rather than the
	// ClientConn's own instance.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only when the last connection error reports
				// itself as non-temporary; otherwise keep waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		// Run the first interceptor and hand it an invoker that continues
		// with the remaining ones before reaching the real invoker.
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 357 if curr == len(interceptors)-1 { 358 return finalInvoker 359 } 360 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 361 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 362 } 363} 364 365// chainStreamClientInterceptors chains all stream client interceptors into one. 366func chainStreamClientInterceptors(cc *ClientConn) { 367 interceptors := cc.dopts.chainStreamInts 368 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 369 // be executed before any other chained interceptors. 370 if cc.dopts.streamInt != nil { 371 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 372 } 373 var chainedInt StreamClientInterceptor 374 if len(interceptors) == 0 { 375 chainedInt = nil 376 } else if len(interceptors) == 1 { 377 chainedInt = interceptors[0] 378 } else { 379 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 380 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 381 } 382 } 383 cc.dopts.streamInt = chainedInt 384} 385 386// getChainStreamer recursively generate the chained client stream constructor. 387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 388 if curr == len(interceptors)-1 { 389 return finalStreamer 390 } 391 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 392 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 
393 } 394} 395 396// connectivityStateManager keeps the connectivity.State of ClientConn. 397// This struct will eventually be exported so the balancers can access it. 398type connectivityStateManager struct { 399 mu sync.Mutex 400 state connectivity.State 401 notifyChan chan struct{} 402 channelzID int64 403} 404 405// updateState updates the connectivity.State of ClientConn. 406// If there's a change it notifies goroutines waiting on state change to 407// happen. 408func (csm *connectivityStateManager) updateState(state connectivity.State) { 409 csm.mu.Lock() 410 defer csm.mu.Unlock() 411 if csm.state == connectivity.Shutdown { 412 return 413 } 414 if csm.state == state { 415 return 416 } 417 csm.state = state 418 if channelz.IsOn() { 419 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{ 420 Desc: fmt.Sprintf("Channel Connectivity change to %v", state), 421 Severity: channelz.CtINFO, 422 }) 423 } 424 if csm.notifyChan != nil { 425 // There are other goroutines waiting on this channel. 426 close(csm.notifyChan) 427 csm.notifyChan = nil 428 } 429} 430 431func (csm *connectivityStateManager) getState() connectivity.State { 432 csm.mu.Lock() 433 defer csm.mu.Unlock() 434 return csm.state 435} 436 437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 438 csm.mu.Lock() 439 defer csm.mu.Unlock() 440 if csm.notifyChan == nil { 441 csm.notifyChan = make(chan struct{}) 442 } 443 return csm.notifyChan 444} 445 446// ClientConn represents a client connection to an RPC server. 
447type ClientConn struct { 448 ctx context.Context 449 cancel context.CancelFunc 450 451 target string 452 parsedTarget resolver.Target 453 authority string 454 dopts dialOptions 455 csMgr *connectivityStateManager 456 457 balancerBuildOpts balancer.BuildOptions 458 blockingpicker *pickerWrapper 459 460 mu sync.RWMutex 461 resolverWrapper *ccResolverWrapper 462 sc *ServiceConfig 463 conns map[*addrConn]struct{} 464 // Keepalive parameter can be updated if a GoAway is received. 465 mkp keepalive.ClientParameters 466 curBalancerName string 467 balancerWrapper *ccBalancerWrapper 468 retryThrottler atomic.Value 469 470 firstResolveEvent *grpcsync.Event 471 472 channelzID int64 // channelz unique identification number 473 czData *channelzData 474} 475 476// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 477// ctx expires. A true value is returned in former case and false in latter. 478// This is an EXPERIMENTAL API. 479func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 480 ch := cc.csMgr.getNotifyChan() 481 if cc.csMgr.getState() != sourceState { 482 return true 483 } 484 select { 485 case <-ctx.Done(): 486 return false 487 case <-ch: 488 return true 489 } 490} 491 492// GetState returns the connectivity.State of ClientConn. 493// This is an EXPERIMENTAL API. 494func (cc *ClientConn) GetState() connectivity.State { 495 return cc.csMgr.getState() 496} 497 498func (cc *ClientConn) scWatcher() { 499 for { 500 select { 501 case sc, ok := <-cc.dopts.scChan: 502 if !ok { 503 return 504 } 505 cc.mu.Lock() 506 // TODO: load balance policy runtime change is ignored. 507 // We may revisit this decision in the future. 508 cc.sc = &sc 509 cc.mu.Unlock() 510 case <-cc.ctx.Done(): 511 return 512 } 513 } 514} 515 516// waitForResolvedAddrs blocks until the resolver has provided addresses or the 517// context expires. 
Returns nil unless the context expires first; otherwise 518// returns a status error based on the context. 519func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 520 // This is on the RPC path, so we use a fast path to avoid the 521 // more-expensive "select" below after the resolver has returned once. 522 if cc.firstResolveEvent.HasFired() { 523 return nil 524 } 525 select { 526 case <-cc.firstResolveEvent.Done(): 527 return nil 528 case <-ctx.Done(): 529 return status.FromContextError(ctx.Err()).Err() 530 case <-cc.ctx.Done(): 531 return ErrClientConnClosing 532 } 533} 534 535// gRPC should resort to default service config when: 536// * resolver service config is disabled 537// * or, resolver does not return a service config or returns an invalid one. 538func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool { 539 if cc.dopts.disableServiceConfig { 540 return true 541 } 542 // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type. 543 // Right now, we assume that empty service config string means resolver does not return a config. 544 if sc == "" { 545 return true 546 } 547 // TODO: the logic below is temporary. Once we finish the logic to validate service config 548 // in resolver, we will replace the logic below. 549 _, err := parseServiceConfig(sc) 550 return err != nil 551} 552 553func (cc *ClientConn) updateResolverState(s resolver.State) error { 554 cc.mu.Lock() 555 defer cc.mu.Unlock() 556 // Check if the ClientConn is already closed. Some fields (e.g. 557 // balancerWrapper) are set to nil when closing the ClientConn, and could 558 // cause nil pointer panic if we don't have this check. 
559 if cc.conns == nil { 560 return nil 561 } 562 563 if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) { 564 if cc.dopts.defaultServiceConfig != nil && cc.sc == nil { 565 cc.applyServiceConfig(cc.dopts.defaultServiceConfig) 566 } 567 } else { 568 // TODO: the parsing logic below will be moved inside resolver. 569 sc, err := parseServiceConfig(s.ServiceConfig) 570 if err != nil { 571 return err 572 } 573 if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig { 574 cc.applyServiceConfig(sc) 575 } 576 } 577 578 // update the service config that will be sent to balancer. 579 if cc.sc != nil { 580 s.ServiceConfig = cc.sc.rawJSONString 581 } 582 583 if cc.dopts.balancerBuilder == nil { 584 // Only look at balancer types and switch balancer if balancer dial 585 // option is not set. 586 var isGRPCLB bool 587 for _, a := range s.Addresses { 588 if a.Type == resolver.GRPCLB { 589 isGRPCLB = true 590 break 591 } 592 } 593 var newBalancerName string 594 // TODO: use new loadBalancerConfig field with appropriate priority. 595 if isGRPCLB { 596 newBalancerName = grpclbName 597 } else if cc.sc != nil && cc.sc.LB != nil { 598 newBalancerName = *cc.sc.LB 599 } else { 600 newBalancerName = PickFirstBalancerName 601 } 602 cc.switchBalancer(newBalancerName) 603 } else if cc.balancerWrapper == nil { 604 // Balancer dial option was set, and this is the first time handling 605 // resolved addresses. Build a balancer with dopts.balancerBuilder. 606 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 607 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 608 } 609 610 cc.balancerWrapper.updateResolverState(s) 611 cc.firstResolveEvent.Fire() 612 return nil 613} 614 615// switchBalancer starts the switching from current balancer to the balancer 616// with the given name. 617// 618// It will NOT send the current address list to the new balancer. 
If needed, 619// caller of this function should send address list to the new balancer after 620// this function returns. 621// 622// Caller must hold cc.mu. 623func (cc *ClientConn) switchBalancer(name string) { 624 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) { 625 return 626 } 627 628 grpclog.Infof("ClientConn switching balancer to %q", name) 629 if cc.dopts.balancerBuilder != nil { 630 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") 631 return 632 } 633 if cc.balancerWrapper != nil { 634 cc.balancerWrapper.close() 635 } 636 637 builder := balancer.Get(name) 638 if channelz.IsOn() { 639 if builder == nil { 640 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 641 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName), 642 Severity: channelz.CtWarning, 643 }) 644 } else { 645 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 646 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name), 647 Severity: channelz.CtINFO, 648 }) 649 } 650 } 651 if builder == nil { 652 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) 653 builder = newPickfirstBuilder() 654 } 655 656 cc.curBalancerName = builder.Name() 657 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 658} 659 660func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 661 cc.mu.Lock() 662 if cc.conns == nil { 663 cc.mu.Unlock() 664 return 665 } 666 // TODO(bar switching) send updates to all balancer wrappers when balancer 667 // gracefully switching is supported. 668 cc.balancerWrapper.handleSubConnStateChange(sc, s) 669 cc.mu.Unlock() 670} 671 672// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 673// 674// Caller needs to make sure len(addrs) > 0. 
675func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 676 ac := &addrConn{ 677 cc: cc, 678 addrs: addrs, 679 scopts: opts, 680 dopts: cc.dopts, 681 czData: new(channelzData), 682 resetBackoff: make(chan struct{}), 683 } 684 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 685 // Track ac in cc. This needs to be done before any getTransport(...) is called. 686 cc.mu.Lock() 687 if cc.conns == nil { 688 cc.mu.Unlock() 689 return nil, ErrClientConnClosing 690 } 691 if channelz.IsOn() { 692 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 693 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 694 Desc: "Subchannel Created", 695 Severity: channelz.CtINFO, 696 Parent: &channelz.TraceEventDesc{ 697 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 698 Severity: channelz.CtINFO, 699 }, 700 }) 701 } 702 cc.conns[ac] = struct{}{} 703 cc.mu.Unlock() 704 return ac, nil 705} 706 707// removeAddrConn removes the addrConn in the subConn from clientConn. 708// It also tears down the ac with the given error. 709func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 710 cc.mu.Lock() 711 if cc.conns == nil { 712 cc.mu.Unlock() 713 return 714 } 715 delete(cc.conns, ac) 716 cc.mu.Unlock() 717 ac.tearDown(err) 718} 719 720func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 721 return &channelz.ChannelInternalMetric{ 722 State: cc.GetState(), 723 Target: cc.target, 724 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 725 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 726 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 727 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 728 } 729} 730 731// Target returns the target string of the ClientConn. 732// This is an EXPERIMENTAL API. 
733func (cc *ClientConn) Target() string { 734 return cc.target 735} 736 737func (cc *ClientConn) incrCallsStarted() { 738 atomic.AddInt64(&cc.czData.callsStarted, 1) 739 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 740} 741 742func (cc *ClientConn) incrCallsSucceeded() { 743 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 744} 745 746func (cc *ClientConn) incrCallsFailed() { 747 atomic.AddInt64(&cc.czData.callsFailed, 1) 748} 749 750// connect starts creating a transport. 751// It does nothing if the ac is not IDLE. 752// TODO(bar) Move this to the addrConn section. 753func (ac *addrConn) connect() error { 754 ac.mu.Lock() 755 if ac.state == connectivity.Shutdown { 756 ac.mu.Unlock() 757 return errConnClosing 758 } 759 if ac.state != connectivity.Idle { 760 ac.mu.Unlock() 761 return nil 762 } 763 ac.updateConnectivityState(connectivity.Connecting) 764 ac.mu.Unlock() 765 766 // Start a goroutine connecting to the server asynchronously. 767 go ac.resetTransport() 768 return nil 769} 770 771// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 772// 773// It checks whether current connected address of ac is in the new addrs list. 774// - If true, it updates ac.addrs and returns true. The ac will keep using 775// the existing connection. 776// - If false, it does nothing and returns false. 777func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 778 ac.mu.Lock() 779 defer ac.mu.Unlock() 780 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 781 if ac.state == connectivity.Shutdown { 782 ac.addrs = addrs 783 return true 784 } 785 786 // Unless we're busy reconnecting already, let's reconnect from the top of 787 // the list. 
788 if ac.state != connectivity.Ready { 789 return false 790 } 791 792 var curAddrFound bool 793 for _, a := range addrs { 794 if reflect.DeepEqual(ac.curAddr, a) { 795 curAddrFound = true 796 break 797 } 798 } 799 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 800 if curAddrFound { 801 ac.addrs = addrs 802 } 803 804 return curAddrFound 805} 806 807// GetMethodConfig gets the method config of the input method. 808// If there's an exact match for input method (i.e. /service/method), we return 809// the corresponding MethodConfig. 810// If there isn't an exact match for the input method, we look for the default config 811// under the service (i.e /service/). If there is a default MethodConfig for 812// the service, we return it. 813// Otherwise, we return an empty MethodConfig. 814func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 815 // TODO: Avoid the locking here. 816 cc.mu.RLock() 817 defer cc.mu.RUnlock() 818 if cc.sc == nil { 819 return MethodConfig{} 820 } 821 m, ok := cc.sc.Methods[method] 822 if !ok { 823 i := strings.LastIndex(method, "/") 824 m = cc.sc.Methods[method[:i+1]] 825 } 826 return m 827} 828 829func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 830 cc.mu.RLock() 831 defer cc.mu.RUnlock() 832 if cc.sc == nil { 833 return nil 834 } 835 return cc.sc.healthCheckConfig 836} 837 838func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 839 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ 840 FullMethodName: method, 841 }) 842 if err != nil { 843 return nil, nil, toRPCErr(err) 844 } 845 return t, done, nil 846} 847 848func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { 849 if sc == nil { 850 // should never reach here. 
851 return fmt.Errorf("got nil pointer for service config") 852 } 853 cc.sc = sc 854 855 if cc.sc.retryThrottling != nil { 856 newThrottler := &retryThrottler{ 857 tokens: cc.sc.retryThrottling.MaxTokens, 858 max: cc.sc.retryThrottling.MaxTokens, 859 thresh: cc.sc.retryThrottling.MaxTokens / 2, 860 ratio: cc.sc.retryThrottling.TokenRatio, 861 } 862 cc.retryThrottler.Store(newThrottler) 863 } else { 864 cc.retryThrottler.Store((*retryThrottler)(nil)) 865 } 866 867 return nil 868} 869 870func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { 871 cc.mu.RLock() 872 r := cc.resolverWrapper 873 cc.mu.RUnlock() 874 if r == nil { 875 return 876 } 877 go r.resolveNow(o) 878} 879 880// ResetConnectBackoff wakes up all subchannels in transient failure and causes 881// them to attempt another connection immediately. It also resets the backoff 882// times used for subsequent attempts regardless of the current state. 883// 884// In general, this function should not be used. Typical service or network 885// outages result in a reasonable client reconnection strategy by default. 886// However, if a previously unavailable network becomes available, this may be 887// used to trigger an immediate reconnect. 888// 889// This API is EXPERIMENTAL. 890func (cc *ClientConn) ResetConnectBackoff() { 891 cc.mu.Lock() 892 defer cc.mu.Unlock() 893 for ac := range cc.conns { 894 ac.resetConnectBackoff() 895 } 896} 897 898// Close tears down the ClientConn and all underlying connections. 
func (cc *ClientConn) Close() error {
	// cc.cancel runs last, after every component below has been shut down.
	defer cc.cancel()

	cc.mu.Lock()
	// A nil cc.conns marks a ClientConn that has already been closed (it is
	// set to nil just below); a second Close reports ErrClientConnClosing.
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the resolver and balancer wrappers while holding cc.mu, but close
	// them only after the lock has been released.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// Tear down every addrConn that was registered at the time of Close.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// A non-zero parent ID means this is a nested channel; also record the
		// deletion as a Parent trace event.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}

// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is watched by resetTransport while backing off and is the parent of
	// the connect and health-check contexts; cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	cc     *ClientConn                // Receives state changes via handleSubConnStateChange.
	dopts  dialOptions                // copts.KeepaliveParams is refreshed from cc.mkp before each connection attempt.
	acbw   balancer.SubConn           // Passed to cc.handleSubConnStateChange along with the new state.
	scopts balancer.NewSubConnOptions // Supplies HealthCheckEnabled and an optional CredsBundle.

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed and replaced by resetConnectBackoff; resetTransport
	// selects on it to cut a backoff wait short.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}

// updateConnectivityState records s as ac's connectivity state, adds a
// channelz trace event when channelz is on, and reports the change to the
// ClientConn through handleSubConnStateChange. It does nothing when ac is
// already in state s.
//
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
	if ac.state == s {
		return
	}

	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
	ac.state = s
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     updateMsg,
			Severity: channelz.CtINFO,
		})
	}
	ac.cc.handleSubConnStateChange(ac.acbw, s)
}

// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
//
// Only GoAwayTooManyPings is handled: twice the keepalive Time from
// ac.dopts.copts is computed, and ac.cc.mkp.Time is raised to that value if it
// is currently smaller. Any other reason leaves the parameters untouched.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
	switch r {
	case transport.GoAwayTooManyPings:
		v := 2 * ac.dopts.copts.KeepaliveParams.Time
		// mkp belongs to the ClientConn, so it is updated under cc.mu.
		ac.cc.mu.Lock()
		if v > ac.cc.mkp.Time {
			ac.cc.mkp.Time = v
		}
		ac.cc.mu.Unlock()
	}
}

// resetTransport is ac's connection loop. Each iteration tries every address
// in ac.addrs via tryAllAddrs and then either:
//   - on failure, moves ac to TRANSIENT_FAILURE and waits out the backoff
//     (or until resetBackoff is signalled or ac.ctx is done), or
//   - on success, installs the new transport, moves ac to READY (directly, or
//     through the health checker when health checking is in effect), blocks
//     until the transport goes away, and moves ac to TRANSIENT_FAILURE.
//
// The loop returns as soon as ac is observed in the Shutdown state or ac.ctx
// is done during a backoff wait.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Every attempt after the first asks the resolver to re-resolve.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			// Capture the channel under the lock: resetConnectBackoff closes
			// this one and installs a fresh channel.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// The full backoff elapsed; the next failure backs off longer.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset; retry right away.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// ac was torn down while connecting; the new transport is unwanted.
			newTr.Close()
			ac.mu.Unlock()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		healthCheckConfig := ac.cc.healthCheckConfig()
		// LB channel health checking is only enabled when all the four requirements below are met:
		// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
		// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
		// 3. a service config with non-empty healthCheckConfig field is provided,
		// 4. the current load balancer allows it.
		hctx, hcancel := context.WithCancel(ac.ctx)
		healthcheckManagingState := false
		if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
			if ac.cc.dopts.healthCheckFunc == nil {
				// TODO: add a link to the health check doc in the error message.
				grpclog.Error("the client side LB channel health check function has not been set.")
			} else {
				// TODO(deklerk) refactor to just return transport
				go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
				healthcheckManagingState = true
			}
		}
		// Without a running health checker, the connection is READY as soon as
		// the transport is up; otherwise the health checker reports READY.
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()

		// Need to reconnect after a READY, the addrConn enters
		// TRANSIENT_FAILURE.
		//
		// This will set addrConn to TRANSIENT_FAILURE for a very short period
		// of time, and turns CONNECTING. It seems reasonable to skip this, but
		// READY-CONNECTING is not a valid transition.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure)
		ac.mu.Unlock()
	}
}

// tryAllAddrs tries to create a connection to the addresses, and stops at the
// first successful one. It returns the transport, the address and an Event in
// the successful case. The Event fires when the returned transport disconnects.
1129func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1130 for _, addr := range addrs { 1131 ac.mu.Lock() 1132 if ac.state == connectivity.Shutdown { 1133 ac.mu.Unlock() 1134 return nil, resolver.Address{}, nil, errConnClosing 1135 } 1136 ac.updateConnectivityState(connectivity.Connecting) 1137 ac.transport = nil 1138 1139 ac.cc.mu.RLock() 1140 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1141 ac.cc.mu.RUnlock() 1142 1143 copts := ac.dopts.copts 1144 if ac.scopts.CredsBundle != nil { 1145 copts.CredsBundle = ac.scopts.CredsBundle 1146 } 1147 ac.mu.Unlock() 1148 1149 if channelz.IsOn() { 1150 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1151 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), 1152 Severity: channelz.CtINFO, 1153 }) 1154 } 1155 1156 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1157 if err == nil { 1158 return newTr, addr, reconnect, nil 1159 } 1160 ac.cc.blockingpicker.updateConnectionError(err) 1161 } 1162 1163 // Couldn't connect to any address. 1164 return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address") 1165} 1166 1167// createTransport creates a connection to addr. It returns the transport and a 1168// Event in the successful case. The Event fires when the returned transport 1169// disconnects. 
1170func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1171 prefaceReceived := make(chan struct{}) 1172 onCloseCalled := make(chan struct{}) 1173 reconnect := grpcsync.NewEvent() 1174 1175 target := transport.TargetInfo{ 1176 Addr: addr.Addr, 1177 Metadata: addr.Metadata, 1178 Authority: ac.cc.authority, 1179 } 1180 1181 onGoAway := func(r transport.GoAwayReason) { 1182 ac.mu.Lock() 1183 ac.adjustParams(r) 1184 ac.mu.Unlock() 1185 reconnect.Fire() 1186 } 1187 1188 onClose := func() { 1189 close(onCloseCalled) 1190 reconnect.Fire() 1191 } 1192 1193 onPrefaceReceipt := func() { 1194 close(prefaceReceived) 1195 } 1196 1197 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1198 defer cancel() 1199 if channelz.IsOn() { 1200 copts.ChannelzParentID = ac.channelzID 1201 } 1202 1203 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1204 if err != nil { 1205 // newTr is either nil, or closed. 1206 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) 1207 return nil, nil, err 1208 } 1209 1210 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn { 1211 select { 1212 case <-time.After(connectDeadline.Sub(time.Now())): 1213 // We didn't get the preface in time. 1214 newTr.Close() 1215 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1216 return nil, nil, errors.New("timed out waiting for server handshake") 1217 case <-prefaceReceived: 1218 // We got the preface - huzzah! things are good. 1219 case <-onCloseCalled: 1220 // The transport has already closed - noop. 1221 return nil, nil, errors.New("connection closed") 1222 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 
1223 } 1224 } 1225 return newTr, reconnect, nil 1226} 1227 1228func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) { 1229 // Set up the health check helper functions 1230 newStream := func() (interface{}, error) { 1231 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr) 1232 } 1233 firstReady := true 1234 reportHealth := func(ok bool) { 1235 ac.mu.Lock() 1236 defer ac.mu.Unlock() 1237 if ac.transport != newTr { 1238 return 1239 } 1240 if ok { 1241 if firstReady { 1242 firstReady = false 1243 ac.curAddr = addr 1244 } 1245 ac.updateConnectivityState(connectivity.Ready) 1246 } else { 1247 ac.updateConnectivityState(connectivity.TransientFailure) 1248 } 1249 } 1250 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName) 1251 if err != nil { 1252 if status.Code(err) == codes.Unimplemented { 1253 if channelz.IsOn() { 1254 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1255 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", 1256 Severity: channelz.CtError, 1257 }) 1258 } 1259 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") 1260 } else { 1261 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) 1262 } 1263 } 1264} 1265 1266func (ac *addrConn) resetConnectBackoff() { 1267 ac.mu.Lock() 1268 close(ac.resetBackoff) 1269 ac.backoffIdx = 0 1270 ac.resetBackoff = make(chan struct{}) 1271 ac.mu.Unlock() 1272} 1273 1274// getReadyTransport returns the transport if ac's state is READY. 1275// Otherwise it returns nil, false. 1276// If ac's state is IDLE, it will trigger ac to connect. 
1277func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1278 ac.mu.Lock() 1279 if ac.state == connectivity.Ready && ac.transport != nil { 1280 t := ac.transport 1281 ac.mu.Unlock() 1282 return t, true 1283 } 1284 var idle bool 1285 if ac.state == connectivity.Idle { 1286 idle = true 1287 } 1288 ac.mu.Unlock() 1289 // Trigger idle ac to connect. 1290 if idle { 1291 ac.connect() 1292 } 1293 return nil, false 1294} 1295 1296// tearDown starts to tear down the addrConn. 1297// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1298// some edge cases (e.g., the caller opens and closes many addrConn's in a 1299// tight loop. 1300// tearDown doesn't remove ac from ac.cc.conns. 1301func (ac *addrConn) tearDown(err error) { 1302 ac.mu.Lock() 1303 if ac.state == connectivity.Shutdown { 1304 ac.mu.Unlock() 1305 return 1306 } 1307 curTr := ac.transport 1308 ac.transport = nil 1309 // We have to set the state to Shutdown before anything else to prevent races 1310 // between setting the state and logic that waits on context cancelation / etc. 1311 ac.updateConnectivityState(connectivity.Shutdown) 1312 ac.cancel() 1313 ac.curAddr = resolver.Address{} 1314 if err == errConnDrain && curTr != nil { 1315 // GracefulClose(...) may be executed multiple times when 1316 // i) receiving multiple GoAway frames from the server; or 1317 // ii) there are concurrent name resolver/Balancer triggered 1318 // address removal and GoAway. 1319 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 
1320 ac.mu.Unlock() 1321 curTr.GracefulClose() 1322 ac.mu.Lock() 1323 } 1324 if channelz.IsOn() { 1325 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1326 Desc: "Subchannel Deleted", 1327 Severity: channelz.CtINFO, 1328 Parent: &channelz.TraceEventDesc{ 1329 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1330 Severity: channelz.CtINFO, 1331 }, 1332 }) 1333 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1334 // the entity beng deleted, and thus prevent it from being deleted right away. 1335 channelz.RemoveEntry(ac.channelzID) 1336 } 1337 ac.mu.Unlock() 1338} 1339 1340func (ac *addrConn) getState() connectivity.State { 1341 ac.mu.Lock() 1342 defer ac.mu.Unlock() 1343 return ac.state 1344} 1345 1346func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1347 ac.mu.Lock() 1348 addr := ac.curAddr.Addr 1349 ac.mu.Unlock() 1350 return &channelz.ChannelInternalMetric{ 1351 State: ac.getState(), 1352 Target: addr, 1353 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1354 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1355 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1356 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1357 } 1358} 1359 1360func (ac *addrConn) incrCallsStarted() { 1361 atomic.AddInt64(&ac.czData.callsStarted, 1) 1362 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1363} 1364 1365func (ac *addrConn) incrCallsSucceeded() { 1366 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1367} 1368 1369func (ac *addrConn) incrCallsFailed() { 1370 atomic.AddInt64(&ac.czData.callsFailed, 1) 1371} 1372 1373type retryThrottler struct { 1374 max float64 1375 thresh float64 1376 ratio float64 1377 1378 mu sync.Mutex 1379 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 
1380} 1381 1382// throttle subtracts a retry token from the pool and returns whether a retry 1383// should be throttled (disallowed) based upon the retry throttling policy in 1384// the service config. 1385func (rt *retryThrottler) throttle() bool { 1386 if rt == nil { 1387 return false 1388 } 1389 rt.mu.Lock() 1390 defer rt.mu.Unlock() 1391 rt.tokens-- 1392 if rt.tokens < 0 { 1393 rt.tokens = 0 1394 } 1395 return rt.tokens <= rt.thresh 1396} 1397 1398func (rt *retryThrottler) successfulRPC() { 1399 if rt == nil { 1400 return 1401 } 1402 rt.mu.Lock() 1403 defer rt.mu.Unlock() 1404 rt.tokens += rt.ratio 1405 if rt.tokens > rt.max { 1406 rt.tokens = rt.max 1407 } 1408} 1409 1410type channelzChannel struct { 1411 cc *ClientConn 1412} 1413 1414func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1415 return c.cc.channelzMetric() 1416} 1417 1418// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1419// underlying connections within the specified timeout. 1420// 1421// Deprecated: This error is never returned by grpc and should not be 1422// referenced by users. 1423var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1424