1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/balancer/base"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/internal/backoff"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/grpcsync"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minimum time to give a connection to complete
	minConnectTimeout = 20 * time.Second
	// must match grpclbName in grpclb/grpclb.go. Used in this file both to
	// select the grpclb policy when the resolver returns GRPCLB-typed
	// addresses and to decide whether such addresses must be filtered out.
	grpclbName = "grpclb"
)

var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)

// The following errors are returned from Dial and DialContext
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)

const (
	// Default per-message size limits for a client: 4 MiB on receive and
	// effectively unlimited (math.MaxInt32) on send.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize / defaultReadBufSize are the default buffer sizes (32 KiB) for sending and reading frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106	return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any error return, the partially-constructed ClientConn is closed before
// DialContext returns. Once the credential and service-config checks have
// passed, if ctx (possibly shortened by the WithTimeout dial option) is done
// by the time DialContext returns, the result is replaced by (nil, ctx.Err())
// even if setup otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed retryThrottler with a typed nil (no throttling) until a service
	// config supplies one in applyServiceConfigAndBalancer.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is intentionally derived from context.Background(), not from the
	// dial ctx: the ClientConn must outlive the ctx passed to DialContext.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Fold any configured interceptors into dopts.unaryInt / dopts.streamInt.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close cc on any error return. Defers run LIFO, so this runs after the
	// ctx-expiry check registered further below and sees its error too.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register with channelz, either as a nested channel under
	// dopts.channelzParentID or as a top-level channel.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the transport-security configuration. Secure connections need
	// exactly one of TransportCredentials / CredsBundle. Insecure connections
	// must have neither, and no per-RPC credentials that demand transport
	// security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the user-supplied default service config up front so that a bad
	// JSON blob fails the dial instead of surfacing later.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Default dialer: parse the address into (network, addr) and wrap the
	// plain net.Dialer with newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the gRPC user-agent to any user-provided one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx expired or was cancelled, report that instead of whatever the
	// body returned. Registered after cancel() above, so it runs before it.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: ServerName from the transport credentials, then
	// dopts.authority (only honored for insecure connections), then the
	// parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming service-config updates for the life of cc (scWatcher
	// exits when scChan closes or cc.ctx is done).
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// The balancer gets its own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	// With FailOnNonTempDialError, a connection error observed while in
	// TransientFailure that reports Temporary() == false aborts the dial.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336	interceptors := cc.dopts.chainUnaryInts
337	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338	// be executed before any other chained interceptors.
339	if cc.dopts.unaryInt != nil {
340		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341	}
342	var chainedInt UnaryClientInterceptor
343	if len(interceptors) == 0 {
344		chainedInt = nil
345	} else if len(interceptors) == 1 {
346		chainedInt = interceptors[0]
347	} else {
348		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350		}
351	}
352	cc.dopts.unaryInt = chainedInt
353}
354
355// getChainUnaryInvoker recursively generate the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357	if curr == len(interceptors)-1 {
358		return finalInvoker
359	}
360	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362	}
363}
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367	interceptors := cc.dopts.chainStreamInts
368	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369	// be executed before any other chained interceptors.
370	if cc.dopts.streamInt != nil {
371		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372	}
373	var chainedInt StreamClientInterceptor
374	if len(interceptors) == 0 {
375		chainedInt = nil
376	} else if len(interceptors) == 1 {
377		chainedInt = interceptors[0]
378	} else {
379		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381		}
382	}
383	cc.dopts.streamInt = chainedInt
384}
385
386// getChainStreamer recursively generate the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388	if curr == len(interceptors)-1 {
389		return finalStreamer
390	}
391	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393	}
394}
395
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// Its methods take mu around every access to state and notifyChan.
// notifyChan is created lazily by getNotifyChan. updateState closes it (and
// resets it to nil) on each state change, which wakes all goroutines waiting
// on it. Once state is Shutdown, updateState never changes it again.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	// channelzID identifies the owning channel for channelz trace events;
	// DialContext sets it after registering the channel.
	channelzID int64
}
404
405// updateState updates the connectivity.State of ClientConn.
406// If there's a change it notifies goroutines waiting on state change to
407// happen.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409	csm.mu.Lock()
410	defer csm.mu.Unlock()
411	if csm.state == connectivity.Shutdown {
412		return
413	}
414	if csm.state == state {
415		return
416	}
417	csm.state = state
418	if channelz.IsOn() {
419		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
421			Severity: channelz.CtINFO,
422		})
423	}
424	if csm.notifyChan != nil {
425		// There are other goroutines waiting on this channel.
426		close(csm.notifyChan)
427		csm.notifyChan = nil
428	}
429}
430
431func (csm *connectivityStateManager) getState() connectivity.State {
432	csm.mu.Lock()
433	defer csm.mu.Unlock()
434	return csm.state
435}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438	csm.mu.Lock()
439	defer csm.mu.Unlock()
440	if csm.notifyChan == nil {
441		csm.notifyChan = make(chan struct{})
442	}
443	return csm.notifyChan
444}
445
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx/cancel bound the lifetime of the ClientConn. ctx is derived from
	// context.Background() in DialContext. addrConn contexts are derived from
	// it (newAddrConn), and scWatcher/waitForResolvedAddrs watch it.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string given to Dial. parsedTarget is its parsed form,
	// or a default-scheme fallback. authority is the authority chosen in
	// DialContext.
	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu guards the mutable fields below. In this file, resolverWrapper, sc,
	// conns, curBalancerName and balancerWrapper are accessed under it.
	// retryThrottler is an atomic.Value and is stored without relying on mu.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns tracks live addrConns. Methods in this file treat a nil conns as
	// "ClientConn closed".
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	// firstResolveEvent fires after the first resolver update is handled
	// (updateResolverState); waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
