1/* 2 * 3 * Copyright 2014, Google Inc. 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are 8 * met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above 13 * copyright notice, this list of conditions and the following disclaimer 14 * in the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Google Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 * 32 */ 33 34package grpc 35 36import ( 37 "errors" 38 "fmt" 39 "net" 40 "strings" 41 "sync" 42 "time" 43 44 "golang.org/x/net/context" 45 "golang.org/x/net/trace" 46 "google.golang.org/grpc/credentials" 47 "google.golang.org/grpc/grpclog" 48 "google.golang.org/grpc/transport" 49) 50 51var ( 52 // ErrClientConnClosing indicates that the operation is illegal because 53 // the ClientConn is closing. 54 ErrClientConnClosing = errors.New("grpc: the client connection is closing") 55 // ErrClientConnTimeout indicates that the ClientConn cannot establish the 56 // underlying connections within the specified timeout. 57 ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 58 59 // errNoTransportSecurity indicates that there is no transport security 60 // being set for ClientConn. Users should either set one or explicitly 61 // call WithInsecure DialOption to disable security. 62 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") 63 // errTransportCredentialsMissing indicates that users want to transmit security 64 // information (e.g., oauth2 token) which requires secure connection on an insecure 65 // connection. 66 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") 67 // errCredentialsConflict indicates that grpc.WithTransportCredentials() 68 // and grpc.WithInsecure() are both called for a connection. 69 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") 70 // errNetworkIO indicates that the connection is down due to some network I/O error. 71 errNetworkIO = errors.New("grpc: failed with network I/O error") 72 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. 73 errConnDrain = errors.New("grpc: the connection is drained") 74 // errConnClosing indicates that the connection is closing. 75 errConnClosing = errors.New("grpc: the connection is closing") 76 // errConnUnavailable indicates that the connection is unavailable. 77 errConnUnavailable = errors.New("grpc: the connection is unavailable") 78 errNoAddr = errors.New("grpc: there is no address available to dial") 79 // minimum time to give a connection to complete 80 minConnectTimeout = 20 * time.Second 81) 82 83// dialOptions configure a Dial call. dialOptions are set by the DialOption 84// values passed to Dial. 85type dialOptions struct { 86 unaryInt UnaryClientInterceptor 87 streamInt StreamClientInterceptor 88 codec Codec 89 cp Compressor 90 dc Decompressor 91 bs backoffStrategy 92 balancer Balancer 93 block bool 94 insecure bool 95 timeout time.Duration 96 copts transport.ConnectOptions 97} 98 99// DialOption configures how we set up the connection. 100type DialOption func(*dialOptions) 101 102// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. 103func WithCodec(c Codec) DialOption { 104 return func(o *dialOptions) { 105 o.codec = c 106 } 107} 108 109// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message 110// compressor. 111func WithCompressor(cp Compressor) DialOption { 112 return func(o *dialOptions) { 113 o.cp = cp 114 } 115} 116 117// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating 118// message decompressor. 119func WithDecompressor(dc Decompressor) DialOption { 120 return func(o *dialOptions) { 121 o.dc = dc 122 } 123} 124 125// WithBalancer returns a DialOption which sets a load balancer. 126func WithBalancer(b Balancer) DialOption { 127 return func(o *dialOptions) { 128 o.balancer = b 129 } 130} 131 132// WithBackoffMaxDelay configures the dialer to use the provided maximum delay 133// when backing off after failed connection attempts. 134func WithBackoffMaxDelay(md time.Duration) DialOption { 135 return WithBackoffConfig(BackoffConfig{MaxDelay: md}) 136} 137 138// WithBackoffConfig configures the dialer to use the provided backoff 139// parameters after connection failures. 140// 141// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up 142// for use. 143func WithBackoffConfig(b BackoffConfig) DialOption { 144 // Set defaults to ensure that provided BackoffConfig is valid and 145 // unexported fields get default values. 146 setDefaults(&b) 147 return withBackoff(b) 148} 149 150// withBackoff sets the backoff strategy used for retries after a 151// failed connection attempt. 152// 153// This can be exported if arbitrary backoff strategies are allowed by gRPC. 154func withBackoff(bs backoffStrategy) DialOption { 155 return func(o *dialOptions) { 156 o.bs = bs 157 } 158} 159 160// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying 161// connection is up. Without this, Dial returns immediately and connecting the server 162// happens in background. 163func WithBlock() DialOption { 164 return func(o *dialOptions) { 165 o.block = true 166 } 167} 168 169// WithInsecure returns a DialOption which disables transport security for this ClientConn. 170// Note that transport security is required unless WithInsecure is set. 171func WithInsecure() DialOption { 172 return func(o *dialOptions) { 173 o.insecure = true 174 } 175} 176 177// WithTransportCredentials returns a DialOption which configures a 178// connection level security credentials (e.g., TLS/SSL). 179func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { 180 return func(o *dialOptions) { 181 o.copts.TransportCredentials = creds 182 } 183} 184 185// WithPerRPCCredentials returns a DialOption which sets 186// credentials which will place auth state on each outbound RPC. 187func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { 188 return func(o *dialOptions) { 189 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) 190 } 191} 192 193// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn 194// initially. This is valid if and only if WithBlock() is present. 195func WithTimeout(d time.Duration) DialOption { 196 return func(o *dialOptions) { 197 o.timeout = d 198 } 199} 200 201// WithDialer returns a DialOption that specifies a function to use for dialing network addresses. 202func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { 203 return func(o *dialOptions) { 204 o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { 205 if deadline, ok := ctx.Deadline(); ok { 206 return f(addr, deadline.Sub(time.Now())) 207 } 208 return f(addr, 0) 209 } 210 } 211} 212 213// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. 214func WithUserAgent(s string) DialOption { 215 return func(o *dialOptions) { 216 o.copts.UserAgent = s 217 } 218} 219 220// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. 221func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { 222 return func(o *dialOptions) { 223 o.unaryInt = f 224 } 225} 226 227// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs. 228func WithStreamInterceptor(f StreamClientInterceptor) DialOption { 229 return func(o *dialOptions) { 230 o.streamInt = f 231 } 232} 233 234// Dial creates a client connection to the given target. 235func Dial(target string, opts ...DialOption) (*ClientConn, error) { 236 return DialContext(context.Background(), target, opts...) 237} 238 239// DialContext creates a client connection to the given target. ctx can be used to 240// cancel or expire the pending connecting. Once this function returns, the 241// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close 242// to terminate all the pending operations after this function returns. 243// This is the EXPERIMENTAL API. 244func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 245 cc := &ClientConn{ 246 target: target, 247 conns: make(map[Address]*addrConn), 248 } 249 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 250 defer func() { 251 select { 252 case <-ctx.Done(): 253 conn, err = nil, ctx.Err() 254 default: 255 } 256 257 if err != nil { 258 cc.Close() 259 } 260 }() 261 262 for _, opt := range opts { 263 opt(&cc.dopts) 264 } 265 266 // Set defaults. 267 if cc.dopts.codec == nil { 268 cc.dopts.codec = protoCodec{} 269 } 270 if cc.dopts.bs == nil { 271 cc.dopts.bs = DefaultBackoffConfig 272 } 273 creds := cc.dopts.copts.TransportCredentials 274 if creds != nil && creds.Info().ServerName != "" { 275 cc.authority = creds.Info().ServerName 276 } else { 277 colonPos := strings.LastIndex(target, ":") 278 if colonPos == -1 { 279 colonPos = len(target) 280 } 281 cc.authority = target[:colonPos] 282 } 283 var ok bool 284 waitC := make(chan error, 1) 285 go func() { 286 var addrs []Address 287 if cc.dopts.balancer == nil { 288 // Connect to target directly if balancer is nil. 289 addrs = append(addrs, Address{Addr: target}) 290 } else { 291 var credsClone credentials.TransportCredentials 292 if creds != nil { 293 credsClone = creds.Clone() 294 } 295 config := BalancerConfig{ 296 DialCreds: credsClone, 297 } 298 if err := cc.dopts.balancer.Start(target, config); err != nil { 299 waitC <- err 300 return 301 } 302 ch := cc.dopts.balancer.Notify() 303 if ch == nil { 304 // There is no name resolver installed. 305 addrs = append(addrs, Address{Addr: target}) 306 } else { 307 addrs, ok = <-ch 308 if !ok || len(addrs) == 0 { 309 waitC <- errNoAddr 310 return 311 } 312 } 313 } 314 for _, a := range addrs { 315 if err := cc.resetAddrConn(a, false, nil); err != nil { 316 waitC <- err 317 return 318 } 319 } 320 close(waitC) 321 }() 322 var timeoutCh <-chan time.Time 323 if cc.dopts.timeout > 0 { 324 timeoutCh = time.After(cc.dopts.timeout) 325 } 326 select { 327 case <-ctx.Done(): 328 return nil, ctx.Err() 329 case err := <-waitC: 330 if err != nil { 331 return nil, err 332 } 333 case <-timeoutCh: 334 return nil, ErrClientConnTimeout 335 } 336 // If balancer is nil or balancer.Notify() is nil, ok will be false here. 337 // The lbWatcher goroutine will not be created. 338 if ok { 339 go cc.lbWatcher() 340 } 341 return cc, nil 342} 343 344// ConnectivityState indicates the state of a client connection. 345type ConnectivityState int 346 347const ( 348 // Idle indicates the ClientConn is idle. 349 Idle ConnectivityState = iota 350 // Connecting indicates the ClienConn is connecting. 351 Connecting 352 // Ready indicates the ClientConn is ready for work. 353 Ready 354 // TransientFailure indicates the ClientConn has seen a failure but expects to recover. 355 TransientFailure 356 // Shutdown indicates the ClientConn has started shutting down. 357 Shutdown 358) 359 360func (s ConnectivityState) String() string { 361 switch s { 362 case Idle: 363 return "IDLE" 364 case Connecting: 365 return "CONNECTING" 366 case Ready: 367 return "READY" 368 case TransientFailure: 369 return "TRANSIENT_FAILURE" 370 case Shutdown: 371 return "SHUTDOWN" 372 default: 373 panic(fmt.Sprintf("unknown connectivity state: %d", s)) 374 } 375} 376 377// ClientConn represents a client connection to an RPC server. 378type ClientConn struct { 379 ctx context.Context 380 cancel context.CancelFunc 381 382 target string 383 authority string 384 dopts dialOptions 385 386 mu sync.RWMutex 387 conns map[Address]*addrConn 388} 389 390func (cc *ClientConn) lbWatcher() { 391 for addrs := range cc.dopts.balancer.Notify() { 392 var ( 393 add []Address // Addresses need to setup connections. 394 del []*addrConn // Connections need to tear down. 395 ) 396 cc.mu.Lock() 397 for _, a := range addrs { 398 if _, ok := cc.conns[a]; !ok { 399 add = append(add, a) 400 } 401 } 402 for k, c := range cc.conns { 403 var keep bool 404 for _, a := range addrs { 405 if k == a { 406 keep = true 407 break 408 } 409 } 410 if !keep { 411 del = append(del, c) 412 delete(cc.conns, c.addr) 413 } 414 } 415 cc.mu.Unlock() 416 for _, a := range add { 417 cc.resetAddrConn(a, true, nil) 418 } 419 for _, c := range del { 420 c.tearDown(errConnDrain) 421 } 422 } 423} 424 425// resetAddrConn creates an addrConn for addr and adds it to cc.conns. 426// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. 427// If tearDownErr is nil, errConnDrain will be used instead. 428func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { 429 ac := &addrConn{ 430 cc: cc, 431 addr: addr, 432 dopts: cc.dopts, 433 } 434 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 435 ac.stateCV = sync.NewCond(&ac.mu) 436 if EnableTracing { 437 ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) 438 } 439 if !ac.dopts.insecure { 440 if ac.dopts.copts.TransportCredentials == nil { 441 return errNoTransportSecurity 442 } 443 } else { 444 if ac.dopts.copts.TransportCredentials != nil { 445 return errCredentialsConflict 446 } 447 for _, cd := range ac.dopts.copts.PerRPCCredentials { 448 if cd.RequireTransportSecurity() { 449 return errTransportCredentialsMissing 450 } 451 } 452 } 453 // Track ac in cc. This needs to be done before any getTransport(...) is called. 454 cc.mu.Lock() 455 if cc.conns == nil { 456 cc.mu.Unlock() 457 return ErrClientConnClosing 458 } 459 stale := cc.conns[ac.addr] 460 cc.conns[ac.addr] = ac 461 cc.mu.Unlock() 462 if stale != nil { 463 // There is an addrConn alive on ac.addr already. This could be due to 464 // 1) a buggy Balancer notifies duplicated Addresses; 465 // 2) goaway was received, a new ac will replace the old ac. 466 // The old ac should be deleted from cc.conns, but the 467 // underlying transport should drain rather than close. 468 if tearDownErr == nil { 469 // tearDownErr is nil if resetAddrConn is called by 470 // 1) Dial 471 // 2) lbWatcher 472 // In both cases, the stale ac should drain, not close. 473 stale.tearDown(errConnDrain) 474 } else { 475 stale.tearDown(tearDownErr) 476 } 477 } 478 // skipWait may overwrite the decision in ac.dopts.block. 479 if ac.dopts.block && !skipWait { 480 if err := ac.resetTransport(false); err != nil { 481 if err != errConnClosing { 482 // Tear down ac and delete it from cc.conns. 483 cc.mu.Lock() 484 delete(cc.conns, ac.addr) 485 cc.mu.Unlock() 486 ac.tearDown(err) 487 } 488 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { 489 return e.Origin() 490 } 491 return err 492 } 493 // Start to monitor the error status of transport. 494 go ac.transportMonitor() 495 } else { 496 // Start a goroutine connecting to the server asynchronously. 497 go func() { 498 if err := ac.resetTransport(false); err != nil { 499 grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) 500 if err != errConnClosing { 501 // Keep this ac in cc.conns, to get the reason it's torn down. 502 ac.tearDown(err) 503 } 504 return 505 } 506 ac.transportMonitor() 507 }() 508 } 509 return nil 510} 511 512func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { 513 var ( 514 ac *addrConn 515 ok bool 516 put func() 517 ) 518 if cc.dopts.balancer == nil { 519 // If balancer is nil, there should be only one addrConn available. 520 cc.mu.RLock() 521 if cc.conns == nil { 522 cc.mu.RUnlock() 523 return nil, nil, toRPCErr(ErrClientConnClosing) 524 } 525 for _, ac = range cc.conns { 526 // Break after the first iteration to get the first addrConn. 527 ok = true 528 break 529 } 530 cc.mu.RUnlock() 531 } else { 532 var ( 533 addr Address 534 err error 535 ) 536 addr, put, err = cc.dopts.balancer.Get(ctx, opts) 537 if err != nil { 538 return nil, nil, toRPCErr(err) 539 } 540 cc.mu.RLock() 541 if cc.conns == nil { 542 cc.mu.RUnlock() 543 return nil, nil, toRPCErr(ErrClientConnClosing) 544 } 545 ac, ok = cc.conns[addr] 546 cc.mu.RUnlock() 547 } 548 if !ok { 549 if put != nil { 550 put() 551 } 552 return nil, nil, errConnClosing 553 } 554 t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) 555 if err != nil { 556 if put != nil { 557 put() 558 } 559 return nil, nil, err 560 } 561 return t, put, nil 562} 563 564// Close tears down the ClientConn and all underlying connections. 565func (cc *ClientConn) Close() error { 566 cc.cancel() 567 568 cc.mu.Lock() 569 if cc.conns == nil { 570 cc.mu.Unlock() 571 return ErrClientConnClosing 572 } 573 conns := cc.conns 574 cc.conns = nil 575 cc.mu.Unlock() 576 if cc.dopts.balancer != nil { 577 cc.dopts.balancer.Close() 578 } 579 for _, ac := range conns { 580 ac.tearDown(ErrClientConnClosing) 581 } 582 return nil 583} 584 585// addrConn is a network connection to a given address. 586type addrConn struct { 587 ctx context.Context 588 cancel context.CancelFunc 589 590 cc *ClientConn 591 addr Address 592 dopts dialOptions 593 events trace.EventLog 594 595 mu sync.Mutex 596 state ConnectivityState 597 stateCV *sync.Cond 598 down func(error) // the handler called when a connection is down. 599 // ready is closed and becomes nil when a new transport is up or failed 600 // due to timeout. 601 ready chan struct{} 602 transport transport.ClientTransport 603 604 // The reason this addrConn is torn down. 605 tearDownErr error 606} 607 608// printf records an event in ac's event log, unless ac has been closed. 609// REQUIRES ac.mu is held. 610func (ac *addrConn) printf(format string, a ...interface{}) { 611 if ac.events != nil { 612 ac.events.Printf(format, a...) 613 } 614} 615 616// errorf records an error in ac's event log, unless ac has been closed. 617// REQUIRES ac.mu is held. 618func (ac *addrConn) errorf(format string, a ...interface{}) { 619 if ac.events != nil { 620 ac.events.Errorf(format, a...) 621 } 622} 623 624// getState returns the connectivity state of the Conn 625func (ac *addrConn) getState() ConnectivityState { 626 ac.mu.Lock() 627 defer ac.mu.Unlock() 628 return ac.state 629} 630 631// waitForStateChange blocks until the state changes to something other than the sourceState. 632func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { 633 ac.mu.Lock() 634 defer ac.mu.Unlock() 635 if sourceState != ac.state { 636 return ac.state, nil 637 } 638 done := make(chan struct{}) 639 var err error 640 go func() { 641 select { 642 case <-ctx.Done(): 643 ac.mu.Lock() 644 err = ctx.Err() 645 ac.stateCV.Broadcast() 646 ac.mu.Unlock() 647 case <-done: 648 } 649 }() 650 defer close(done) 651 for sourceState == ac.state { 652 ac.stateCV.Wait() 653 if err != nil { 654 return ac.state, err 655 } 656 } 657 return ac.state, nil 658} 659 660func (ac *addrConn) resetTransport(closeTransport bool) error { 661 for retries := 0; ; retries++ { 662 ac.mu.Lock() 663 ac.printf("connecting") 664 if ac.state == Shutdown { 665 // ac.tearDown(...) has been invoked. 666 ac.mu.Unlock() 667 return errConnClosing 668 } 669 if ac.down != nil { 670 ac.down(downErrorf(false, true, "%v", errNetworkIO)) 671 ac.down = nil 672 } 673 ac.state = Connecting 674 ac.stateCV.Broadcast() 675 t := ac.transport 676 ac.mu.Unlock() 677 if closeTransport && t != nil { 678 t.Close() 679 } 680 sleepTime := ac.dopts.bs.backoff(retries) 681 timeout := minConnectTimeout 682 if timeout < sleepTime { 683 timeout = sleepTime 684 } 685 ctx, cancel := context.WithTimeout(ac.ctx, timeout) 686 connectTime := time.Now() 687 sinfo := transport.TargetInfo{ 688 Addr: ac.addr.Addr, 689 Metadata: ac.addr.Metadata, 690 } 691 newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts) 692 if err != nil { 693 cancel() 694 695 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { 696 return err 697 } 698 grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) 699 ac.mu.Lock() 700 if ac.state == Shutdown { 701 // ac.tearDown(...) has been invoked. 702 ac.mu.Unlock() 703 return errConnClosing 704 } 705 ac.errorf("transient failure: %v", err) 706 ac.state = TransientFailure 707 ac.stateCV.Broadcast() 708 if ac.ready != nil { 709 close(ac.ready) 710 ac.ready = nil 711 } 712 ac.mu.Unlock() 713 closeTransport = false 714 select { 715 case <-time.After(sleepTime - time.Since(connectTime)): 716 case <-ac.ctx.Done(): 717 return ac.ctx.Err() 718 } 719 continue 720 } 721 ac.mu.Lock() 722 ac.printf("ready") 723 if ac.state == Shutdown { 724 // ac.tearDown(...) has been invoked. 725 ac.mu.Unlock() 726 newTransport.Close() 727 return errConnClosing 728 } 729 ac.state = Ready 730 ac.stateCV.Broadcast() 731 ac.transport = newTransport 732 if ac.ready != nil { 733 close(ac.ready) 734 ac.ready = nil 735 } 736 if ac.cc.dopts.balancer != nil { 737 ac.down = ac.cc.dopts.balancer.Up(ac.addr) 738 } 739 ac.mu.Unlock() 740 return nil 741 } 742} 743 744// Run in a goroutine to track the error in transport and create the 745// new transport if an error happens. It returns when the channel is closing. 746func (ac *addrConn) transportMonitor() { 747 for { 748 ac.mu.Lock() 749 t := ac.transport 750 ac.mu.Unlock() 751 select { 752 // This is needed to detect the teardown when 753 // the addrConn is idle (i.e., no RPC in flight). 754 case <-ac.ctx.Done(): 755 select { 756 case <-t.Error(): 757 t.Close() 758 default: 759 } 760 return 761 case <-t.GoAway(): 762 // If GoAway happens without any network I/O error, ac is closed without shutting down the 763 // underlying transport (the transport will be closed when all the pending RPCs finished or 764 // failed.). 765 // If GoAway and some network I/O error happen concurrently, ac and its underlying transport 766 // are closed. 767 // In both cases, a new ac is created. 768 select { 769 case <-t.Error(): 770 ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) 771 default: 772 ac.cc.resetAddrConn(ac.addr, true, errConnDrain) 773 } 774 return 775 case <-t.Error(): 776 select { 777 case <-ac.ctx.Done(): 778 t.Close() 779 return 780 case <-t.GoAway(): 781 ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) 782 return 783 default: 784 } 785 ac.mu.Lock() 786 if ac.state == Shutdown { 787 // ac has been shutdown. 788 ac.mu.Unlock() 789 return 790 } 791 ac.state = TransientFailure 792 ac.stateCV.Broadcast() 793 ac.mu.Unlock() 794 if err := ac.resetTransport(true); err != nil { 795 ac.mu.Lock() 796 ac.printf("transport exiting: %v", err) 797 ac.mu.Unlock() 798 grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err) 799 if err != errConnClosing { 800 // Keep this ac in cc.conns, to get the reason it's torn down. 801 ac.tearDown(err) 802 } 803 return 804 } 805 } 806 } 807} 808 809// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or 810// iv) transport is in TransientFailure and there is a balancer/failfast is true. 811func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { 812 for { 813 ac.mu.Lock() 814 switch { 815 case ac.state == Shutdown: 816 if failfast || !hasBalancer { 817 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. 818 err := ac.tearDownErr 819 ac.mu.Unlock() 820 return nil, err 821 } 822 ac.mu.Unlock() 823 return nil, errConnClosing 824 case ac.state == Ready: 825 ct := ac.transport 826 ac.mu.Unlock() 827 return ct, nil 828 case ac.state == TransientFailure: 829 if failfast || hasBalancer { 830 ac.mu.Unlock() 831 return nil, errConnUnavailable 832 } 833 } 834 ready := ac.ready 835 if ready == nil { 836 ready = make(chan struct{}) 837 ac.ready = ready 838 } 839 ac.mu.Unlock() 840 select { 841 case <-ctx.Done(): 842 return nil, toRPCErr(ctx.Err()) 843 // Wait until the new transport is ready or failed. 844 case <-ready: 845 } 846 } 847} 848 849// tearDown starts to tear down the addrConn. 850// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in 851// some edge cases (e.g., the caller opens and closes many addrConn's in a 852// tight loop. 853// tearDown doesn't remove ac from ac.cc.conns. 854func (ac *addrConn) tearDown(err error) { 855 ac.cancel() 856 857 ac.mu.Lock() 858 defer ac.mu.Unlock() 859 if ac.down != nil { 860 ac.down(downErrorf(false, false, "%v", err)) 861 ac.down = nil 862 } 863 if err == errConnDrain && ac.transport != nil { 864 // GracefulClose(...) may be executed multiple times when 865 // i) receiving multiple GoAway frames from the server; or 866 // ii) there are concurrent name resolver/Balancer triggered 867 // address removal and GoAway. 868 ac.transport.GracefulClose() 869 } 870 if ac.state == Shutdown { 871 return 872 } 873 ac.state = Shutdown 874 ac.tearDownErr = err 875 ac.stateCV.Broadcast() 876 if ac.events != nil { 877 ac.events.Finish() 878 ac.events = nil 879 } 880 if ac.ready != nil { 881 close(ac.ready) 882 ac.ready = nil 883 } 884 if ac.transport != nil && err != errConnDrain { 885 ac.transport.Close() 886 } 887 return 888} 889