/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"fmt"
	"math"
	"net"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
	"google.golang.org/grpc/serviceconfig"
	"google.golang.org/grpc/status"
)

const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	// resetTransport uses it as the dial duration unless the dial options
	// supply their own minConnectTimeout function.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the balancer name selected when the resolver returns an
	// address of type resolver.GRPCLB. It must match grpclbName in
	// grpclb/grpclb.go.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default client-side message size limits: 1024 * 1024 * 4 for received
	// messages and math.MaxInt32 for sent messages (presumably in bytes —
	// confirm against the code that enforces them).
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default write and
	// read buffer sizes. (The previous comment here described an
	// "http2IOBufSize", a name that does not exist in this block.)
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)

// Dial creates a client connection to the given target. It is equivalent to
// calling DialContext with context.Background().
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed retryThrottler with a typed nil; applyServiceConfig replaces it
	// whenever a service config is applied.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is derived from context.Background(), not from the caller's ctx,
	// so it is not tied to the dial's context. Close cancels it.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// User-supplied options are applied on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Every error return below goes through the named result err, so this
	// closes the partially constructed ClientConn on any failure. Deferred
	// functions run last-in-first-out, so the ctx.Done() check deferred
	// further down runs before this one and its error is seen here too.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			// Nested channel: register under the parent and also record a
			// trace event on the parent.
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credential-related dial options.
	if !cc.dopts.insecure {
		// Secure dial: exactly one of TransportCredentials or CredsBundle
		// must be set.
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		// Insecure dial: transport credentials are contradictory, and so are
		// per-RPC credentials that require transport security.
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		// Note: this err shadows the named result, but the return statement
		// below assigns the named result again.
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		// No custom dialer: dial with net.Dialer, wrapped by newProxyDialer.
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// The gRPC user agent is always present; a user-supplied one goes first.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is already done by the time the function returns, report the
	// context error instead of whatever was about to be returned. This runs
	// before the deferred cc.Close() check above, which then closes cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: the credentials' ServerName, then the authority
	// dial option (insecure dials only), then the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// scWatcher exits when scChan is closed or cc.ctx is cancelled.
		go cc.scWatcher()
	}

	// The balancer gets its own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only when the picker's last connection error
				// implements Temporary() and reports false.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one