486
487// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
488// ctx expires. A true value is returned in former case and false in latter.
489// This is an EXPERIMENTAL API.
490func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
491	ch := cc.csMgr.getNotifyChan()
492	if cc.csMgr.getState() != sourceState {
493		return true
494	}
495	select {
496	case <-ctx.Done():
497		return false
498	case <-ch:
499		return true
500	}
501}
502
503// GetState returns the connectivity.State of ClientConn.
504// This is an EXPERIMENTAL API.
505func (cc *ClientConn) GetState() connectivity.State {
506	return cc.csMgr.getState()
507}
508
509func (cc *ClientConn) scWatcher() {
510	for {
511		select {
512		case sc, ok := <-cc.dopts.scChan:
513			if !ok {
514				return
515			}
516			cc.mu.Lock()
517			// TODO: load balance policy runtime change is ignored.
518			// We may revisit this decision in the future.
519			cc.sc = &sc
520			cc.mu.Unlock()
521		case <-cc.ctx.Done():
522			return
523		}
524	}
525}
526
527// waitForResolvedAddrs blocks until the resolver has provided addresses or the
528// context expires.  Returns nil unless the context expires first; otherwise
529// returns a status error based on the context.
530func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
531	// This is on the RPC path, so we use a fast path to avoid the
532	// more-expensive "select" below after the resolver has returned once.
533	if cc.firstResolveEvent.HasFired() {
534		return nil
535	}
536	select {
537	case <-cc.firstResolveEvent.Done():
538		return nil
539	case <-ctx.Done():
540		return status.FromContextError(ctx.Err()).Err()
541	case <-cc.ctx.Done():
542		return ErrClientConnClosing
543	}
544}
545
546var emptyServiceConfig *ServiceConfig
547
548func init() {
549	cfg := parseServiceConfig("{}")
550	if cfg.Err != nil {
551		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
552	}
553	emptyServiceConfig = cfg.Config.(*ServiceConfig)
554}
555
556func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
557	if cc.sc != nil {
558		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
559		return
560	}
561	if cc.dopts.defaultServiceConfig != nil {
562		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
563	} else {
564		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
565	}
566}
567
568func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
569	defer cc.firstResolveEvent.Fire()
570	cc.mu.Lock()
571	// Check if the ClientConn is already closed. Some fields (e.g.
572	// balancerWrapper) are set to nil when closing the ClientConn, and could
573	// cause nil pointer panic if we don't have this check.
574	if cc.conns == nil {
575		cc.mu.Unlock()
576		return nil
577	}
578
579	if err != nil {
580		// May need to apply the initial service config in case the resolver
581		// doesn't support service configs, or doesn't provide a service config
582		// with the new addresses.
583		cc.maybeApplyDefaultServiceConfig(nil)
584
585		if cc.balancerWrapper != nil {
586			cc.balancerWrapper.resolverError(err)
587		}
588
589		// No addresses are valid with err set; return early.
590		cc.mu.Unlock()
591		return balancer.ErrBadResolverState
592	}
593
594	var ret error
595	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
596		cc.maybeApplyDefaultServiceConfig(s.Addresses)
597		// TODO: do we need to apply a failing LB policy if there is no
598		// default, per the error handling design?
599	} else {
600		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
601			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
602		} else {
603			ret = balancer.ErrBadResolverState
604			if cc.balancerWrapper == nil {
605				var err error
606				if s.ServiceConfig.Err != nil {
607					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
608				} else {
609					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
610				}
611				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
612				cc.csMgr.updateState(connectivity.TransientFailure)
613				cc.mu.Unlock()
614				return ret
615			}
616		}
617	}
618
619	var balCfg serviceconfig.LoadBalancingConfig
620	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
621		balCfg = cc.sc.lbConfig.cfg
622	}
623
624	cbn := cc.curBalancerName
625	bw := cc.balancerWrapper
626	cc.mu.Unlock()
627	if cbn != grpclbName {
628		// Filter any grpclb addresses since we don't have the grpclb balancer.
629		for i := 0; i < len(s.Addresses); {
630			if s.Addresses[i].Type == resolver.GRPCLB {
631				copy(s.Addresses[i:], s.Addresses[i+1:])
632				s.Addresses = s.Addresses[:len(s.Addresses)-1]
633				continue
634			}
635			i++
636		}
637	}
638	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
639	if ret == nil {
640		ret = uccsErr // prefer ErrBadResolver state since any other error is
641		// currently meaningless to the caller.
642	}
643	return ret
644}
645
646// switchBalancer starts the switching from current balancer to the balancer
647// with the given name.
648//
649// It will NOT send the current address list to the new balancer. If needed,
650// caller of this function should send address list to the new balancer after
651// this function returns.
652//
653// Caller must hold cc.mu.
654func (cc *ClientConn) switchBalancer(name string) {
655	if strings.EqualFold(cc.curBalancerName, name) {
656		return
657	}
658
659	grpclog.Infof("ClientConn switching balancer to %q", name)
660	if cc.dopts.balancerBuilder != nil {
661		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
662		return
663	}
664	if cc.balancerWrapper != nil {
665		cc.balancerWrapper.close()
666	}
667
668	builder := balancer.Get(name)
669	if channelz.IsOn() {
670		if builder == nil {
671			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
672				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
673				Severity: channelz.CtWarning,
674			})
675		} else {
676			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
677				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
678				Severity: channelz.CtINFO,
679			})
680		}
681	}
682	if builder == nil {
683		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
684		builder = newPickfirstBuilder()
685	}
686
687	cc.curBalancerName = builder.Name()
688	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
689}
690
691func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
692	cc.mu.Lock()
693	if cc.conns == nil {
694		cc.mu.Unlock()
695		return
696	}
697	// TODO(bar switching) send updates to all balancer wrappers when balancer
698	// gracefully switching is supported.
699	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
700	cc.mu.Unlock()
701}
702
703// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
704//
705// Caller needs to make sure len(addrs) > 0.
706func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
707	ac := &addrConn{
708		cc:           cc,
709		addrs:        addrs,
710		scopts:       opts,
711		dopts:        cc.dopts,
712		czData:       new(channelzData),
713		resetBackoff: make(chan struct{}),
714	}
715	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
716	// Track ac in cc. This needs to be done before any getTransport(...) is called.
717	cc.mu.Lock()
718	if cc.conns == nil {
719		cc.mu.Unlock()
720		return nil, ErrClientConnClosing
721	}
722	if channelz.IsOn() {
723		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
724		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
725			Desc:     "Subchannel Created",
726			Severity: channelz.CtINFO,
727			Parent: &channelz.TraceEventDesc{
728				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
729				Severity: channelz.CtINFO,
730			},
731		})
732	}
733	cc.conns[ac] = struct{}{}
734	cc.mu.Unlock()
735	return ac, nil
736}
737
738// removeAddrConn removes the addrConn in the subConn from clientConn.
739// It also tears down the ac with the given error.
740func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
741	cc.mu.Lock()
742	if cc.conns == nil {
743		cc.mu.Unlock()
744		return
745	}
746	delete(cc.conns, ac)
747	cc.mu.Unlock()
748	ac.tearDown(err)
749}
750
751func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
752	return &channelz.ChannelInternalMetric{
753		State:                    cc.GetState(),
754		Target:                   cc.target,
755		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
756		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
757		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
758		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
759	}
760}
761
762// Target returns the target string of the ClientConn.
763// This is an EXPERIMENTAL API.
764func (cc *ClientConn) Target() string {
765	return cc.target
766}
767
768func (cc *ClientConn) incrCallsStarted() {
769	atomic.AddInt64(&cc.czData.callsStarted, 1)
770	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
771}
772
773func (cc *ClientConn) incrCallsSucceeded() {
774	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
775}
776
777func (cc *ClientConn) incrCallsFailed() {
778	atomic.AddInt64(&cc.czData.callsFailed, 1)
779}
780
781// connect starts creating a transport.
782// It does nothing if the ac is not IDLE.
783// TODO(bar) Move this to the addrConn section.
784func (ac *addrConn) connect() error {
785	ac.mu.Lock()
786	if ac.state == connectivity.Shutdown {
787		ac.mu.Unlock()
788		return errConnClosing
789	}
790	if ac.state != connectivity.Idle {
791		ac.mu.Unlock()
792		return nil
793	}
794	// Update connectivity state within the lock to prevent subsequent or
795	// concurrent calls from resetting the transport more than once.
796	ac.updateConnectivityState(connectivity.Connecting, nil)
797	ac.mu.Unlock()
798
799	// Start a goroutine connecting to the server asynchronously.
800	go ac.resetTransport()
801	return nil
802}
803
804// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
805//
806// If ac is Connecting, it returns false. The caller should tear down the ac and
807// create a new one. Note that the backoff will be reset when this happens.
808//
809// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
810// addresses will be picked up by retry in the next iteration after backoff.
811//
812// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
813//
814// If ac is Ready, it checks whether current connected address of ac is in the
815// new addrs list.
816//  - If true, it updates ac.addrs and returns true. The ac will keep using
817//    the existing connection.
818//  - If false, it does nothing and returns false.
819func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
820	ac.mu.Lock()
821	defer ac.mu.Unlock()
822	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
823	if ac.state == connectivity.Shutdown ||
824		ac.state == connectivity.TransientFailure ||
825		ac.state == connectivity.Idle {
826		ac.addrs = addrs
827		return true
828	}
829
830	if ac.state == connectivity.Connecting {
831		return false
832	}
833
834	// ac.state is Ready, try to find the connected address.
835	var curAddrFound bool
836	for _, a := range addrs {
837		if reflect.DeepEqual(ac.curAddr, a) {
838			curAddrFound = true
839			break
840		}
841	}
842	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
843	if curAddrFound {
844		ac.addrs = addrs
845	}
846
847	return curAddrFound
848}
849
850// GetMethodConfig gets the method config of the input method.
851// If there's an exact match for input method (i.e. /service/method), we return
852// the corresponding MethodConfig.
853// If there isn't an exact match for the input method, we look for the default config
854// under the service (i.e /service/). If there is a default MethodConfig for
855// the service, we return it.
856// Otherwise, we return an empty MethodConfig.
857func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
858	// TODO: Avoid the locking here.
859	cc.mu.RLock()
860	defer cc.mu.RUnlock()
861	if cc.sc == nil {
862		return MethodConfig{}
863	}
864	m, ok := cc.sc.Methods[method]
865	if !ok {
866		i := strings.LastIndex(method, "/")
867		m = cc.sc.Methods[method[:i+1]]
868	}
869	return m
870}
871
872func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
873	cc.mu.RLock()
874	defer cc.mu.RUnlock()
875	if cc.sc == nil {
876		return nil
877	}
878	return cc.sc.healthCheckConfig
879}
880
881func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
882	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
883		Ctx:            ctx,
884		FullMethodName: method,
885	})
886	if err != nil {
887		return nil, nil, toRPCErr(err)
888	}
889	return t, done, nil
890}
891
892func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
893	if sc == nil {
894		// should never reach here.
895		return
896	}
897	cc.sc = sc
898
899	if cc.sc.retryThrottling != nil {
900		newThrottler := &retryThrottler{
901			tokens: cc.sc.retryThrottling.MaxTokens,
902			max:    cc.sc.retryThrottling.MaxTokens,
903			thresh: cc.sc.retryThrottling.MaxTokens / 2,
904			ratio:  cc.sc.retryThrottling.TokenRatio,
905		}
906		cc.retryThrottler.Store(newThrottler)
907	} else {
908		cc.retryThrottler.Store((*retryThrottler)(nil))
909	}
910
911	if cc.dopts.balancerBuilder == nil {
912		// Only look at balancer types and switch balancer if balancer dial
913		// option is not set.
914		var newBalancerName string
915		if cc.sc != nil && cc.sc.lbConfig != nil {
916			newBalancerName = cc.sc.lbConfig.name
917		} else {
918			var isGRPCLB bool
919			for _, a := range addrs {
920				if a.Type == resolver.GRPCLB {
921					isGRPCLB = true
922					break
923				}
924			}
925			if isGRPCLB {
926				newBalancerName = grpclbName
927			} else if cc.sc != nil && cc.sc.LB != nil {
928				newBalancerName = *cc.sc.LB
929			} else {
930				newBalancerName = PickFirstBalancerName
931			}
932		}
933		cc.switchBalancer(newBalancerName)
934	} else if cc.balancerWrapper == nil {
935		// Balancer dial option was set, and this is the first time handling
936		// resolved addresses. Build a balancer with dopts.balancerBuilder.
937		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
938		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
939	}
940}
941
942func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
943	cc.mu.RLock()
944	r := cc.resolverWrapper
945	cc.mu.RUnlock()
946	if r == nil {
947		return
948	}
949	go r.resolveNow(o)
950}
951
952// ResetConnectBackoff wakes up all subchannels in transient failure and causes
953// them to attempt another connection immediately.  It also resets the backoff
954// times used for subsequent attempts regardless of the current state.
955//
956// In general, this function should not be used.  Typical service or network
957// outages result in a reasonable client reconnection strategy by default.
958// However, if a previously unavailable network becomes available, this may be
959// used to trigger an immediate reconnect.
960//
961// This API is EXPERIMENTAL.
962func (cc *ClientConn) ResetConnectBackoff() {
963	cc.mu.Lock()
964	conns := cc.conns
965	cc.mu.Unlock()
966	for ac := range conns {
967		ac.resetConnectBackoff()
968	}
969}
970
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn has already been closed.
// In every case the ClientConn's context is cancelled on return (deferred
// cc.cancel). On the first call it moves the channel to SHUTDOWN, closes the
// picker, resolver wrapper and balancer wrapper, tears down every addrConn,
// and, when channelz is on, records a deletion trace event and removes the
// channel's channelz entry.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	// A nil conns map marks a ClientConn that has already been closed.
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach the addrConns and the resolver/balancer wrappers under cc.mu so no
	// other path can pick them up again; they are closed below, after the lock
	// is released.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// conns is no longer reachable through cc.conns, so it can be ranged over
	// without cc.mu.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// For a nested channel, also record the deletion on the parent.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1020
