1/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 *     * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *     * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 *     * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34package grpc
35
36import (
37	"errors"
38	"fmt"
39	"net"
40	"strings"
41	"sync"
42	"time"
43
44	"golang.org/x/net/context"
45	"golang.org/x/net/trace"
46	"google.golang.org/grpc/credentials"
47	"google.golang.org/grpc/grpclog"
48	"google.golang.org/grpc/transport"
49)
50
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that a per-RPC credential
	// (e.g., an oauth2 token) requires transport security, but the connection
	// was configured as insecure.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errNoAddr indicates that, during Dial, the balancer's Notify channel
	// was closed or delivered an empty address list.
	errNoAddr = errors.New("grpc: there is no address available to dial")
	// minConnectTimeout is the minimum time given to a single connection
	// attempt to complete. An attempt whose backoff delay is longer gets
	// that delay as its timeout instead.
	minConnectTimeout = 20 * time.Second
)
82
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
//
// Every field starts at its zero value and is filled in by the With* options.
// After applying the options, DialContext sets defaults for two fields:
// codec (protoCodec{}) and bs (DefaultBackoffConfig). copts is passed to the
// transport package whenever a new client transport is created.
type dialOptions struct {
	unaryInt  UnaryClientInterceptor
	streamInt StreamClientInterceptor
	codec     Codec
	cp        Compressor
	dc        Decompressor
	bs        backoffStrategy
	balancer  Balancer
	block     bool
	insecure  bool
	timeout   time.Duration
	copts     transport.ConnectOptions
}
98
// DialOption configures how we set up the connection.
//
// A DialOption mutates the dialOptions of the ClientConn being built.
// DialContext applies options in the order given, so for options that assign
// a single field the last one wins. WithPerRPCCredentials accumulates instead.
type DialOption func(*dialOptions)
101
102// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
103func WithCodec(c Codec) DialOption {
104	return func(o *dialOptions) {
105		o.codec = c
106	}
107}
108
109// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
110// compressor.
111func WithCompressor(cp Compressor) DialOption {
112	return func(o *dialOptions) {
113		o.cp = cp
114	}
115}
116
117// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
118// message decompressor.
119func WithDecompressor(dc Decompressor) DialOption {
120	return func(o *dialOptions) {
121		o.dc = dc
122	}
123}
124
125// WithBalancer returns a DialOption which sets a load balancer.
126func WithBalancer(b Balancer) DialOption {
127	return func(o *dialOptions) {
128		o.balancer = b
129	}
130}
131
132// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
133// when backing off after failed connection attempts.
134func WithBackoffMaxDelay(md time.Duration) DialOption {
135	return WithBackoffConfig(BackoffConfig{MaxDelay: md})
136}
137
138// WithBackoffConfig configures the dialer to use the provided backoff
139// parameters after connection failures.
140//
141// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
142// for use.
143func WithBackoffConfig(b BackoffConfig) DialOption {
144	// Set defaults to ensure that provided BackoffConfig is valid and
145	// unexported fields get default values.
146	setDefaults(&b)
147	return withBackoff(b)
148}
149
150// withBackoff sets the backoff strategy used for retries after a
151// failed connection attempt.
152//
153// This can be exported if arbitrary backoff strategies are allowed by gRPC.
154func withBackoff(bs backoffStrategy) DialOption {
155	return func(o *dialOptions) {
156		o.bs = bs
157	}
158}
159
160// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
161// connection is up. Without this, Dial returns immediately and connecting the server
162// happens in background.
163func WithBlock() DialOption {
164	return func(o *dialOptions) {
165		o.block = true
166	}
167}
168
169// WithInsecure returns a DialOption which disables transport security for this ClientConn.
170// Note that transport security is required unless WithInsecure is set.
171func WithInsecure() DialOption {
172	return func(o *dialOptions) {
173		o.insecure = true
174	}
175}
176
177// WithTransportCredentials returns a DialOption which configures a
178// connection level security credentials (e.g., TLS/SSL).
179func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
180	return func(o *dialOptions) {
181		o.copts.TransportCredentials = creds
182	}
183}
184
185// WithPerRPCCredentials returns a DialOption which sets
186// credentials which will place auth state on each outbound RPC.
187func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
188	return func(o *dialOptions) {
189		o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
190	}
191}
192
193// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
194// initially. This is valid if and only if WithBlock() is present.
195func WithTimeout(d time.Duration) DialOption {
196	return func(o *dialOptions) {
197		o.timeout = d
198	}
199}
200
201// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
202func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
203	return func(o *dialOptions) {
204		o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
205			if deadline, ok := ctx.Deadline(); ok {
206				return f(addr, deadline.Sub(time.Now()))
207			}
208			return f(addr, 0)
209		}
210	}
211}
212
213// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
214func WithUserAgent(s string) DialOption {
215	return func(o *dialOptions) {
216		o.copts.UserAgent = s
217	}
218}
219
220// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
221func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
222	return func(o *dialOptions) {
223		o.unaryInt = f
224	}
225}
226
227// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
228func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
229	return func(o *dialOptions) {
230		o.streamInt = f
231	}
232}
233
234// Dial creates a client connection to the given target.
235func Dial(target string, opts ...DialOption) (*ClientConn, error) {
236	return DialContext(context.Background(), target, opts...)
237}
238
// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
//
// Dialing works as follows:
//   - apply the options and fill in defaults, then derive the authority;
//   - in a goroutine, resolve the initial addresses (from the balancer if one
//     is set, otherwise target itself) and create an addrConn for each via
//     resetAddrConn;
//   - wait for that goroutine, for ctx, or for the WithTimeout bound,
//     whichever comes first.
//
// On any error, including ctx being done when DialContext returns, the
// ClientConn is closed and a nil *ClientConn is returned.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[Address]*addrConn),
	}
	// cc's own context is intentionally not derived from ctx: after a
	// successful dial its lifetime is controlled only by cc.Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	defer func() {
		// Even an otherwise successful dial fails if ctx is done by now.
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}

		// Release whatever was set up before the failure.
		if err != nil {
			cc.Close()
		}
	}()

	for _, opt := range opts {
		opt(&cc.dopts)
	}

	// Set defaults.
	if cc.dopts.codec == nil {
		cc.dopts.codec = protoCodec{}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = DefaultBackoffConfig
	}
	// The authority is the credentials' ServerName when set. Otherwise it is
	// target up to (not including) its last ':', or all of target when it
	// contains no ':'.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else {
		colonPos := strings.LastIndex(target, ":")
		if colonPos == -1 {
			colonPos = len(target)
		}
		cc.authority = target[:colonPos]
	}
	// ok is set by the goroutine below when the balancer's Notify channel
	// delivered the initial address list. It is read only after waitC has
	// been closed, which orders the write before the read.
	var ok bool
	// waitC carries the goroutine's first error, or is closed on success.
	// The buffer lets the goroutine finish even if DialContext has already
	// returned because of ctx or the timeout.
	waitC := make(chan error, 1)
	go func() {
		var addrs []Address
		if cc.dopts.balancer == nil {
			// Connect to target directly if balancer is nil.
			addrs = append(addrs, Address{Addr: target})
		} else {
			// The balancer gets its own clone of the transport credentials.
			var credsClone credentials.TransportCredentials
			if creds != nil {
				credsClone = creds.Clone()
			}
			config := BalancerConfig{
				DialCreds: credsClone,
			}
			if err := cc.dopts.balancer.Start(target, config); err != nil {
				waitC <- err
				return
			}
			ch := cc.dopts.balancer.Notify()
			if ch == nil {
				// There is no name resolver installed.
				addrs = append(addrs, Address{Addr: target})
			} else {
				// Block for the first address list; a closed channel or an
				// empty list fails the dial.
				addrs, ok = <-ch
				if !ok || len(addrs) == 0 {
					waitC <- errNoAddr
					return
				}
			}
		}
		for _, a := range addrs {
			if err := cc.resetAddrConn(a, false, nil); err != nil {
				waitC <- err
				return
			}
		}
		close(waitC)
	}()
	// timeoutCh stays nil, and so never fires, when no timeout is set.
	var timeoutCh <-chan time.Time
	if cc.dopts.timeout > 0 {
		timeoutCh = time.After(cc.dopts.timeout)
	}
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case err := <-waitC:
		if err != nil {
			return nil, err
		}
	case <-timeoutCh:
		return nil, ErrClientConnTimeout
	}
	// If balancer is nil or balancer.Notify() is nil, ok will be false here.
	// The lbWatcher goroutine will not be created.
	if ok {
		go cc.lbWatcher()
	}
	return cc, nil
}
343
// ConnectivityState indicates the state of a client connection.
type ConnectivityState int

