1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/balancer/base"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/internal/backoff"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/grpcsync"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minConnectTimeout is the minimum amount of time given to a connection
	// attempt to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the registered name of the grpclb balancer. It must match
	// grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
59
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection has started draining and does
	// not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing. It is returned
	// by addrConn.connect when the addrConn is already shut down.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix prefixes the JSON parsing error
	// DialContext returns when the default service config is invalid.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
77
// The following errors are returned from Dial and DialContext when the
// transport-security dial options are inconsistent.
var (
	// errNoTransportSecurity indicates that no transport security was set for
	// the ClientConn. Users should either set one or explicitly call the
	// WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a creds bundle is used together
	// with individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that both grpc.WithTransportCredentials()
	// and grpc.WithInsecure() were used for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
95
const (
	// Default client-side message size limits: 4 MiB for received messages
	// and math.MaxInt32 for sent messages.
	// NOTE(review): presumably overridable through dial/call options defined
	// elsewhere — confirm.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default sizes (32 KiB
	// each) of the transport's write buffer, used for sending frames, and of
	// its read buffer.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106	return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// Setup steps, in order:
//  1. Apply the DialOptions and chain the interceptors.
//  2. Register with channelz.
//  3. Validate the transport-security options and parse the default service
//     config.
//  4. Fill in the dialer and user-agent defaults.
//  5. Select a resolver for the target's scheme, falling back to the default
//     scheme.
//  6. Derive the authority.
//  7. Optionally wait for an initial config from the service-config channel.
//  8. Build the resolver.
//  9. For a blocking dial, wait until the state is Ready.
//
// On any error return, the partially constructed ClientConn is closed. If ctx
// is done by the time DialContext returns, ctx.Err() is returned even if setup
// otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so the atomic.Value always holds a *retryThrottler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx derives from context.Background(), not from ctx, so cancelling the
	// dial ctx does not cancel the ClientConn's own context.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Tear down the partially built ClientConn on any error return.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Secure connections need exactly one of TransportCredentials or
	// CredsBundle. Insecure connections may have neither, and none of their
	// per-RPC credentials may require transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Without a custom dialer, dial via newProxyDialer wrapping a plain
	// net.Dialer. parseDialTarget picks the network for the address.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the gRPC user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done when DialContext returns, report ctx.Err() instead of the
	// result. Defers run LIFO, so this runs before the cc.Close defer above.
	// That defer then sees the non-nil err and closes cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = parseTarget(cc.target)
	grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Authority precedence:
	//  1. the transport credentials' ServerName, if set;
	//  2. dopts.authority, which is only honored for insecure connections;
	//  3. the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Abort on a non-temporary connection error. Temporary errors
				// (or errors without a Temporary method) keep waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
334
335// chainUnaryClientInterceptors chains all unary client interceptors into one.
336func chainUnaryClientInterceptors(cc *ClientConn) {
337	interceptors := cc.dopts.chainUnaryInts
338	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
339	// be executed before any other chained interceptors.
340	if cc.dopts.unaryInt != nil {
341		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
342	}
343	var chainedInt UnaryClientInterceptor
344	if len(interceptors) == 0 {
345		chainedInt = nil
346	} else if len(interceptors) == 1 {
347		chainedInt = interceptors[0]
348	} else {
349		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
350			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
351		}
352	}
353	cc.dopts.unaryInt = chainedInt
354}
355
356// getChainUnaryInvoker recursively generate the chained unary invoker.
357func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
358	if curr == len(interceptors)-1 {
359		return finalInvoker
360	}
361	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
362		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
363	}
364}
365
366// chainStreamClientInterceptors chains all stream client interceptors into one.
367func chainStreamClientInterceptors(cc *ClientConn) {
368	interceptors := cc.dopts.chainStreamInts
369	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
370	// be executed before any other chained interceptors.
371	if cc.dopts.streamInt != nil {
372		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
373	}
374	var chainedInt StreamClientInterceptor
375	if len(interceptors) == 0 {
376		chainedInt = nil
377	} else if len(interceptors) == 1 {
378		chainedInt = interceptors[0]
379	} else {
380		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
381			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
382		}
383	}
384	cc.dopts.streamInt = chainedInt
385}
386
387// getChainStreamer recursively generate the chained client stream constructor.
388func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
389	if curr == len(interceptors)-1 {
390		return finalStreamer
391	}
392	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
393		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
394	}
395}
396
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu guards state and notifyChan.
	mu         sync.Mutex
	state      connectivity.State
	// notifyChan is created lazily by getNotifyChan. updateState closes it
	// (waking every waiter) and resets it to nil whenever the state changes.
	notifyChan chan struct{}
	// channelzID identifies the owning channel for channelz trace events.
	channelzID int64
}
405
406// updateState updates the connectivity.State of ClientConn.
407// If there's a change it notifies goroutines waiting on state change to
408// happen.
409func (csm *connectivityStateManager) updateState(state connectivity.State) {
410	csm.mu.Lock()
411	defer csm.mu.Unlock()
412	if csm.state == connectivity.Shutdown {
413		return
414	}
415	if csm.state == state {
416		return
417	}
418	csm.state = state
419	if channelz.IsOn() {
420		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
421			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
422			Severity: channelz.CtINFO,
423		})
424	}
425	if csm.notifyChan != nil {
426		// There are other goroutines waiting on this channel.
427		close(csm.notifyChan)
428		csm.notifyChan = nil
429	}
430}
431
432func (csm *connectivityStateManager) getState() connectivity.State {
433	csm.mu.Lock()
434	defer csm.mu.Unlock()
435	return csm.state
436}
437
438func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
439	csm.mu.Lock()
440	defer csm.mu.Unlock()
441	if csm.notifyChan == nil {
442		csm.notifyChan = make(chan struct{})
443	}
444	return csm.notifyChan
445}
446
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Assert *ClientConn implements ClientConnInterface. This is a compile-time
// check only; the blank variable is never used.
var _ ClientConnInterface = (*ClientConn)(nil)
460
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx and cancel are created in DialContext from context.Background().
	// ctx is the parent of every addrConn's context (see newAddrConn). Its
	// cancellation stops scWatcher and unblocks waitForResolvedAddrs.
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu guards the fields that follow. In this file, resolverWrapper, sc,
	// conns, curBalancerName and balancerWrapper are accessed while holding
	// it.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is the set of live addrConns. A nil map means the ClientConn has
	// been closed; methods in this file check for that before touching other
	// fields.
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler always holds a *retryThrottler, which may be nil.
	retryThrottler  atomic.Value

	// firstResolveEvent fires after the first call to updateResolverState.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