// addrConn is a network connection to a given address.
//
// It is the ClientConn-side implementation behind a balancer SubConn: it owns
// the connect/backoff loop (resetTransport), the current transport, and the
// SubConn's connectivity state.
type addrConn struct {
	// ctx is cancelled (via cancel) when the addrConn is torn down; it bounds
	// connection attempts, backoff waits and health checking.
	ctx    context.Context
	cancel context.CancelFunc

	cc    *ClientConn // The owning ClientConn.
	dopts dialOptions // Dial options; copts.KeepaliveParams is refreshed from cc.mkp before each attempt.
	// acbw is the balancer.SubConn reported to the ClientConn on state changes.
	acbw balancer.SubConn
	// scopts are the options the balancer created this SubConn with
	// (CredsBundle and HealthCheckEnabled are consulted here).
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	// It is read and written under mu throughout this file.
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx selects the current backoff duration; it grows after each
	// full backoff and is zeroed on a successful connection or by
	// resetConnectBackoff.
	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced) by resetConnectBackoff to cut a
	// pending backoff wait short.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	// czData holds call counters that are updated with sync/atomic.
	czData *channelzData
}
1050
1051// Note: this requires a lock on ac.mu.
1052func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1053	if ac.state == s {
1054		return
1055	}
1056
1057	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
1058	ac.state = s
1059	if channelz.IsOn() {
1060		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1061			Desc:     updateMsg,
1062			Severity: channelz.CtINFO,
1063		})
1064	}
1065	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1066}
1067
1068// adjustParams updates parameters used to create transports upon
1069// receiving a GoAway.
1070func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1071	switch r {
1072	case transport.GoAwayTooManyPings:
1073		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1074		ac.cc.mu.Lock()
1075		if v > ac.cc.mkp.Time {
1076			ac.cc.mkp.Time = v
1077		}
1078		ac.cc.mu.Unlock()
1079	}
1080}
1081
// resetTransport runs the connection loop for ac until ac is shut down.
//
// Each iteration moves ac to CONNECTING and tries every address in ac.addrs
// (via tryAllAddrs) under one shared deadline: the larger of the minimum
// connect timeout and the current backoff. On failure ac moves to
// TRANSIENT_FAILURE and waits out the backoff before looping; the wait ends
// early if resetConnectBackoff is called or ac.ctx is cancelled. On success the
// new transport is installed, health checking is started, and the loop blocks
// until that transport disconnects before starting over. Every iteration after
// the first asks the resolver to re-resolve.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Ask the resolver for fresh addresses on every retry (not on the
		// first attempt).
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff. Capture the reset channel under the lock because
			// resetConnectBackoff closes and replaces it.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// The full backoff elapsed: use a longer one next time.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// resetConnectBackoff was called (it also zeroes backoffIdx):
				// retry immediately.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		// ac may have been torn down while the connection was being made; in
		// that case the fresh transport is unused and must be closed here.
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// hctx scopes the health check to this transport; it is cancelled once
		// the transport disconnects.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1178