// and stores the result in cc.dopts.unaryInt (nil when there are none).
func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
	// be executed before any other chained interceptors.
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		// A single interceptor needs no wrapping.
		chainedInt = interceptors[0]
	} else {
		// The first interceptor receives an invoker that runs the remaining
		// interceptors in order and finally the real invoker.
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker { 357 if curr == len(interceptors)-1 { 358 return finalInvoker 359 } 360 return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { 361 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...) 362 } 363} 364 365// chainStreamClientInterceptors chains all stream client interceptors into one. 366func chainStreamClientInterceptors(cc *ClientConn) { 367 interceptors := cc.dopts.chainStreamInts 368 // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will 369 // be executed before any other chained interceptors. 370 if cc.dopts.streamInt != nil { 371 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...) 372 } 373 var chainedInt StreamClientInterceptor 374 if len(interceptors) == 0 { 375 chainedInt = nil 376 } else if len(interceptors) == 1 { 377 chainedInt = interceptors[0] 378 } else { 379 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) { 380 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...) 381 } 382 } 383 cc.dopts.streamInt = chainedInt 384} 385 386// getChainStreamer recursively generate the chained client stream constructor. 387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer { 388 if curr == len(interceptors)-1 { 389 return finalStreamer 390 } 391 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 392 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...) 
393 } 394} 395 396// connectivityStateManager keeps the connectivity.State of ClientConn. 397// This struct will eventually be exported so the balancers can access it. 398type connectivityStateManager struct { 399 mu sync.Mutex 400 state connectivity.State 401 notifyChan chan struct{} 402 channelzID int64 403} 404 405// updateState updates the connectivity.State of ClientConn. 406// If there's a change it notifies goroutines waiting on state change to 407// happen. 408func (csm *connectivityStateManager) updateState(state connectivity.State) { 409 csm.mu.Lock() 410 defer csm.mu.Unlock() 411 if csm.state == connectivity.Shutdown { 412 return 413 } 414 if csm.state == state { 415 return 416 } 417 csm.state = state 418 if channelz.IsOn() { 419 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{ 420 Desc: fmt.Sprintf("Channel Connectivity change to %v", state), 421 Severity: channelz.CtINFO, 422 }) 423 } 424 if csm.notifyChan != nil { 425 // There are other goroutines waiting on this channel. 426 close(csm.notifyChan) 427 csm.notifyChan = nil 428 } 429} 430 431func (csm *connectivityStateManager) getState() connectivity.State { 432 csm.mu.Lock() 433 defer csm.mu.Unlock() 434 return csm.state 435} 436 437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { 438 csm.mu.Lock() 439 defer csm.mu.Unlock() 440 if csm.notifyChan == nil { 441 csm.notifyChan = make(chan struct{}) 442 } 443 return csm.notifyChan 444} 445 446// ClientConn represents a client connection to an RPC server. 
447type ClientConn struct { 448 ctx context.Context 449 cancel context.CancelFunc 450 451 target string 452 parsedTarget resolver.Target 453 authority string 454 dopts dialOptions 455 csMgr *connectivityStateManager 456 457 balancerBuildOpts balancer.BuildOptions 458 blockingpicker *pickerWrapper 459 460 mu sync.RWMutex 461 resolverWrapper *ccResolverWrapper 462 sc *ServiceConfig 463 conns map[*addrConn]struct{} 464 // Keepalive parameter can be updated if a GoAway is received. 465 mkp keepalive.ClientParameters 466 curBalancerName string 467 balancerWrapper *ccBalancerWrapper 468 retryThrottler atomic.Value 469 470 firstResolveEvent *grpcsync.Event 471 472 channelzID int64 // channelz unique identification number 473 czData *channelzData 474} 475 476// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 477// ctx expires. A true value is returned in former case and false in latter. 478// This is an EXPERIMENTAL API. 479func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { 480 ch := cc.csMgr.getNotifyChan() 481 if cc.csMgr.getState() != sourceState { 482 return true 483 } 484 select { 485 case <-ctx.Done(): 486 return false 487 case <-ch: 488 return true 489 } 490} 491 492// GetState returns the connectivity.State of ClientConn. 493// This is an EXPERIMENTAL API. 494func (cc *ClientConn) GetState() connectivity.State { 495 return cc.csMgr.getState() 496} 497 498func (cc *ClientConn) scWatcher() { 499 for { 500 select { 501 case sc, ok := <-cc.dopts.scChan: 502 if !ok { 503 return 504 } 505 cc.mu.Lock() 506 // TODO: load balance policy runtime change is ignored. 507 // We may revisit this decision in the future. 508 cc.sc = &sc 509 cc.mu.Unlock() 510 case <-cc.ctx.Done(): 511 return 512 } 513 } 514} 515 516// waitForResolvedAddrs blocks until the resolver has provided addresses or the 517// context expires. 
Returns nil unless the context expires first; otherwise 518// returns a status error based on the context. 519func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { 520 // This is on the RPC path, so we use a fast path to avoid the 521 // more-expensive "select" below after the resolver has returned once. 522 if cc.firstResolveEvent.HasFired() { 523 return nil 524 } 525 select { 526 case <-cc.firstResolveEvent.Done(): 527 return nil 528 case <-ctx.Done(): 529 return status.FromContextError(ctx.Err()).Err() 530 case <-cc.ctx.Done(): 531 return ErrClientConnClosing 532 } 533} 534 535func (cc *ClientConn) updateResolverState(s resolver.State) error { 536 cc.mu.Lock() 537 defer cc.mu.Unlock() 538 // Check if the ClientConn is already closed. Some fields (e.g. 539 // balancerWrapper) are set to nil when closing the ClientConn, and could 540 // cause nil pointer panic if we don't have this check. 541 if cc.conns == nil { 542 return nil 543 } 544 545 if cc.dopts.disableServiceConfig || s.ServiceConfig == nil { 546 if cc.dopts.defaultServiceConfig != nil && cc.sc == nil { 547 cc.applyServiceConfig(cc.dopts.defaultServiceConfig) 548 } 549 } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok { 550 cc.applyServiceConfig(sc) 551 } 552 553 var balCfg serviceconfig.LoadBalancingConfig 554 if cc.dopts.balancerBuilder == nil { 555 // Only look at balancer types and switch balancer if balancer dial 556 // option is not set. 
557 var newBalancerName string 558 if cc.sc != nil && cc.sc.lbConfig != nil { 559 newBalancerName = cc.sc.lbConfig.name 560 balCfg = cc.sc.lbConfig.cfg 561 } else { 562 var isGRPCLB bool 563 for _, a := range s.Addresses { 564 if a.Type == resolver.GRPCLB { 565 isGRPCLB = true 566 break 567 } 568 } 569 if isGRPCLB { 570 newBalancerName = grpclbName 571 } else if cc.sc != nil && cc.sc.LB != nil { 572 newBalancerName = *cc.sc.LB 573 } else { 574 newBalancerName = PickFirstBalancerName 575 } 576 } 577 cc.switchBalancer(newBalancerName) 578 } else if cc.balancerWrapper == nil { 579 // Balancer dial option was set, and this is the first time handling 580 // resolved addresses. Build a balancer with dopts.balancerBuilder. 581 cc.curBalancerName = cc.dopts.balancerBuilder.Name() 582 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) 583 } 584 585 cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) 586 return nil 587} 588 589// switchBalancer starts the switching from current balancer to the balancer 590// with the given name. 591// 592// It will NOT send the current address list to the new balancer. If needed, 593// caller of this function should send address list to the new balancer after 594// this function returns. 595// 596// Caller must hold cc.mu. 
597func (cc *ClientConn) switchBalancer(name string) { 598 if strings.EqualFold(cc.curBalancerName, name) { 599 return 600 } 601 602 grpclog.Infof("ClientConn switching balancer to %q", name) 603 if cc.dopts.balancerBuilder != nil { 604 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") 605 return 606 } 607 if cc.balancerWrapper != nil { 608 cc.balancerWrapper.close() 609 } 610 611 builder := balancer.Get(name) 612 if channelz.IsOn() { 613 if builder == nil { 614 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 615 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName), 616 Severity: channelz.CtWarning, 617 }) 618 } else { 619 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ 620 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name), 621 Severity: channelz.CtINFO, 622 }) 623 } 624 } 625 if builder == nil { 626 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) 627 builder = newPickfirstBuilder() 628 } 629 630 cc.curBalancerName = builder.Name() 631 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) 632} 633 634func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { 635 cc.mu.Lock() 636 if cc.conns == nil { 637 cc.mu.Unlock() 638 return 639 } 640 // TODO(bar switching) send updates to all balancer wrappers when balancer 641 // gracefully switching is supported. 642 cc.balancerWrapper.handleSubConnStateChange(sc, s) 643 cc.mu.Unlock() 644} 645 646// newAddrConn creates an addrConn for addrs and adds it to cc.conns. 647// 648// Caller needs to make sure len(addrs) > 0. 
649func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { 650 ac := &addrConn{ 651 cc: cc, 652 addrs: addrs, 653 scopts: opts, 654 dopts: cc.dopts, 655 czData: new(channelzData), 656 resetBackoff: make(chan struct{}), 657 } 658 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 659 // Track ac in cc. This needs to be done before any getTransport(...) is called. 660 cc.mu.Lock() 661 if cc.conns == nil { 662 cc.mu.Unlock() 663 return nil, ErrClientConnClosing 664 } 665 if channelz.IsOn() { 666 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") 667 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 668 Desc: "Subchannel Created", 669 Severity: channelz.CtINFO, 670 Parent: &channelz.TraceEventDesc{ 671 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), 672 Severity: channelz.CtINFO, 673 }, 674 }) 675 } 676 cc.conns[ac] = struct{}{} 677 cc.mu.Unlock() 678 return ac, nil 679} 680 681// removeAddrConn removes the addrConn in the subConn from clientConn. 682// It also tears down the ac with the given error. 683func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { 684 cc.mu.Lock() 685 if cc.conns == nil { 686 cc.mu.Unlock() 687 return 688 } 689 delete(cc.conns, ac) 690 cc.mu.Unlock() 691 ac.tearDown(err) 692} 693 694func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { 695 return &channelz.ChannelInternalMetric{ 696 State: cc.GetState(), 697 Target: cc.target, 698 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), 699 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), 700 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), 701 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), 702 } 703} 704 705// Target returns the target string of the ClientConn. 706// This is an EXPERIMENTAL API. 
707func (cc *ClientConn) Target() string { 708 return cc.target 709} 710 711func (cc *ClientConn) incrCallsStarted() { 712 atomic.AddInt64(&cc.czData.callsStarted, 1) 713 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) 714} 715 716func (cc *ClientConn) incrCallsSucceeded() { 717 atomic.AddInt64(&cc.czData.callsSucceeded, 1) 718} 719 720func (cc *ClientConn) incrCallsFailed() { 721 atomic.AddInt64(&cc.czData.callsFailed, 1) 722} 723 724// connect starts creating a transport. 725// It does nothing if the ac is not IDLE. 726// TODO(bar) Move this to the addrConn section. 727func (ac *addrConn) connect() error { 728 ac.mu.Lock() 729 if ac.state == connectivity.Shutdown { 730 ac.mu.Unlock() 731 return errConnClosing 732 } 733 if ac.state != connectivity.Idle { 734 ac.mu.Unlock() 735 return nil 736 } 737 // Update connectivity state within the lock to prevent subsequent or 738 // concurrent calls from resetting the transport more than once. 739 ac.updateConnectivityState(connectivity.Connecting) 740 ac.mu.Unlock() 741 742 // Start a goroutine connecting to the server asynchronously. 743 go ac.resetTransport() 744 return nil 745} 746 747// tryUpdateAddrs tries to update ac.addrs with the new addresses list. 748// 749// If ac is Connecting, it returns false. The caller should tear down the ac and 750// create a new one. Note that the backoff will be reset when this happens. 751// 752// If ac is TransientFailure, it updates ac.addrs and returns true. The updated 753// addresses will be picked up by retry in the next iteration after backoff. 754// 755// If ac is Shutdown or Idle, it updates ac.addrs and returns true. 756// 757// If ac is Ready, it checks whether current connected address of ac is in the 758// new addrs list. 759// - If true, it updates ac.addrs and returns true. The ac will keep using 760// the existing connection. 761// - If false, it does nothing and returns false. 
762func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { 763 ac.mu.Lock() 764 defer ac.mu.Unlock() 765 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) 766 if ac.state == connectivity.Shutdown || 767 ac.state == connectivity.TransientFailure || 768 ac.state == connectivity.Idle { 769 ac.addrs = addrs 770 return true 771 } 772 773 if ac.state == connectivity.Connecting { 774 return false 775 } 776 777 // ac.state is Ready, try to find the connected address. 778 var curAddrFound bool 779 for _, a := range addrs { 780 if reflect.DeepEqual(ac.curAddr, a) { 781 curAddrFound = true 782 break 783 } 784 } 785 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) 786 if curAddrFound { 787 ac.addrs = addrs 788 } 789 790 return curAddrFound 791} 792 793// GetMethodConfig gets the method config of the input method. 794// If there's an exact match for input method (i.e. /service/method), we return 795// the corresponding MethodConfig. 796// If there isn't an exact match for the input method, we look for the default config 797// under the service (i.e /service/). If there is a default MethodConfig for 798// the service, we return it. 799// Otherwise, we return an empty MethodConfig. 800func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 801 // TODO: Avoid the locking here. 
802 cc.mu.RLock() 803 defer cc.mu.RUnlock() 804 if cc.sc == nil { 805 return MethodConfig{} 806 } 807 m, ok := cc.sc.Methods[method] 808 if !ok { 809 i := strings.LastIndex(method, "/") 810 m = cc.sc.Methods[method[:i+1]] 811 } 812 return m 813} 814 815func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { 816 cc.mu.RLock() 817 defer cc.mu.RUnlock() 818 if cc.sc == nil { 819 return nil 820 } 821 return cc.sc.healthCheckConfig 822} 823 824func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { 825 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ 826 FullMethodName: method, 827 }) 828 if err != nil { 829 return nil, nil, toRPCErr(err) 830 } 831 return t, done, nil 832} 833 834func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error { 835 if sc == nil { 836 // should never reach here. 837 return fmt.Errorf("got nil pointer for service config") 838 } 839 cc.sc = sc 840 841 if cc.sc.retryThrottling != nil { 842 newThrottler := &retryThrottler{ 843 tokens: cc.sc.retryThrottling.MaxTokens, 844 max: cc.sc.retryThrottling.MaxTokens, 845 thresh: cc.sc.retryThrottling.MaxTokens / 2, 846 ratio: cc.sc.retryThrottling.TokenRatio, 847 } 848 cc.retryThrottler.Store(newThrottler) 849 } else { 850 cc.retryThrottler.Store((*retryThrottler)(nil)) 851 } 852 853 return nil 854} 855 856func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { 857 cc.mu.RLock() 858 r := cc.resolverWrapper 859 cc.mu.RUnlock() 860 if r == nil { 861 return 862 } 863 go r.resolveNow(o) 864} 865 866// ResetConnectBackoff wakes up all subchannels in transient failure and causes 867// them to attempt another connection immediately. It also resets the backoff 868// times used for subsequent attempts regardless of the current state. 869// 870// In general, this function should not be used. 
// Typical service or network
// outages result in a reasonable client reconnection strategy by default.
// However, if a previously unavailable network becomes available, this may be
// used to trigger an immediate reconnect.
//
// This API is EXPERIMENTAL.
func (cc *ClientConn) ResetConnectBackoff() {
	// NOTE(review): cc.mu stays write-locked while resetConnectBackoff runs on
	// every addrConn. updateConnectivityState runs with ac.mu held and then
	// takes cc.mu through handleSubConnStateChange; if resetConnectBackoff
	// acquires ac.mu (not visible here), the two paths take the locks in
	// opposite order — confirm.
	cc.mu.Lock()
	defer cc.mu.Unlock()
	for ac := range cc.conns {
		ac.resetConnectBackoff()
	}
}

// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn has already been closed,
// and nil otherwise.
func (cc *ClientConn) Close() error {
	// cc.ctx is cancelled on every return path, including the early one.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		// A nil conns map means Close has already run.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach everything that has to be torn down while holding the lock; the
	// teardown itself happens after the lock is released.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		if cc.dopts.channelzParentID != 0 {
			// Nested channel: also record the deletion on the parent.
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}

// addrConn is a network connection to a given address.
935type addrConn struct { 936 ctx context.Context 937 cancel context.CancelFunc 938 939 cc *ClientConn 940 dopts dialOptions 941 acbw balancer.SubConn 942 scopts balancer.NewSubConnOptions 943 944 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel 945 // health checking may require server to report healthy to set ac to READY), and is reset 946 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway 947 // is received, transport is closed, ac has been torn down). 948 transport transport.ClientTransport // The current transport. 949 950 mu sync.Mutex 951 curAddr resolver.Address // The current address. 952 addrs []resolver.Address // All addresses that the resolver resolved to. 953 954 // Use updateConnectivityState for updating addrConn's connectivity state. 955 state connectivity.State 956 957 backoffIdx int // Needs to be stateful for resetConnectBackoff. 958 resetBackoff chan struct{} 959 960 channelzID int64 // channelz unique identification number. 961 czData *channelzData 962} 963 964// Note: this requires a lock on ac.mu. 965func (ac *addrConn) updateConnectivityState(s connectivity.State) { 966 if ac.state == s { 967 return 968 } 969 970 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s) 971 ac.state = s 972 if channelz.IsOn() { 973 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 974 Desc: updateMsg, 975 Severity: channelz.CtINFO, 976 }) 977 } 978 ac.cc.handleSubConnStateChange(ac.acbw, s) 979} 980 981// adjustParams updates parameters used to create transports upon 982// receiving a GoAway. 
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
	switch r {
	case transport.GoAwayTooManyPings:
		// Double the keepalive time this addrConn was dialing with, and raise
		// the ClientConn-wide value (cc.mkp.Time) to it if that is larger.
		// tryAllAddrs copies cc.mkp into the connect options before each dial.
		v := 2 * ac.dopts.copts.KeepaliveParams.Time
		ac.cc.mu.Lock()
		if v > ac.cc.mkp.Time {
			ac.cc.mkp.Time = v
		}
		ac.cc.mu.Unlock()
	}
}

// resetTransport runs the connect/reconnect loop for ac. Each iteration moves
// ac to CONNECTING, tries every address via tryAllAddrs, and then either backs
// off (on failure) or installs the new transport and blocks until it goes
// away (on success). The loop exits when ac is in the Shutdown state or when
// ac.ctx is done during backoff.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// On every iteration after the first, ask the resolver to re-resolve
		// before trying again.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			// Capture the channel under the lock: resetConnectBackoff closes
			// and replaces ac.resetBackoff, so the select below must wait on
			// the instance that was current when the backoff started.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// The full backoff elapsed: use a later backoff index next
				// time.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset; retry immediately.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Torn down while connecting: the new transport is not wanted.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// startHealthCheck must be called with ac.mu held; hctx is cancelled
		// below once this transport goes away.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING. This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}

// tryAllAddrs tries to create a connection to the addresses, and stops at the
// first successful one. It returns the transport, the address and an Event in
// the successful case. The Event fires when the returned transport disconnects.
1095func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) { 1096 for _, addr := range addrs { 1097 ac.mu.Lock() 1098 if ac.state == connectivity.Shutdown { 1099 ac.mu.Unlock() 1100 return nil, resolver.Address{}, nil, errConnClosing 1101 } 1102 1103 ac.cc.mu.RLock() 1104 ac.dopts.copts.KeepaliveParams = ac.cc.mkp 1105 ac.cc.mu.RUnlock() 1106 1107 copts := ac.dopts.copts 1108 if ac.scopts.CredsBundle != nil { 1109 copts.CredsBundle = ac.scopts.CredsBundle 1110 } 1111 ac.mu.Unlock() 1112 1113 if channelz.IsOn() { 1114 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1115 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), 1116 Severity: channelz.CtINFO, 1117 }) 1118 } 1119 1120 newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline) 1121 if err == nil { 1122 return newTr, addr, reconnect, nil 1123 } 1124 ac.cc.blockingpicker.updateConnectionError(err) 1125 } 1126 1127 // Couldn't connect to any address. 1128 return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address") 1129} 1130 1131// createTransport creates a connection to addr. It returns the transport and a 1132// Event in the successful case. The Event fires when the returned transport 1133// disconnects. 
1134func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) { 1135 prefaceReceived := make(chan struct{}) 1136 onCloseCalled := make(chan struct{}) 1137 reconnect := grpcsync.NewEvent() 1138 1139 target := transport.TargetInfo{ 1140 Addr: addr.Addr, 1141 Metadata: addr.Metadata, 1142 Authority: ac.cc.authority, 1143 } 1144 1145 once := sync.Once{} 1146 onGoAway := func(r transport.GoAwayReason) { 1147 ac.mu.Lock() 1148 ac.adjustParams(r) 1149 once.Do(func() { 1150 if ac.state == connectivity.Ready { 1151 // Prevent this SubConn from being used for new RPCs by setting its 1152 // state to Connecting. 1153 // 1154 // TODO: this should be Idle when grpc-go properly supports it. 1155 ac.updateConnectivityState(connectivity.Connecting) 1156 } 1157 }) 1158 ac.mu.Unlock() 1159 reconnect.Fire() 1160 } 1161 1162 onClose := func() { 1163 ac.mu.Lock() 1164 once.Do(func() { 1165 if ac.state == connectivity.Ready { 1166 // Prevent this SubConn from being used for new RPCs by setting its 1167 // state to Connecting. 1168 // 1169 // TODO: this should be Idle when grpc-go properly supports it. 1170 ac.updateConnectivityState(connectivity.Connecting) 1171 } 1172 }) 1173 ac.mu.Unlock() 1174 close(onCloseCalled) 1175 reconnect.Fire() 1176 } 1177 1178 onPrefaceReceipt := func() { 1179 close(prefaceReceived) 1180 } 1181 1182 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) 1183 defer cancel() 1184 if channelz.IsOn() { 1185 copts.ChannelzParentID = ac.channelzID 1186 } 1187 1188 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) 1189 if err != nil { 1190 // newTr is either nil, or closed. 1191 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. 
Reconnecting...", addr, err) 1192 return nil, nil, err 1193 } 1194 1195 select { 1196 case <-time.After(connectDeadline.Sub(time.Now())): 1197 // We didn't get the preface in time. 1198 newTr.Close() 1199 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) 1200 return nil, nil, errors.New("timed out waiting for server handshake") 1201 case <-prefaceReceived: 1202 // We got the preface - huzzah! things are good. 1203 case <-onCloseCalled: 1204 // The transport has already closed - noop. 1205 return nil, nil, errors.New("connection closed") 1206 // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix. 1207 } 1208 return newTr, reconnect, nil 1209} 1210 1211// startHealthCheck starts the health checking stream (RPC) to watch the health 1212// stats of this connection if health checking is requested and configured. 1213// 1214// LB channel health checking is enabled when all requirements below are met: 1215// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption 1216// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package 1217// 3. a service config with non-empty healthCheckConfig field is provided 1218// 4. the load balancer requests it 1219// 1220// It sets addrConn to READY if the health checking stream is not started. 1221// 1222// Caller must hold ac.mu. 
1223func (ac *addrConn) startHealthCheck(ctx context.Context) { 1224 var healthcheckManagingState bool 1225 defer func() { 1226 if !healthcheckManagingState { 1227 ac.updateConnectivityState(connectivity.Ready) 1228 } 1229 }() 1230 1231 if ac.cc.dopts.disableHealthCheck { 1232 return 1233 } 1234 healthCheckConfig := ac.cc.healthCheckConfig() 1235 if healthCheckConfig == nil { 1236 return 1237 } 1238 if !ac.scopts.HealthCheckEnabled { 1239 return 1240 } 1241 healthCheckFunc := ac.cc.dopts.healthCheckFunc 1242 if healthCheckFunc == nil { 1243 // The health package is not imported to set health check function. 1244 // 1245 // TODO: add a link to the health check doc in the error message. 1246 grpclog.Error("Health check is requested but health check function is not set.") 1247 return 1248 } 1249 1250 healthcheckManagingState = true 1251 1252 // Set up the health check helper functions. 1253 currentTr := ac.transport 1254 newStream := func(method string) (interface{}, error) { 1255 ac.mu.Lock() 1256 if ac.transport != currentTr { 1257 ac.mu.Unlock() 1258 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 1259 } 1260 ac.mu.Unlock() 1261 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac) 1262 } 1263 setConnectivityState := func(s connectivity.State) { 1264 ac.mu.Lock() 1265 defer ac.mu.Unlock() 1266 if ac.transport != currentTr { 1267 return 1268 } 1269 ac.updateConnectivityState(s) 1270 } 1271 // Start the health checking stream. 
1272 go func() { 1273 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName) 1274 if err != nil { 1275 if status.Code(err) == codes.Unimplemented { 1276 if channelz.IsOn() { 1277 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1278 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", 1279 Severity: channelz.CtError, 1280 }) 1281 } 1282 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") 1283 } else { 1284 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) 1285 } 1286 } 1287 }() 1288} 1289 1290func (ac *addrConn) resetConnectBackoff() { 1291 ac.mu.Lock() 1292 close(ac.resetBackoff) 1293 ac.backoffIdx = 0 1294 ac.resetBackoff = make(chan struct{}) 1295 ac.mu.Unlock() 1296} 1297 1298// getReadyTransport returns the transport if ac's state is READY. 1299// Otherwise it returns nil, false. 1300// If ac's state is IDLE, it will trigger ac to connect. 1301func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { 1302 ac.mu.Lock() 1303 if ac.state == connectivity.Ready && ac.transport != nil { 1304 t := ac.transport 1305 ac.mu.Unlock() 1306 return t, true 1307 } 1308 var idle bool 1309 if ac.state == connectivity.Idle { 1310 idle = true 1311 } 1312 ac.mu.Unlock() 1313 // Trigger idle ac to connect. 1314 if idle { 1315 ac.connect() 1316 } 1317 return nil, false 1318} 1319 1320// tearDown starts to tear down the addrConn. 1321// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 1322// some edge cases (e.g., the caller opens and closes many addrConn's in a 1323// tight loop. 1324// tearDown doesn't remove ac from ac.cc.conns. 
1325func (ac *addrConn) tearDown(err error) { 1326 ac.mu.Lock() 1327 if ac.state == connectivity.Shutdown { 1328 ac.mu.Unlock() 1329 return 1330 } 1331 curTr := ac.transport 1332 ac.transport = nil 1333 // We have to set the state to Shutdown before anything else to prevent races 1334 // between setting the state and logic that waits on context cancelation / etc. 1335 ac.updateConnectivityState(connectivity.Shutdown) 1336 ac.cancel() 1337 ac.curAddr = resolver.Address{} 1338 if err == errConnDrain && curTr != nil { 1339 // GracefulClose(...) may be executed multiple times when 1340 // i) receiving multiple GoAway frames from the server; or 1341 // ii) there are concurrent name resolver/Balancer triggered 1342 // address removal and GoAway. 1343 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. 1344 ac.mu.Unlock() 1345 curTr.GracefulClose() 1346 ac.mu.Lock() 1347 } 1348 if channelz.IsOn() { 1349 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ 1350 Desc: "Subchannel Deleted", 1351 Severity: channelz.CtINFO, 1352 Parent: &channelz.TraceEventDesc{ 1353 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID), 1354 Severity: channelz.CtINFO, 1355 }, 1356 }) 1357 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to 1358 // the entity beng deleted, and thus prevent it from being deleted right away. 
1359 channelz.RemoveEntry(ac.channelzID) 1360 } 1361 ac.mu.Unlock() 1362} 1363 1364func (ac *addrConn) getState() connectivity.State { 1365 ac.mu.Lock() 1366 defer ac.mu.Unlock() 1367 return ac.state 1368} 1369 1370func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { 1371 ac.mu.Lock() 1372 addr := ac.curAddr.Addr 1373 ac.mu.Unlock() 1374 return &channelz.ChannelInternalMetric{ 1375 State: ac.getState(), 1376 Target: addr, 1377 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), 1378 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), 1379 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), 1380 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), 1381 } 1382} 1383 1384func (ac *addrConn) incrCallsStarted() { 1385 atomic.AddInt64(&ac.czData.callsStarted, 1) 1386 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) 1387} 1388 1389func (ac *addrConn) incrCallsSucceeded() { 1390 atomic.AddInt64(&ac.czData.callsSucceeded, 1) 1391} 1392 1393func (ac *addrConn) incrCallsFailed() { 1394 atomic.AddInt64(&ac.czData.callsFailed, 1) 1395} 1396 1397type retryThrottler struct { 1398 max float64 1399 thresh float64 1400 ratio float64 1401 1402 mu sync.Mutex 1403 tokens float64 // TODO(dfawley): replace with atomic and remove lock. 1404} 1405 1406// throttle subtracts a retry token from the pool and returns whether a retry 1407// should be throttled (disallowed) based upon the retry throttling policy in 1408// the service config. 
1409func (rt *retryThrottler) throttle() bool { 1410 if rt == nil { 1411 return false 1412 } 1413 rt.mu.Lock() 1414 defer rt.mu.Unlock() 1415 rt.tokens-- 1416 if rt.tokens < 0 { 1417 rt.tokens = 0 1418 } 1419 return rt.tokens <= rt.thresh 1420} 1421 1422func (rt *retryThrottler) successfulRPC() { 1423 if rt == nil { 1424 return 1425 } 1426 rt.mu.Lock() 1427 defer rt.mu.Unlock() 1428 rt.tokens += rt.ratio 1429 if rt.tokens > rt.max { 1430 rt.tokens = rt.max 1431 } 1432} 1433 1434type channelzChannel struct { 1435 cc *ClientConn 1436} 1437 1438func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { 1439 return c.cc.channelzMetric() 1440} 1441 1442// ErrClientConnTimeout indicates that the ClientConn cannot establish the 1443// underlying connections within the specified timeout. 1444// 1445// Deprecated: This error is never returned by grpc and should not be 1446// referenced by users. 1447var ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 1448