501
502// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
503// ctx expires. A true value is returned in former case and false in latter.
504// This is an EXPERIMENTAL API.
505func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
506	ch := cc.csMgr.getNotifyChan()
507	if cc.csMgr.getState() != sourceState {
508		return true
509	}
510	select {
511	case <-ctx.Done():
512		return false
513	case <-ch:
514		return true
515	}
516}
517
518// GetState returns the connectivity.State of ClientConn.
519// This is an EXPERIMENTAL API.
520func (cc *ClientConn) GetState() connectivity.State {
521	return cc.csMgr.getState()
522}
523
524func (cc *ClientConn) scWatcher() {
525	for {
526		select {
527		case sc, ok := <-cc.dopts.scChan:
528			if !ok {
529				return
530			}
531			cc.mu.Lock()
532			// TODO: load balance policy runtime change is ignored.
533			// We may revisit this decision in the future.
534			cc.sc = &sc
535			cc.mu.Unlock()
536		case <-cc.ctx.Done():
537			return
538		}
539	}
540}
541
542// waitForResolvedAddrs blocks until the resolver has provided addresses or the
543// context expires.  Returns nil unless the context expires first; otherwise
544// returns a status error based on the context.
545func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
546	// This is on the RPC path, so we use a fast path to avoid the
547	// more-expensive "select" below after the resolver has returned once.
548	if cc.firstResolveEvent.HasFired() {
549		return nil
550	}
551	select {
552	case <-cc.firstResolveEvent.Done():
553		return nil
554	case <-ctx.Done():
555		return status.FromContextError(ctx.Err()).Err()
556	case <-cc.ctx.Done():
557		return ErrClientConnClosing
558	}
559}
560
561var emptyServiceConfig *ServiceConfig
562
563func init() {
564	cfg := parseServiceConfig("{}")
565	if cfg.Err != nil {
566		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
567	}
568	emptyServiceConfig = cfg.Config.(*ServiceConfig)
569}
570
571func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
572	if cc.sc != nil {
573		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
574		return
575	}
576	if cc.dopts.defaultServiceConfig != nil {
577		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
578	} else {
579		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
580	}
581}
582
583func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
584	defer cc.firstResolveEvent.Fire()
585	cc.mu.Lock()
586	// Check if the ClientConn is already closed. Some fields (e.g.
587	// balancerWrapper) are set to nil when closing the ClientConn, and could
588	// cause nil pointer panic if we don't have this check.
589	if cc.conns == nil {
590		cc.mu.Unlock()
591		return nil
592	}
593
594	if err != nil {
595		// May need to apply the initial service config in case the resolver
596		// doesn't support service configs, or doesn't provide a service config
597		// with the new addresses.
598		cc.maybeApplyDefaultServiceConfig(nil)
599
600		if cc.balancerWrapper != nil {
601			cc.balancerWrapper.resolverError(err)
602		}
603
604		// No addresses are valid with err set; return early.
605		cc.mu.Unlock()
606		return balancer.ErrBadResolverState
607	}
608
609	var ret error
610	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
611		cc.maybeApplyDefaultServiceConfig(s.Addresses)
612		// TODO: do we need to apply a failing LB policy if there is no
613		// default, per the error handling design?
614	} else {
615		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
616			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
617		} else {
618			ret = balancer.ErrBadResolverState
619			if cc.balancerWrapper == nil {
620				var err error
621				if s.ServiceConfig.Err != nil {
622					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
623				} else {
624					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
625				}
626				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
627				cc.csMgr.updateState(connectivity.TransientFailure)
628				cc.mu.Unlock()
629				return ret
630			}
631		}
632	}
633
634	var balCfg serviceconfig.LoadBalancingConfig
635	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
636		balCfg = cc.sc.lbConfig.cfg
637	}
638
639	cbn := cc.curBalancerName
640	bw := cc.balancerWrapper
641	cc.mu.Unlock()
642	if cbn != grpclbName {
643		// Filter any grpclb addresses since we don't have the grpclb balancer.
644		for i := 0; i < len(s.Addresses); {
645			if s.Addresses[i].Type == resolver.GRPCLB {
646				copy(s.Addresses[i:], s.Addresses[i+1:])
647				s.Addresses = s.Addresses[:len(s.Addresses)-1]
648				continue
649			}
650			i++
651		}
652	}
653	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
654	if ret == nil {
655		ret = uccsErr // prefer ErrBadResolver state since any other error is
656		// currently meaningless to the caller.
657	}
658	return ret
659}
660
661// switchBalancer starts the switching from current balancer to the balancer
662// with the given name.
663//
664// It will NOT send the current address list to the new balancer. If needed,
665// caller of this function should send address list to the new balancer after
666// this function returns.
667//
668// Caller must hold cc.mu.
669func (cc *ClientConn) switchBalancer(name string) {
670	if strings.EqualFold(cc.curBalancerName, name) {
671		return
672	}
673
674	grpclog.Infof("ClientConn switching balancer to %q", name)
675	if cc.dopts.balancerBuilder != nil {
676		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
677		return
678	}
679	if cc.balancerWrapper != nil {
680		cc.balancerWrapper.close()
681	}
682
683	builder := balancer.Get(name)
684	if channelz.IsOn() {
685		if builder == nil {
686			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
687				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
688				Severity: channelz.CtWarning,
689			})
690		} else {
691			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
692				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
693				Severity: channelz.CtINFO,
694			})
695		}
696	}
697	if builder == nil {
698		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
699		builder = newPickfirstBuilder()
700	}
701
702	cc.curBalancerName = builder.Name()
703	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
704}
705
706func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
707	cc.mu.Lock()
708	if cc.conns == nil {
709		cc.mu.Unlock()
710		return
711	}
712	// TODO(bar switching) send updates to all balancer wrappers when balancer
713	// gracefully switching is supported.
714	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
715	cc.mu.Unlock()
716}
717
718// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
719//
720// Caller needs to make sure len(addrs) > 0.
721func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
722	ac := &addrConn{
723		cc:           cc,
724		addrs:        addrs,
725		scopts:       opts,
726		dopts:        cc.dopts,
727		czData:       new(channelzData),
728		resetBackoff: make(chan struct{}),
729	}
730	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
731	// Track ac in cc. This needs to be done before any getTransport(...) is called.
732	cc.mu.Lock()
733	if cc.conns == nil {
734		cc.mu.Unlock()
735		return nil, ErrClientConnClosing
736	}
737	if channelz.IsOn() {
738		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
739		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
740			Desc:     "Subchannel Created",
741			Severity: channelz.CtINFO,
742			Parent: &channelz.TraceEventDesc{
743				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
744				Severity: channelz.CtINFO,
745			},
746		})
747	}
748	cc.conns[ac] = struct{}{}
749	cc.mu.Unlock()
750	return ac, nil
751}
752
753// removeAddrConn removes the addrConn in the subConn from clientConn.
754// It also tears down the ac with the given error.
755func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
756	cc.mu.Lock()
757	if cc.conns == nil {
758		cc.mu.Unlock()
759		return
760	}
761	delete(cc.conns, ac)
762	cc.mu.Unlock()
763	ac.tearDown(err)
764}
765
766func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
767	return &channelz.ChannelInternalMetric{
768		State:                    cc.GetState(),
769		Target:                   cc.target,
770		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
771		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
772		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
773		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
774	}
775}
776
777// Target returns the target string of the ClientConn.
778// This is an EXPERIMENTAL API.
779func (cc *ClientConn) Target() string {
780	return cc.target
781}
782
783func (cc *ClientConn) incrCallsStarted() {
784	atomic.AddInt64(&cc.czData.callsStarted, 1)
785	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
786}
787
788func (cc *ClientConn) incrCallsSucceeded() {
789	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
790}
791
792func (cc *ClientConn) incrCallsFailed() {
793	atomic.AddInt64(&cc.czData.callsFailed, 1)
794}
795
796// connect starts creating a transport.
797// It does nothing if the ac is not IDLE.
798// TODO(bar) Move this to the addrConn section.
799func (ac *addrConn) connect() error {
800	ac.mu.Lock()
801	if ac.state == connectivity.Shutdown {
802		ac.mu.Unlock()
803		return errConnClosing
804	}
805	if ac.state != connectivity.Idle {
806		ac.mu.Unlock()
807		return nil
808	}
809	// Update connectivity state within the lock to prevent subsequent or
810	// concurrent calls from resetting the transport more than once.
811	ac.updateConnectivityState(connectivity.Connecting, nil)
812	ac.mu.Unlock()
813
814	// Start a goroutine connecting to the server asynchronously.
815	go ac.resetTransport()
816	return nil
817}
818
819// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
820//
821// If ac is Connecting, it returns false. The caller should tear down the ac and
822// create a new one. Note that the backoff will be reset when this happens.
823//
824// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
825// addresses will be picked up by retry in the next iteration after backoff.
826//
827// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
828//
829// If ac is Ready, it checks whether current connected address of ac is in the
830// new addrs list.
831//  - If true, it updates ac.addrs and returns true. The ac will keep using
832//    the existing connection.
833//  - If false, it does nothing and returns false.
834func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
835	ac.mu.Lock()
836	defer ac.mu.Unlock()
837	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
838	if ac.state == connectivity.Shutdown ||
839		ac.state == connectivity.TransientFailure ||
840		ac.state == connectivity.Idle {
841		ac.addrs = addrs
842		return true
843	}
844
845	if ac.state == connectivity.Connecting {
846		return false
847	}
848
849	// ac.state is Ready, try to find the connected address.
850	var curAddrFound bool
851	for _, a := range addrs {
852		if reflect.DeepEqual(ac.curAddr, a) {
853			curAddrFound = true
854			break
855		}
856	}
857	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
858	if curAddrFound {
859		ac.addrs = addrs
860	}
861
862	return curAddrFound
863}
864
865// GetMethodConfig gets the method config of the input method.
866// If there's an exact match for input method (i.e. /service/method), we return
867// the corresponding MethodConfig.
868// If there isn't an exact match for the input method, we look for the default config
869// under the service (i.e /service/). If there is a default MethodConfig for
870// the service, we return it.
871// Otherwise, we return an empty MethodConfig.
872func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
873	// TODO: Avoid the locking here.
874	cc.mu.RLock()
875	defer cc.mu.RUnlock()
876	if cc.sc == nil {
877		return MethodConfig{}
878	}
879	m, ok := cc.sc.Methods[method]
880	if !ok {
881		i := strings.LastIndex(method, "/")
882		m = cc.sc.Methods[method[:i+1]]
883	}
884	return m
885}
886
887func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
888	cc.mu.RLock()
889	defer cc.mu.RUnlock()
890	if cc.sc == nil {
891		return nil
892	}
893	return cc.sc.healthCheckConfig
894}
895
896func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
897	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
898		Ctx:            ctx,
899		FullMethodName: method,
900	})
901	if err != nil {
902		return nil, nil, toRPCErr(err)
903	}
904	return t, done, nil
905}
906
907func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
908	if sc == nil {
909		// should never reach here.
910		return
911	}
912	cc.sc = sc
913
914	if cc.sc.retryThrottling != nil {
915		newThrottler := &retryThrottler{
916			tokens: cc.sc.retryThrottling.MaxTokens,
917			max:    cc.sc.retryThrottling.MaxTokens,
918			thresh: cc.sc.retryThrottling.MaxTokens / 2,
919			ratio:  cc.sc.retryThrottling.TokenRatio,
920		}
921		cc.retryThrottler.Store(newThrottler)
922	} else {
923		cc.retryThrottler.Store((*retryThrottler)(nil))
924	}
925
926	if cc.dopts.balancerBuilder == nil {
927		// Only look at balancer types and switch balancer if balancer dial
928		// option is not set.
929		var newBalancerName string
930		if cc.sc != nil && cc.sc.lbConfig != nil {
931			newBalancerName = cc.sc.lbConfig.name
932		} else {
933			var isGRPCLB bool
934			for _, a := range addrs {
935				if a.Type == resolver.GRPCLB {
936					isGRPCLB = true
937					break
938				}
939			}
940			if isGRPCLB {
941				newBalancerName = grpclbName
942			} else if cc.sc != nil && cc.sc.LB != nil {
943				newBalancerName = *cc.sc.LB
944			} else {
945				newBalancerName = PickFirstBalancerName
946			}
947		}
948		cc.switchBalancer(newBalancerName)
949	} else if cc.balancerWrapper == nil {
950		// Balancer dial option was set, and this is the first time handling
951		// resolved addresses. Build a balancer with dopts.balancerBuilder.
952		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
953		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
954	}
955}
956
957func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
958	cc.mu.RLock()
959	r := cc.resolverWrapper
960	cc.mu.RUnlock()
961	if r == nil {
962		return
963	}
964	go r.resolveNow(o)
965}
966
967// ResetConnectBackoff wakes up all subchannels in transient failure and causes
968// them to attempt another connection immediately.  It also resets the backoff
969// times used for subsequent attempts regardless of the current state.
970//
971// In general, this function should not be used.  Typical service or network
972// outages result in a reasonable client reconnection strategy by default.
973// However, if a previously unavailable network becomes available, this may be
974// used to trigger an immediate reconnect.
975//
976// This API is EXPERIMENTAL.
977func (cc *ClientConn) ResetConnectBackoff() {
978	cc.mu.Lock()
979	conns := cc.conns
980	cc.mu.Unlock()
981	for ac := range conns {
982		ac.resetConnectBackoff()
983	}
984}
985
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn was already closed, which
// is signaled by cc.conns being nil. Otherwise it does the following:
//   - moves the channel to Shutdown;
//   - detaches the resolver and balancer wrappers while holding cc.mu;
//   - after releasing the lock, closes the picker and both wrappers and tears
//     down every addrConn;
//   - when channelz is enabled, records a trace event and removes the
//     channel's channelz entry.
//
// cc.cancel runs when Close returns, on both paths.
func (cc *ClientConn) Close() error {
	// Deferred: cc's context is canceled after everything below has run,
	// including on the already-closed early return.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Take ownership of the conns map and nil it out so that later calls see
	// the ClientConn as closed.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the wrappers under the lock; they are closed after unlocking.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// A nested channel also reports its deletion to its parent.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1035
// addrConn is a network connection to a given address.
//
// It runs the connection loop (resetTransport) over the addresses in addrs,
// records which address it connected to (curAddr), and reports connectivity
// changes to its ClientConn through updateConnectivityState.
type addrConn struct {
	// tearDown cancels ctx. Connection attempts (createTransport) and the
	// health check context (resetTransport) are derived from ctx.
	ctx    context.Context
	cancel context.CancelFunc

	cc    *ClientConn
	dopts dialOptions
	// acbw is the SubConn handed to cc.handleSubConnStateChange on every
	// state change.
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	// In the code shown here, transport is read and written with mu held.
	transport transport.ClientTransport // The current transport.

	// mu guards transport, curAddr, addrs, state, backoffIdx and resetBackoff
	// in the code shown here.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the number of consecutive backoff periods that have
	// elapsed. resetTransport increments it and resets it on success.
	// resetBackoff is closed (and replaced) by resetConnectBackoff to cut a
	// pending backoff sleep short.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	// czData holds the call counters updated atomically by the incrCalls*
	// methods.
	czData *channelzData
}
1065
1066// Note: this requires a lock on ac.mu.
1067func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1068	if ac.state == s {
1069		return
1070	}
1071
1072	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
1073	ac.state = s
1074	if channelz.IsOn() {
1075		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1076			Desc:     updateMsg,
1077			Severity: channelz.CtINFO,
1078		})
1079	}
1080	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1081}
1082
1083// adjustParams updates parameters used to create transports upon
1084// receiving a GoAway.
1085func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1086	switch r {
1087	case transport.GoAwayTooManyPings:
1088		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1089		ac.cc.mu.Lock()
1090		if v > ac.cc.mkp.Time {
1091			ac.cc.mkp.Time = v
1092		}
1093		ac.cc.mu.Unlock()
1094	}
1095}
1096
// resetTransport runs ac's connection loop until ac is shut down or ac.ctx is
// canceled during a backoff sleep.
//
// Each iteration does the following:
//  1. Asks the resolver to re-resolve (skipped on the first iteration).
//  2. Moves ac to Connecting and tries the addresses in ac.addrs in order via
//     tryAllAddrs.
//  3. If every address fails, moves ac to TransientFailure and sleeps for the
//     current backoff duration. resetConnectBackoff can cut this sleep short.
//  4. If an address succeeds, installs the transport and starts health
//     checking. startHealthCheck sets Ready itself unless the health check
//     takes over state management. The loop then blocks until the transport
//     signals a disconnect and starts over.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			// Capture the current reset channel under the lock;
			// resetConnectBackoff closes this exact channel to wake us.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// A full backoff period elapsed, so the next one is longer.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset; resetConnectBackoff already zeroed
				// backoffIdx.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Torn down while connecting: discard the new transport.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// The health check is bound to this transport's lifetime and is
		// canceled once the transport disconnects (hcancel below).
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1193
1194// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1195// first successful one. It returns the transport, the address and a Event in
1196// the successful case. The Event fires when the returned transport disconnects.
1197func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1198	var firstConnErr error
1199	for _, addr := range addrs {
1200		ac.mu.Lock()
1201		if ac.state == connectivity.Shutdown {
1202			ac.mu.Unlock()
1203			return nil, resolver.Address{}, nil, errConnClosing
1204		}
1205
1206		ac.cc.mu.RLock()
1207		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1208		ac.cc.mu.RUnlock()
1209
1210		copts := ac.dopts.copts
1211		if ac.scopts.CredsBundle != nil {
1212			copts.CredsBundle = ac.scopts.CredsBundle
1213		}
1214		ac.mu.Unlock()
1215
1216		if channelz.IsOn() {
1217			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1218				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1219				Severity: channelz.CtINFO,
1220			})
1221		}
1222
1223		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1224		if err == nil {
1225			return newTr, addr, reconnect, nil
1226		}
1227		if firstConnErr == nil {
1228			firstConnErr = err
1229		}
1230		ac.cc.blockingpicker.updateConnectionError(err)
1231	}
1232
1233	// Couldn't connect to any address.
1234	return nil, resolver.Address{}, nil, firstConnErr
1235}
1236
1237// createTransport creates a connection to addr. It returns the transport and a
1238// Event in the successful case. The Event fires when the returned transport
1239// disconnects.
1240func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1241	prefaceReceived := make(chan struct{})
1242	onCloseCalled := make(chan struct{})
1243	reconnect := grpcsync.NewEvent()
1244
1245	authority := ac.cc.authority
1246	// addr.ServerName takes precedent over ClientConn authority, if present.
1247	if addr.ServerName != "" {
1248		authority = addr.ServerName
1249	}
1250
1251	target := transport.TargetInfo{
1252		Addr:      addr.Addr,
1253		Metadata:  addr.Metadata,
1254		Authority: authority,
1255	}
1256
1257	once := sync.Once{}
1258	onGoAway := func(r transport.GoAwayReason) {
1259		ac.mu.Lock()
1260		ac.adjustParams(r)
1261		once.Do(func() {
1262			if ac.state == connectivity.Ready {
1263				// Prevent this SubConn from being used for new RPCs by setting its
1264				// state to Connecting.
1265				//
1266				// TODO: this should be Idle when grpc-go properly supports it.
1267				ac.updateConnectivityState(connectivity.Connecting, nil)
1268			}
1269		})
1270		ac.mu.Unlock()
1271		reconnect.Fire()
1272	}
1273
1274	onClose := func() {
1275		ac.mu.Lock()
1276		once.Do(func() {
1277			if ac.state == connectivity.Ready {
1278				// Prevent this SubConn from being used for new RPCs by setting its
1279				// state to Connecting.
1280				//
1281				// TODO: this should be Idle when grpc-go properly supports it.
1282				ac.updateConnectivityState(connectivity.Connecting, nil)
1283			}
1284		})
1285		ac.mu.Unlock()
1286		close(onCloseCalled)
1287		reconnect.Fire()
1288	}
1289
1290	onPrefaceReceipt := func() {
1291		close(prefaceReceived)
1292	}
1293
1294	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1295	defer cancel()
1296	if channelz.IsOn() {
1297		copts.ChannelzParentID = ac.channelzID
1298	}
1299
1300	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1301	if err != nil {
1302		// newTr is either nil, or closed.
1303		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1304		return nil, nil, err
1305	}
1306
1307	select {
1308	case <-time.After(time.Until(connectDeadline)):
1309		// We didn't get the preface in time.
1310		newTr.Close()
1311		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1312		return nil, nil, errors.New("timed out waiting for server handshake")
1313	case <-prefaceReceived:
1314		// We got the preface - huzzah! things are good.
1315	case <-onCloseCalled:
1316		// The transport has already closed - noop.
1317		return nil, nil, errors.New("connection closed")
1318		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1319	}
1320	return newTr, reconnect, nil
1321}
1322
1323// startHealthCheck starts the health checking stream (RPC) to watch the health
1324// stats of this connection if health checking is requested and configured.
1325//
1326// LB channel health checking is enabled when all requirements below are met:
1327// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1328// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
1329// 3. a service config with non-empty healthCheckConfig field is provided
1330// 4. the load balancer requests it
1331//
1332// It sets addrConn to READY if the health checking stream is not started.
1333//
1334// Caller must hold ac.mu.
1335func (ac *addrConn) startHealthCheck(ctx context.Context) {
1336	var healthcheckManagingState bool
1337	defer func() {
1338		if !healthcheckManagingState {
1339			ac.updateConnectivityState(connectivity.Ready, nil)
1340		}
1341	}()
1342
1343	if ac.cc.dopts.disableHealthCheck {
1344		return
1345	}
1346	healthCheckConfig := ac.cc.healthCheckConfig()
1347	if healthCheckConfig == nil {
1348		return
1349	}
1350	if !ac.scopts.HealthCheckEnabled {
1351		return
1352	}
1353	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1354	if healthCheckFunc == nil {
1355		// The health package is not imported to set health check function.
1356		//
1357		// TODO: add a link to the health check doc in the error message.
1358		grpclog.Error("Health check is requested but health check function is not set.")
1359		return
1360	}
1361
1362	healthcheckManagingState = true
1363
1364	// Set up the health check helper functions.
1365	currentTr := ac.transport
1366	newStream := func(method string) (interface{}, error) {
1367		ac.mu.Lock()
1368		if ac.transport != currentTr {
1369			ac.mu.Unlock()
1370			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1371		}
1372		ac.mu.Unlock()
1373		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1374	}
1375	setConnectivityState := func(s connectivity.State, lastErr error) {
1376		ac.mu.Lock()
1377		defer ac.mu.Unlock()
1378		if ac.transport != currentTr {
1379			return
1380		}
1381		ac.updateConnectivityState(s, lastErr)
1382	}
1383	// Start the health checking stream.
1384	go func() {
1385		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1386		if err != nil {
1387			if status.Code(err) == codes.Unimplemented {
1388				if channelz.IsOn() {
1389					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1390						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
1391						Severity: channelz.CtError,
1392					})
1393				}
1394				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1395			} else {
1396				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1397			}
1398		}
1399	}()
1400}
1401
1402func (ac *addrConn) resetConnectBackoff() {
1403	ac.mu.Lock()
1404	close(ac.resetBackoff)
1405	ac.backoffIdx = 0
1406	ac.resetBackoff = make(chan struct{})
1407	ac.mu.Unlock()
1408}
1409
1410// getReadyTransport returns the transport if ac's state is READY.
1411// Otherwise it returns nil, false.
1412// If ac's state is IDLE, it will trigger ac to connect.
1413func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1414	ac.mu.Lock()
1415	if ac.state == connectivity.Ready && ac.transport != nil {
1416		t := ac.transport
1417		ac.mu.Unlock()
1418		return t, true
1419	}
1420	var idle bool
1421	if ac.state == connectivity.Idle {
1422		idle = true
1423	}
1424	ac.mu.Unlock()
1425	// Trigger idle ac to connect.
1426	if idle {
1427		ac.connect()
1428	}
1429	return nil, false
1430}
1431
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
//
// It is a no-op if ac is already Shutdown. Otherwise it does the following:
//   - detaches the current transport;
//   - moves ac to Shutdown;
//   - cancels ac.ctx;
//   - clears curAddr.
//
// The detached transport is closed gracefully only when err is errConnDrain;
// for any other err, tearDown does not close it itself. When channelz is
// enabled, a trace event is recorded and the subchannel's entry is removed.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1475
1476func (ac *addrConn) getState() connectivity.State {
1477	ac.mu.Lock()
1478	defer ac.mu.Unlock()
1479	return ac.state
1480}
1481
1482func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1483	ac.mu.Lock()
1484	addr := ac.curAddr.Addr
1485	ac.mu.Unlock()
1486	return &channelz.ChannelInternalMetric{
1487		State:                    ac.getState(),
1488		Target:                   addr,
1489		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1490		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1491		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1492		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1493	}
1494}
1495
1496func (ac *addrConn) incrCallsStarted() {
1497	atomic.AddInt64(&ac.czData.callsStarted, 1)
1498	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1499}
1500
1501func (ac *addrConn) incrCallsSucceeded() {
1502	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1503}
1504
1505func (ac *addrConn) incrCallsFailed() {
1506	atomic.AddInt64(&ac.czData.callsFailed, 1)
1507}
1508
// retryThrottler implements token-based retry throttling from the service
// config's retryThrottling policy. A nil *retryThrottler means no throttling;
// its methods are no-ops.
type retryThrottler struct {
	// max caps tokens (enforced by successfulRPC).
	max float64
	// thresh: a retry is throttled once tokens drops to thresh or below.
	thresh float64
	// ratio is the number of tokens credited back per successful RPC.
	ratio float64

	// mu guards tokens.
	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}