1179// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1180// first successful one. It returns the transport, the address and a Event in
1181// the successful case. The Event fires when the returned transport disconnects.
1182func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1183	var firstConnErr error
1184	for _, addr := range addrs {
1185		ac.mu.Lock()
1186		if ac.state == connectivity.Shutdown {
1187			ac.mu.Unlock()
1188			return nil, resolver.Address{}, nil, errConnClosing
1189		}
1190
1191		ac.cc.mu.RLock()
1192		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1193		ac.cc.mu.RUnlock()
1194
1195		copts := ac.dopts.copts
1196		if ac.scopts.CredsBundle != nil {
1197			copts.CredsBundle = ac.scopts.CredsBundle
1198		}
1199		ac.mu.Unlock()
1200
1201		if channelz.IsOn() {
1202			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1203				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1204				Severity: channelz.CtINFO,
1205			})
1206		}
1207
1208		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1209		if err == nil {
1210			return newTr, addr, reconnect, nil
1211		}
1212		if firstConnErr == nil {
1213			firstConnErr = err
1214		}
1215		ac.cc.blockingpicker.updateConnectionError(err)
1216	}
1217
1218	// Couldn't connect to any address.
1219	return nil, resolver.Address{}, nil, firstConnErr
1220}
1221
1222// createTransport creates a connection to addr. It returns the transport and a
1223// Event in the successful case. The Event fires when the returned transport
1224// disconnects.
1225func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1226	prefaceReceived := make(chan struct{})
1227	onCloseCalled := make(chan struct{})
1228	reconnect := grpcsync.NewEvent()
1229
1230	authority := ac.cc.authority
1231	// addr.ServerName takes precedent over ClientConn authority, if present.
1232	if addr.ServerName != "" {
1233		authority = addr.ServerName
1234	}
1235
1236	target := transport.TargetInfo{
1237		Addr:      addr.Addr,
1238		Metadata:  addr.Metadata,
1239		Authority: authority,
1240	}
1241
1242	once := sync.Once{}
1243	onGoAway := func(r transport.GoAwayReason) {
1244		ac.mu.Lock()
1245		ac.adjustParams(r)
1246		once.Do(func() {
1247			if ac.state == connectivity.Ready {
1248				// Prevent this SubConn from being used for new RPCs by setting its
1249				// state to Connecting.
1250				//
1251				// TODO: this should be Idle when grpc-go properly supports it.
1252				ac.updateConnectivityState(connectivity.Connecting, nil)
1253			}
1254		})
1255		ac.mu.Unlock()
1256		reconnect.Fire()
1257	}
1258
1259	onClose := func() {
1260		ac.mu.Lock()
1261		once.Do(func() {
1262			if ac.state == connectivity.Ready {
1263				// Prevent this SubConn from being used for new RPCs by setting its
1264				// state to Connecting.
1265				//
1266				// TODO: this should be Idle when grpc-go properly supports it.
1267				ac.updateConnectivityState(connectivity.Connecting, nil)
1268			}
1269		})
1270		ac.mu.Unlock()
1271		close(onCloseCalled)
1272		reconnect.Fire()
1273	}
1274
1275	onPrefaceReceipt := func() {
1276		close(prefaceReceived)
1277	}
1278
1279	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1280	defer cancel()
1281	if channelz.IsOn() {
1282		copts.ChannelzParentID = ac.channelzID
1283	}
1284
1285	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1286	if err != nil {
1287		// newTr is either nil, or closed.
1288		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1289		return nil, nil, err
1290	}
1291
1292	select {
1293	case <-time.After(time.Until(connectDeadline)):
1294		// We didn't get the preface in time.
1295		newTr.Close()
1296		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1297		return nil, nil, errors.New("timed out waiting for server handshake")
1298	case <-prefaceReceived:
1299		// We got the preface - huzzah! things are good.
1300	case <-onCloseCalled:
1301		// The transport has already closed - noop.
1302		return nil, nil, errors.New("connection closed")
1303		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1304	}
1305	return newTr, reconnect, nil
1306}
1307
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
// When it is started, the health check function drives ac's state through
// setConnectivityState instead. ctx bounds the health check; cancelling it is
// expected to stop the stream.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	// Every early return below leaves this false, so the deferred call marks
	// ac READY; only a successfully launched health check suppresses it.
	var healthcheckManagingState bool
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		grpclog.Error("Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions. Both are pinned to the
	// transport current at this moment: once ac.transport changes, newStream
	// refuses to create streams and setConnectivityState ignores updates, so a
	// stale health check cannot affect the state of a newer connection.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream. Its error is only logged (and traced
	// for Unimplemented); it does not change ac's state here.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				if channelz.IsOn() {
					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
						Severity: channelz.CtError,
					})
				}
				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1386