const (
	// Idle indicates the ClientConn is idle.
	Idle ConnectivityState = iota
	// Connecting indicates the ClientConn is connecting.
	Connecting
	// Ready indicates the ClientConn is ready for work.
	Ready
	// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
	TransientFailure
	// Shutdown indicates the ClientConn has started shutting down.
	Shutdown
)

// String returns the upper-case name of s, e.g. "READY".
//
// A value outside the declared constants is rendered as "INVALID_STATE(n)"
// instead of panicking. String is invoked implicitly by fmt and by logging
// code, where a panic would be surprising and disproportionate.
func (s ConnectivityState) String() string {
	switch s {
	case Idle:
		return "IDLE"
	case Connecting:
		return "CONNECTING"
	case Ready:
		return "READY"
	case TransientFailure:
		return "TRANSIENT_FAILURE"
	case Shutdown:
		return "SHUTDOWN"
	default:
		return fmt.Sprintf("INVALID_STATE(%d)", int(s))
	}
}
376
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx is the parent of every addrConn's context; cancel is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string passed to Dial. authority is derived in
	// DialContext from the credentials' ServerName or from target.
	target    string
	authority string
	dopts     dialOptions

	// mu guards conns. conns maps each address to its addrConn. Close sets
	// conns to nil, which the other methods treat as "closing".
	mu    sync.RWMutex
	conns map[Address]*addrConn
}
389
390func (cc *ClientConn) lbWatcher() {
391	for addrs := range cc.dopts.balancer.Notify() {
392		var (
393			add []Address   // Addresses need to setup connections.
394			del []*addrConn // Connections need to tear down.
395		)
396		cc.mu.Lock()
397		for _, a := range addrs {
398			if _, ok := cc.conns[a]; !ok {
399				add = append(add, a)
400			}
401		}
402		for k, c := range cc.conns {
403			var keep bool
404			for _, a := range addrs {
405				if k == a {
406					keep = true
407					break
408				}
409			}
410			if !keep {
411				del = append(del, c)
412				delete(cc.conns, c.addr)
413			}
414		}
415		cc.mu.Unlock()
416		for _, a := range add {
417			cc.resetAddrConn(a, true, nil)
418		}
419		for _, c := range del {
420			c.tearDown(errConnDrain)
421		}
422	}
423}
424
425// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
426// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
427// If tearDownErr is nil, errConnDrain will be used instead.
428func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
429	ac := &addrConn{
430		cc:    cc,
431		addr:  addr,
432		dopts: cc.dopts,
433	}
434	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
435	ac.stateCV = sync.NewCond(&ac.mu)
436	if EnableTracing {
437		ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
438	}
439	if !ac.dopts.insecure {
440		if ac.dopts.copts.TransportCredentials == nil {
441			return errNoTransportSecurity
442		}
443	} else {
444		if ac.dopts.copts.TransportCredentials != nil {
445			return errCredentialsConflict
446		}
447		for _, cd := range ac.dopts.copts.PerRPCCredentials {
448			if cd.RequireTransportSecurity() {
449				return errTransportCredentialsMissing
450			}
451		}
452	}
453	// Track ac in cc. This needs to be done before any getTransport(...) is called.
454	cc.mu.Lock()
455	if cc.conns == nil {
456		cc.mu.Unlock()
457		return ErrClientConnClosing
458	}
459	stale := cc.conns[ac.addr]
460	cc.conns[ac.addr] = ac
461	cc.mu.Unlock()
462	if stale != nil {
463		// There is an addrConn alive on ac.addr already. This could be due to
464		// 1) a buggy Balancer notifies duplicated Addresses;
465		// 2) goaway was received, a new ac will replace the old ac.
466		//    The old ac should be deleted from cc.conns, but the
467		//    underlying transport should drain rather than close.
468		if tearDownErr == nil {
469			// tearDownErr is nil if resetAddrConn is called by
470			// 1) Dial
471			// 2) lbWatcher
472			// In both cases, the stale ac should drain, not close.
473			stale.tearDown(errConnDrain)
474		} else {
475			stale.tearDown(tearDownErr)
476		}
477	}
478	// skipWait may overwrite the decision in ac.dopts.block.
479	if ac.dopts.block && !skipWait {
480		if err := ac.resetTransport(false); err != nil {
481			if err != errConnClosing {
482				// Tear down ac and delete it from cc.conns.
483				cc.mu.Lock()
484				delete(cc.conns, ac.addr)
485				cc.mu.Unlock()
486				ac.tearDown(err)
487			}
488			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
489				return e.Origin()
490			}
491			return err
492		}
493		// Start to monitor the error status of transport.
494		go ac.transportMonitor()
495	} else {
496		// Start a goroutine connecting to the server asynchronously.
497		go func() {
498			if err := ac.resetTransport(false); err != nil {
499				grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
500				if err != errConnClosing {
501					// Keep this ac in cc.conns, to get the reason it's torn down.
502					ac.tearDown(err)
503				}
504				return
505			}
506			ac.transportMonitor()
507		}()
508	}
509	return nil
510}
511
512func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
513	var (
514		ac  *addrConn
515		ok  bool
516		put func()
517	)
518	if cc.dopts.balancer == nil {
519		// If balancer is nil, there should be only one addrConn available.
520		cc.mu.RLock()
521		if cc.conns == nil {
522			cc.mu.RUnlock()
523			return nil, nil, toRPCErr(ErrClientConnClosing)
524		}
525		for _, ac = range cc.conns {
526			// Break after the first iteration to get the first addrConn.
527			ok = true
528			break
529		}
530		cc.mu.RUnlock()
531	} else {
532		var (
533			addr Address
534			err  error
535		)
536		addr, put, err = cc.dopts.balancer.Get(ctx, opts)
537		if err != nil {
538			return nil, nil, toRPCErr(err)
539		}
540		cc.mu.RLock()
541		if cc.conns == nil {
542			cc.mu.RUnlock()
543			return nil, nil, toRPCErr(ErrClientConnClosing)
544		}
545		ac, ok = cc.conns[addr]
546		cc.mu.RUnlock()
547	}
548	if !ok {
549		if put != nil {
550			put()
551		}
552		return nil, nil, errConnClosing
553	}
554	t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
555	if err != nil {
556		if put != nil {
557			put()
558		}
559		return nil, nil, err
560	}
561	return t, put, nil
562}
563
564// Close tears down the ClientConn and all underlying connections.
565func (cc *ClientConn) Close() error {
566	cc.cancel()
567
568	cc.mu.Lock()
569	if cc.conns == nil {
570		cc.mu.Unlock()
571		return ErrClientConnClosing
572	}
573	conns := cc.conns
574	cc.conns = nil
575	cc.mu.Unlock()
576	if cc.dopts.balancer != nil {
577		cc.dopts.balancer.Close()
578	}
579	for _, ac := range conns {
580		ac.tearDown(ErrClientConnClosing)
581	}
582	return nil
583}
584
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is derived from cc.ctx. cancel is invoked by tearDown, which stops
	// reconnection backoff and the transport monitor.
	ctx    context.Context
	cancel context.CancelFunc

	// cc, addr and dopts are set in resetAddrConn. events is the trace log;
	// it is nil when tracing is disabled and is reset to nil (under mu) by
	// tearDown.
	cc     *ClientConn
	addr   Address
	dopts  dialOptions
	events trace.EventLog

	// mu guards the fields below. stateCV is bound to mu and is broadcast
	// whenever state changes.
	mu      sync.Mutex
	state   ConnectivityState
	stateCV *sync.Cond
	down    func(error) // the handler called when a connection is down.
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout.
	ready     chan struct{}
	transport transport.ClientTransport

	// The reason this addrConn is torn down.
	tearDownErr error
}
607
608// printf records an event in ac's event log, unless ac has been closed.
609// REQUIRES ac.mu is held.
610func (ac *addrConn) printf(format string, a ...interface{}) {
611	if ac.events != nil {
612		ac.events.Printf(format, a...)
613	}
614}
615
616// errorf records an error in ac's event log, unless ac has been closed.
617// REQUIRES ac.mu is held.
618func (ac *addrConn) errorf(format string, a ...interface{}) {
619	if ac.events != nil {
620		ac.events.Errorf(format, a...)
621	}
622}
623
624// getState returns the connectivity state of the Conn
625func (ac *addrConn) getState() ConnectivityState {
626	ac.mu.Lock()
627	defer ac.mu.Unlock()
628	return ac.state
629}
630
// waitForStateChange blocks until the state changes to something other than the sourceState.
//
// If ac is already in a state other than sourceState, that state is returned
// immediately. Otherwise it returns the new state with a nil error. If a
// watcher goroutine observes ctx done while this call is still waiting, it
// returns ac's current state together with ctx.Err().
func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	if sourceState != ac.state {
		return ac.state, nil
	}
	// done is closed when this call returns, which lets the watcher exit.
	done := make(chan struct{})
	// err is written by the watcher and read below, both under ac.mu.
	var err error
	go func() {
		select {
		case <-ctx.Done():
			// Record the error and wake the Wait loop below.
			ac.mu.Lock()
			err = ctx.Err()
			ac.stateCV.Broadcast()
			ac.mu.Unlock()
		case <-done:
		}
	}()
	// Registered after the Unlock, so it runs first: done is closed while
	// ac.mu is still held.
	defer close(done)
	for sourceState == ac.state {
		ac.stateCV.Wait()
		if err != nil {
			return ac.state, err
		}
	}
	return ac.state, nil
}
659
// resetTransport (re)establishes ac's transport, retrying with backoff.
//
// If closeTransport is true, the current ac.transport (if any) is closed
// before the first attempt only. Each attempt:
//   - moves ac to Connecting;
//   - first calls and clears ac.down, if set;
//   - dials with a timeout of max(minConnectTimeout, this retry's backoff delay).
//
// It returns:
//   - nil once a new transport is installed and ac is Ready;
//   - errConnClosing if ac is (or becomes) Shutdown, i.e. tearDown was called;
//   - the dial error unchanged if it is a non-temporary transport.ConnectionError;
//   - ac.ctx.Err() if ac's context is canceled while backing off.
func (ac *addrConn) resetTransport(closeTransport bool) error {
	for retries := 0; ; retries++ {
		ac.mu.Lock()
		ac.printf("connecting")
		if ac.state == Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			return errConnClosing
		}
		// Report the previous connection as down via the handler obtained
		// from balancer.Up.
		if ac.down != nil {
			ac.down(downErrorf(false, true, "%v", errNetworkIO))
			ac.down = nil
		}
		ac.state = Connecting
		ac.stateCV.Broadcast()
		t := ac.transport
		ac.mu.Unlock()
		if closeTransport && t != nil {
			t.Close()
		}
		// The attempt gets at least minConnectTimeout, or this retry's
		// backoff delay if that is longer.
		sleepTime := ac.dopts.bs.backoff(retries)
		timeout := minConnectTimeout
		if timeout < sleepTime {
			timeout = sleepTime
		}
		// NOTE(review): cancel is only called on the failure path below; on
		// success ctx is released when its timeout fires or ac.ctx is canceled.
		// Upstream gRPC later documented this as a workaround for a Go 1.6
		// race (golang/go#15078) — confirm before adding a cancel on success.
		ctx, cancel := context.WithTimeout(ac.ctx, timeout)
		connectTime := time.Now()
		sinfo := transport.TargetInfo{
			Addr:     ac.addr.Addr,
			Metadata: ac.addr.Metadata,
		}
		newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
		if err != nil {
			cancel()

			// Non-temporary connection errors are not retried.
			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
				return err
			}
			grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
			ac.mu.Lock()
			if ac.state == Shutdown {
				// ac.tearDown(...) has been invoked.
				ac.mu.Unlock()
				return errConnClosing
			}
			ac.errorf("transient failure: %v", err)
			ac.state = TransientFailure
			ac.stateCV.Broadcast()
			// Wake goroutines blocked in ac.wait so they observe TransientFailure.
			if ac.ready != nil {
				close(ac.ready)
				ac.ready = nil
			}
			ac.mu.Unlock()
			closeTransport = false
			// Sleep out the remainder of the backoff delay, measured from
			// the start of this attempt.
			select {
			case <-time.After(sleepTime - time.Since(connectTime)):
			case <-ac.ctx.Done():
				return ac.ctx.Err()
			}
			continue
		}
		ac.mu.Lock()
		ac.printf("ready")
		if ac.state == Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			newTransport.Close()
			return errConnClosing
		}
		ac.state = Ready
		ac.stateCV.Broadcast()
		ac.transport = newTransport
		if ac.ready != nil {
			close(ac.ready)
			ac.ready = nil
		}
		// Tell the balancer the address is up and keep the returned handler
		// in ac.down for when the connection goes away.
		if ac.cc.dopts.balancer != nil {
			ac.down = ac.cc.dopts.balancer.Up(ac.addr)
		}
		ac.mu.Unlock()
		return nil
	}
}
743
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
//
// Each iteration waits on the current transport for one of three events.
//
// ac.ctx done (ac torn down): close the transport if it has also reported an
// error, then return.
//
// GoAway: replace ac with a fresh addrConn via cc.resetAddrConn, then return.
// The old addrConn is torn down with errNetworkIO if the transport has also
// reported an error; otherwise it is torn down with errConnDrain.
//
// Transport error: unless ac is being torn down or a GoAway is also pending,
// mark ac TransientFailure and reconnect with resetTransport. If reconnecting
// fails, ac is torn down (unless it is already closing) and the monitor returns.
func (ac *addrConn) transportMonitor() {
	for {
		ac.mu.Lock()
		t := ac.transport
		ac.mu.Unlock()
		select {
		// This is needed to detect the teardown when
		// the addrConn is idle (i.e., no RPC in flight).
		case <-ac.ctx.Done():
			select {
			case <-t.Error():
				t.Close()
			default:
			}
			return
		case <-t.GoAway():
			// If GoAway happens without any network I/O error, ac is closed without shutting down the
			// underlying transport (the transport will be closed when all the pending RPCs finished or
			// failed.).
			// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
			// are closed.
			// In both cases, a new ac is created.
			select {
			case <-t.Error():
				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
			default:
				ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
			}
			return
		case <-t.Error():
			// Give teardown and GoAway priority over reconnecting in place.
			select {
			case <-ac.ctx.Done():
				t.Close()
				return
			case <-t.GoAway():
				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
				return
			default:
			}
			ac.mu.Lock()
			if ac.state == Shutdown {
				// ac has been shutdown.
				ac.mu.Unlock()
				return
			}
			ac.state = TransientFailure
			ac.stateCV.Broadcast()
			ac.mu.Unlock()
			if err := ac.resetTransport(true); err != nil {
				ac.mu.Lock()
				ac.printf("transport exiting: %v", err)
				ac.mu.Unlock()
				grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
				if err != errConnClosing {
					// Keep this ac in cc.conns, to get the reason it's torn down.
					ac.tearDown(err)
				}
				return
			}
		}
	}
}
808
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
//
// Results by state:
//   - Ready: ac.transport, nil.
//   - Shutdown: ac.tearDownErr if failfast or there is no balancer; otherwise
//     errConnClosing (presumably so the caller can retry on another address — confirm).
//   - TransientFailure with failfast or a balancer: errConnUnavailable.
//   - any other case: block on ac.ready, then re-check the state.
//
// If ctx is done while blocked, it returns toRPCErr(ctx.Err()).
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
	for {
		ac.mu.Lock()
		switch {
		case ac.state == Shutdown:
			if failfast || !hasBalancer {
				// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
				err := ac.tearDownErr
				ac.mu.Unlock()
				return nil, err
			}
			ac.mu.Unlock()
			return nil, errConnClosing
		case ac.state == Ready:
			ct := ac.transport
			ac.mu.Unlock()
			return ct, nil
		case ac.state == TransientFailure:
			if failfast || hasBalancer {
				ac.mu.Unlock()
				return nil, errConnUnavailable
			}
		}
		// The first waiter creates ready and later waiters share it.
		// resetTransport and tearDown close it and reset it to nil.
		ready := ac.ready
		if ready == nil {
			ready = make(chan struct{})
			ac.ready = ready
		}
		ac.mu.Unlock()
		select {
		case <-ctx.Done():
			return nil, toRPCErr(ctx.Err())
		// Wait until the new transport is ready or failed.
		case <-ready:
		}
	}
}
848
849// tearDown starts to tear down the addrConn.
850// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
851// some edge cases (e.g., the caller opens and closes many addrConn's in a
852// tight loop.
853// tearDown doesn't remove ac from ac.cc.conns.
854func (ac *addrConn) tearDown(err error) {
855	ac.cancel()
856
857	ac.mu.Lock()
858	defer ac.mu.Unlock()
859	if ac.down != nil {
860		ac.down(downErrorf(false, false, "%v", err))
861		ac.down = nil
862	}
863	if err == errConnDrain && ac.transport != nil {
864		// GracefulClose(...) may be executed multiple times when
865		// i) receiving multiple GoAway frames from the server; or
866		// ii) there are concurrent name resolver/Balancer triggered
867		// address removal and GoAway.
868		ac.transport.GracefulClose()
869	}
870	if ac.state == Shutdown {
871		return
872	}
873	ac.state = Shutdown
874	ac.tearDownErr = err
875	ac.stateCV.Broadcast()
876	if ac.events != nil {
877		ac.events.Finish()
878		ac.events = nil
879	}
880	if ac.ready != nil {
881		close(ac.ready)
882		ac.ready = nil
883	}
884	if ac.transport != nil && err != errConnDrain {
885		ac.transport.Close()
886	}
887	return
888}
889