1517
1518// throttle subtracts a retry token from the pool and returns whether a retry
1519// should be throttled (disallowed) based upon the retry throttling policy in
1520// the service config.
1521func (rt *retryThrottler) throttle() bool {
1522	if rt == nil {
1523		return false
1524	}
1525	rt.mu.Lock()
1526	defer rt.mu.Unlock()
1527	rt.tokens--
1528	if rt.tokens < 0 {
1529		rt.tokens = 0
1530	}
1531	return rt.tokens <= rt.thresh
1532}
1533
1534func (rt *retryThrottler) successfulRPC() {
1535	if rt == nil {
1536		return
1537	}
1538	rt.mu.Lock()
1539	defer rt.mu.Unlock()
1540	rt.tokens += rt.ratio
1541	if rt.tokens > rt.max {
1542		rt.tokens = rt.max
1543	}
1544}
1545
1546type channelzChannel struct {
1547	cc *ClientConn
1548}
1549
1550func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1551	return c.cc.channelzMetric()
1552}
1553
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// It is still exported, presumably only for backward compatibility with code
// that references it.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1560
1561func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1562	for _, rb := range cc.dopts.resolvers {
1563		if cc.parsedTarget.Scheme == rb.Scheme() {
1564			return rb
1565		}
1566	}
1567	return resolver.Get(cc.parsedTarget.Scheme)
1568}
1569