1387func (ac *addrConn) resetConnectBackoff() {
1388	ac.mu.Lock()
1389	close(ac.resetBackoff)
1390	ac.backoffIdx = 0
1391	ac.resetBackoff = make(chan struct{})
1392	ac.mu.Unlock()
1393}
1394
1395// getReadyTransport returns the transport if ac's state is READY.
1396// Otherwise it returns nil, false.
1397// If ac's state is IDLE, it will trigger ac to connect.
1398func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1399	ac.mu.Lock()
1400	if ac.state == connectivity.Ready && ac.transport != nil {
1401		t := ac.transport
1402		ac.mu.Unlock()
1403		return t, true
1404	}
1405	var idle bool
1406	if ac.state == connectivity.Idle {
1407		idle = true
1408	}
1409	ac.mu.Unlock()
1410	// Trigger idle ac to connect.
1411	if idle {
1412		ac.connect()
1413	}
1414	return nil, false
1415}
1416
1417// tearDown starts to tear down the addrConn.
1418// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1419// some edge cases (e.g., the caller opens and closes many addrConn's in a
1420// tight loop.
1421// tearDown doesn't remove ac from ac.cc.conns.
1422func (ac *addrConn) tearDown(err error) {
1423	ac.mu.Lock()
1424	if ac.state == connectivity.Shutdown {
1425		ac.mu.Unlock()
1426		return
1427	}
1428	curTr := ac.transport
1429	ac.transport = nil
1430	// We have to set the state to Shutdown before anything else to prevent races
1431	// between setting the state and logic that waits on context cancellation / etc.
1432	ac.updateConnectivityState(connectivity.Shutdown, nil)
1433	ac.cancel()
1434	ac.curAddr = resolver.Address{}
1435	if err == errConnDrain && curTr != nil {
1436		// GracefulClose(...) may be executed multiple times when
1437		// i) receiving multiple GoAway frames from the server; or
1438		// ii) there are concurrent name resolver/Balancer triggered
1439		// address removal and GoAway.
1440		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1441		ac.mu.Unlock()
1442		curTr.GracefulClose()
1443		ac.mu.Lock()
1444	}
1445	if channelz.IsOn() {
1446		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1447			Desc:     "Subchannel Deleted",
1448			Severity: channelz.CtINFO,
1449			Parent: &channelz.TraceEventDesc{
1450				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1451				Severity: channelz.CtINFO,
1452			},
1453		})
1454		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1455		// the entity being deleted, and thus prevent it from being deleted right away.
1456		channelz.RemoveEntry(ac.channelzID)
1457	}
1458	ac.mu.Unlock()
1459}
1460
1461func (ac *addrConn) getState() connectivity.State {
1462	ac.mu.Lock()
1463	defer ac.mu.Unlock()
1464	return ac.state
1465}
1466
1467func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1468	ac.mu.Lock()
1469	addr := ac.curAddr.Addr
1470	ac.mu.Unlock()
1471	return &channelz.ChannelInternalMetric{
1472		State:                    ac.getState(),
1473		Target:                   addr,
1474		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1475		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1476		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1477		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1478	}
1479}
1480
1481func (ac *addrConn) incrCallsStarted() {
1482	atomic.AddInt64(&ac.czData.callsStarted, 1)
1483	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1484}
1485
1486func (ac *addrConn) incrCallsSucceeded() {
1487	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1488}
1489
1490func (ac *addrConn) incrCallsFailed() {
1491	atomic.AddInt64(&ac.czData.callsFailed, 1)
1492}
1493
// retryThrottler implements the token-based retry throttling policy from the
// service config. A nil *retryThrottler means throttling is disabled; both of
// its methods may be called on a nil receiver.
type retryThrottler struct {
	max    float64 // Upper bound on tokens.
	thresh float64 // Retries are throttled once tokens drop to or below this.
	ratio  float64 // Tokens credited back per successful RPC.

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool (never going below zero) and
// returns whether a retry should be throttled (disallowed), i.e. whether the
// remaining tokens are at or below the threshold. A nil throttler never
// throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens = math.Max(rt.tokens-1, 0)
	return rt.tokens <= rt.thresh
}

// successfulRPC credits ratio tokens back to the pool, capped at max. It is a
// no-op on a nil throttler.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
	rt.mu.Unlock()
}
1530
1531type channelzChannel struct {
1532	cc *ClientConn
1533}
1534
1535func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1536	return c.cc.channelzMetric()
1537}
1538
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users. It is kept only so existing references still compile.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1545