1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/internal/backoff"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/envconfig"
42	"google.golang.org/grpc/internal/grpcsync"
43	"google.golang.org/grpc/internal/transport"
44	"google.golang.org/grpc/keepalive"
45	"google.golang.org/grpc/resolver"
46	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
47	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
48	"google.golang.org/grpc/status"
49)
50
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer. It must match grpclbName
	// in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
57
// Package-level errors and error-message prefixes used by ClientConn and
// addrConn.
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config. DialContext prepends it to the parse error it returns
	// when the default service config JSON cannot be parsed.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
75
// The following errors are returned from Dial and DialContext when the
// security-related dial options are missing or contradict each other.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
93
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the largest int32 value.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize specifies the buffer size for sending frames.
	defaultWriteBufSize = 32 * 1024
	// defaultReadBufSize is 32 KiB.
	// NOTE(review): presumably the matching buffer size for receiving
	// frames; confirm against where it is consumed.
	defaultReadBufSize = 32 * 1024
)
101
102// Dial creates a client connection to the given target.
103func Dial(target string, opts ...DialOption) (*ClientConn, error) {
104	return DialContext(context.Background(), target, opts...)
105}
106
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any error the partially constructed ClientConn is closed and a nil
// ClientConn is returned. If ctx is already done when the function returns,
// the result is overridden with (nil, ctx.Err()).
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the atomic.Value with a typed nil so it always holds a
	// *retryThrottler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// The ClientConn's own context is independent of the dial ctx; it is
	// cancelled by Close.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply user options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Registered first so it runs last among the deferred functions, i.e.
	// after the ctx.Done() check below may have replaced err.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			// Nested channel: the trace event also carries a parent event.
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the combination of security-related options.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		// Per-RPC credentials that demand transport security cannot be used
		// on an insecure connection.
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config, if one was supplied as raw JSON.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
		}
		cc.dopts.defaultServiceConfig = sc
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Without a user-supplied dialer, fall back to net.Dialer wrapped by
	// newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// A user-provided user agent is kept and grpcUA is appended to it.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// The timeout dial option bounds ctx for the remainder of this function.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done by the time we return, report the context error
	// regardless of what the body returned.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.Exponential{
			MaxDelay: DefaultBackoffConfig.MaxDelay,
		}
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: server name from the transport credentials, then
	// the authority dial option (insecure connections only), then the parsed
	// endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming later service config updates in the background.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// The balancer gets a clone of the transport credentials rather than the
	// instance stored in the dial options.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Fail fast only when the last connection error reports
				// itself as non-temporary.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336	interceptors := cc.dopts.chainUnaryInts
337	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338	// be executed before any other chained interceptors.
339	if cc.dopts.unaryInt != nil {
340		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341	}
342	var chainedInt UnaryClientInterceptor
343	if len(interceptors) == 0 {
344		chainedInt = nil
345	} else if len(interceptors) == 1 {
346		chainedInt = interceptors[0]
347	} else {
348		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350		}
351	}
352	cc.dopts.unaryInt = chainedInt
353}
354
355// getChainUnaryInvoker recursively generate the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357	if curr == len(interceptors)-1 {
358		return finalInvoker
359	}
360	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362	}
363}
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367	interceptors := cc.dopts.chainStreamInts
368	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369	// be executed before any other chained interceptors.
370	if cc.dopts.streamInt != nil {
371		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372	}
373	var chainedInt StreamClientInterceptor
374	if len(interceptors) == 0 {
375		chainedInt = nil
376	} else if len(interceptors) == 1 {
377		chainedInt = interceptors[0]
378	} else {
379		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381		}
382	}
383	cc.dopts.streamInt = chainedInt
384}
385
386// getChainStreamer recursively generate the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388	if curr == len(interceptors)-1 {
389		return finalStreamer
390	}
391	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393	}
394}
395
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu is held by updateState, getState and getNotifyChan while they touch
	// state and notifyChan.
	mu         sync.Mutex
	// state is the current connectivity state. Once it is Shutdown,
	// updateState ignores further updates.
	state      connectivity.State
	// notifyChan is created lazily by getNotifyChan, and closed and reset to
	// nil by updateState whenever the state changes.
	notifyChan chan struct{}
	// channelzID is the id under which state-change trace events are recorded.
	channelzID int64
}
404
405// updateState updates the connectivity.State of ClientConn.
406// If there's a change it notifies goroutines waiting on state change to
407// happen.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409	csm.mu.Lock()
410	defer csm.mu.Unlock()
411	if csm.state == connectivity.Shutdown {
412		return
413	}
414	if csm.state == state {
415		return
416	}
417	csm.state = state
418	if channelz.IsOn() {
419		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
421			Severity: channelz.CtINFO,
422		})
423	}
424	if csm.notifyChan != nil {
425		// There are other goroutines waiting on this channel.
426		close(csm.notifyChan)
427		csm.notifyChan = nil
428	}
429}
430
431func (csm *connectivityStateManager) getState() connectivity.State {
432	csm.mu.Lock()
433	defer csm.mu.Unlock()
434	return csm.state
435}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438	csm.mu.Lock()
439	defer csm.mu.Unlock()
440	if csm.notifyChan == nil {
441		csm.notifyChan = make(chan struct{})
442	}
443	return csm.notifyChan
444}
445
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx is created in DialContext and cancelled (via cancel) when Close
	// returns. The contexts of addrConns are derived from it.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string passed to Dial/DialContext; parsedTarget and
	// authority are derived from it and the dial options in DialContext.
	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	// csMgr tracks the connectivity state of the whole ClientConn.
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// NOTE(review): the fields below are read and written with mu held in
	// most places in this file; confirm before relying on it as a strict
	// invariant.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is set to nil by Close; a nil map is how other methods detect
	// that the ClientConn has been closed.
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler always holds a *retryThrottler (possibly nil).
	retryThrottler  atomic.Value

	// firstResolveEvent is fired by updateResolverState after the first
	// resolver update has been handed to the balancer.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
475
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// If the current state already differs from sourceState it returns true
// immediately.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
	// Fetch the notification channel before reading the state: a state change
	// that lands between the two calls closes this channel, so it cannot be
	// missed by the select below.
	ch := cc.csMgr.getNotifyChan()
	if cc.csMgr.getState() != sourceState {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}
491
492// GetState returns the connectivity.State of ClientConn.
493// This is an EXPERIMENTAL API.
494func (cc *ClientConn) GetState() connectivity.State {
495	return cc.csMgr.getState()
496}
497
498func (cc *ClientConn) scWatcher() {
499	for {
500		select {
501		case sc, ok := <-cc.dopts.scChan:
502			if !ok {
503				return
504			}
505			cc.mu.Lock()
506			// TODO: load balance policy runtime change is ignored.
507			// We may revisit this decision in the future.
508			cc.sc = &sc
509			cc.mu.Unlock()
510		case <-cc.ctx.Done():
511			return
512		}
513	}
514}
515
516// waitForResolvedAddrs blocks until the resolver has provided addresses or the
517// context expires.  Returns nil unless the context expires first; otherwise
518// returns a status error based on the context.
519func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
520	// This is on the RPC path, so we use a fast path to avoid the
521	// more-expensive "select" below after the resolver has returned once.
522	if cc.firstResolveEvent.HasFired() {
523		return nil
524	}
525	select {
526	case <-cc.firstResolveEvent.Done():
527		return nil
528	case <-ctx.Done():
529		return status.FromContextError(ctx.Err()).Err()
530	case <-cc.ctx.Done():
531		return ErrClientConnClosing
532	}
533}
534
535// gRPC should resort to default service config when:
536// * resolver service config is disabled
537// * or, resolver does not return a service config or returns an invalid one.
538func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
539	if cc.dopts.disableServiceConfig {
540		return true
541	}
542	// The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
543	// Right now, we assume that empty service config string means resolver does not return a config.
544	if sc == "" {
545		return true
546	}
547	// TODO: the logic below is temporary. Once we finish the logic to validate service config
548	// in resolver, we will replace the logic below.
549	_, err := parseServiceConfig(sc)
550	return err != nil
551}
552
// updateResolverState processes a new state from the resolver: it selects and
// applies a service config, makes sure a balancer exists (switching by name
// unless a balancer dial option was given), forwards the state to the
// balancer, and fires firstResolveEvent.
//
// It returns nil without doing anything if the ClientConn is already closed,
// and returns the parse error if the resolver's service config fails to parse
// at the second parse below.
func (cc *ClientConn) updateResolverState(s resolver.State) error {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		return nil
	}

	if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
		// The default config is applied only if no config is set yet.
		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
		}
	} else {
		// TODO: the parsing logic below will be moved inside resolver.
		sc, err := parseServiceConfig(s.ServiceConfig)
		if err != nil {
			return err
		}
		// Skip re-applying a config whose raw JSON is unchanged.
		if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
			cc.applyServiceConfig(sc)
		}
	}

	// update the service config that will be sent to balancer.
	if cc.sc != nil {
		s.ServiceConfig = cc.sc.rawJSONString
	}

	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var isGRPCLB bool
		for _, a := range s.Addresses {
			if a.Type == resolver.GRPCLB {
				isGRPCLB = true
				break
			}
		}
		// Balancer choice: grpclb if any address is a grpclb address, else
		// the policy named in the service config, else pick_first.
		var newBalancerName string
		// TODO: use new loadBalancerConfig field with appropriate priority.
		if isGRPCLB {
			newBalancerName = grpclbName
		} else if cc.sc != nil && cc.sc.LB != nil {
			newBalancerName = *cc.sc.LB
		} else {
			newBalancerName = PickFirstBalancerName
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}

	cc.balancerWrapper.updateResolverState(s)
	// Unblocks callers of waitForResolvedAddrs.
	cc.firstResolveEvent.Fire()
	return nil
}
614
615// switchBalancer starts the switching from current balancer to the balancer
616// with the given name.
617//
618// It will NOT send the current address list to the new balancer. If needed,
619// caller of this function should send address list to the new balancer after
620// this function returns.
621//
622// Caller must hold cc.mu.
623func (cc *ClientConn) switchBalancer(name string) {
624	if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
625		return
626	}
627
628	grpclog.Infof("ClientConn switching balancer to %q", name)
629	if cc.dopts.balancerBuilder != nil {
630		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
631		return
632	}
633	if cc.balancerWrapper != nil {
634		cc.balancerWrapper.close()
635	}
636
637	builder := balancer.Get(name)
638	if channelz.IsOn() {
639		if builder == nil {
640			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
641				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
642				Severity: channelz.CtWarning,
643			})
644		} else {
645			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
646				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
647				Severity: channelz.CtINFO,
648			})
649		}
650	}
651	if builder == nil {
652		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
653		builder = newPickfirstBuilder()
654	}
655
656	cc.curBalancerName = builder.Name()
657	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
658}
659
660func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
661	cc.mu.Lock()
662	if cc.conns == nil {
663		cc.mu.Unlock()
664		return
665	}
666	// TODO(bar switching) send updates to all balancer wrappers when balancer
667	// gracefully switching is supported.
668	cc.balancerWrapper.handleSubConnStateChange(sc, s)
669	cc.mu.Unlock()
670}
671
672// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
673//
674// Caller needs to make sure len(addrs) > 0.
675func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
676	ac := &addrConn{
677		cc:           cc,
678		addrs:        addrs,
679		scopts:       opts,
680		dopts:        cc.dopts,
681		czData:       new(channelzData),
682		resetBackoff: make(chan struct{}),
683	}
684	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
685	// Track ac in cc. This needs to be done before any getTransport(...) is called.
686	cc.mu.Lock()
687	if cc.conns == nil {
688		cc.mu.Unlock()
689		return nil, ErrClientConnClosing
690	}
691	if channelz.IsOn() {
692		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
693		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
694			Desc:     "Subchannel Created",
695			Severity: channelz.CtINFO,
696			Parent: &channelz.TraceEventDesc{
697				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
698				Severity: channelz.CtINFO,
699			},
700		})
701	}
702	cc.conns[ac] = struct{}{}
703	cc.mu.Unlock()
704	return ac, nil
705}
706
707// removeAddrConn removes the addrConn in the subConn from clientConn.
708// It also tears down the ac with the given error.
709func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
710	cc.mu.Lock()
711	if cc.conns == nil {
712		cc.mu.Unlock()
713		return
714	}
715	delete(cc.conns, ac)
716	cc.mu.Unlock()
717	ac.tearDown(err)
718}
719
720func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
721	return &channelz.ChannelInternalMetric{
722		State:                    cc.GetState(),
723		Target:                   cc.target,
724		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
725		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
726		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
727		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
728	}
729}
730
731// Target returns the target string of the ClientConn.
732// This is an EXPERIMENTAL API.
733func (cc *ClientConn) Target() string {
734	return cc.target
735}
736
737func (cc *ClientConn) incrCallsStarted() {
738	atomic.AddInt64(&cc.czData.callsStarted, 1)
739	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
740}
741
742func (cc *ClientConn) incrCallsSucceeded() {
743	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
744}
745
746func (cc *ClientConn) incrCallsFailed() {
747	atomic.AddInt64(&cc.czData.callsFailed, 1)
748}
749
750// connect starts creating a transport.
751// It does nothing if the ac is not IDLE.
752// TODO(bar) Move this to the addrConn section.
753func (ac *addrConn) connect() error {
754	ac.mu.Lock()
755	if ac.state == connectivity.Shutdown {
756		ac.mu.Unlock()
757		return errConnClosing
758	}
759	if ac.state != connectivity.Idle {
760		ac.mu.Unlock()
761		return nil
762	}
763	ac.updateConnectivityState(connectivity.Connecting)
764	ac.mu.Unlock()
765
766	// Start a goroutine connecting to the server asynchronously.
767	go ac.resetTransport()
768	return nil
769}
770
771// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
772//
773// It checks whether current connected address of ac is in the new addrs list.
774//  - If true, it updates ac.addrs and returns true. The ac will keep using
775//    the existing connection.
776//  - If false, it does nothing and returns false.
777func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
778	ac.mu.Lock()
779	defer ac.mu.Unlock()
780	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
781	if ac.state == connectivity.Shutdown {
782		ac.addrs = addrs
783		return true
784	}
785
786	// Unless we're busy reconnecting already, let's reconnect from the top of
787	// the list.
788	if ac.state != connectivity.Ready {
789		return false
790	}
791
792	var curAddrFound bool
793	for _, a := range addrs {
794		if reflect.DeepEqual(ac.curAddr, a) {
795			curAddrFound = true
796			break
797		}
798	}
799	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
800	if curAddrFound {
801		ac.addrs = addrs
802	}
803
804	return curAddrFound
805}
806
807// GetMethodConfig gets the method config of the input method.
808// If there's an exact match for input method (i.e. /service/method), we return
809// the corresponding MethodConfig.
810// If there isn't an exact match for the input method, we look for the default config
811// under the service (i.e /service/). If there is a default MethodConfig for
812// the service, we return it.
813// Otherwise, we return an empty MethodConfig.
814func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
815	// TODO: Avoid the locking here.
816	cc.mu.RLock()
817	defer cc.mu.RUnlock()
818	if cc.sc == nil {
819		return MethodConfig{}
820	}
821	m, ok := cc.sc.Methods[method]
822	if !ok {
823		i := strings.LastIndex(method, "/")
824		m = cc.sc.Methods[method[:i+1]]
825	}
826	return m
827}
828
829func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
830	cc.mu.RLock()
831	defer cc.mu.RUnlock()
832	if cc.sc == nil {
833		return nil
834	}
835	return cc.sc.healthCheckConfig
836}
837
838func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
839	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
840		FullMethodName: method,
841	})
842	if err != nil {
843		return nil, nil, toRPCErr(err)
844	}
845	return t, done, nil
846}
847
848func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
849	if sc == nil {
850		// should never reach here.
851		return fmt.Errorf("got nil pointer for service config")
852	}
853	cc.sc = sc
854
855	if cc.sc.retryThrottling != nil {
856		newThrottler := &retryThrottler{
857			tokens: cc.sc.retryThrottling.MaxTokens,
858			max:    cc.sc.retryThrottling.MaxTokens,
859			thresh: cc.sc.retryThrottling.MaxTokens / 2,
860			ratio:  cc.sc.retryThrottling.TokenRatio,
861		}
862		cc.retryThrottler.Store(newThrottler)
863	} else {
864		cc.retryThrottler.Store((*retryThrottler)(nil))
865	}
866
867	return nil
868}
869
870func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
871	cc.mu.RLock()
872	r := cc.resolverWrapper
873	cc.mu.RUnlock()
874	if r == nil {
875		return
876	}
877	go r.resolveNow(o)
878}
879
880// ResetConnectBackoff wakes up all subchannels in transient failure and causes
881// them to attempt another connection immediately.  It also resets the backoff
882// times used for subsequent attempts regardless of the current state.
883//
884// In general, this function should not be used.  Typical service or network
885// outages result in a reasonable client reconnection strategy by default.
886// However, if a previously unavailable network becomes available, this may be
887// used to trigger an immediate reconnect.
888//
889// This API is EXPERIMENTAL.
890func (cc *ClientConn) ResetConnectBackoff() {
891	cc.mu.Lock()
892	defer cc.mu.Unlock()
893	for ac := range cc.conns {
894		ac.resetConnectBackoff()
895	}
896}
897
// Close tears down the ClientConn and all underlying connections.
// It returns ErrClientConnClosing if the ClientConn has already been closed,
// and nil otherwise.
func (cc *ClientConn) Close() error {
	// Cancel the ClientConn's context on the way out, including on the
	// already-closed path.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Setting conns to nil marks the ClientConn as closed for every other
	// method; the old map is kept locally so the conns can be torn down
	// after the lock is released.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the resolver and balancer wrappers under the lock; they are
	// closed below without holding cc.mu.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		// Nested channels also report the deletion through a parent event.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
947
// addrConn is a network connection to a given address.
//
// NOTE(review): in the methods visible in this file section, transport,
// curAddr, addrs, state, backoffIdx and resetBackoff are all accessed with mu
// held; confirm that holds for callers outside this section.
type addrConn struct {
	// ctx is the parent of each connect attempt's deadline context and of the
	// health check context; cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn that receives this addrConn's connectivity state
	// changes and supplies the shared keepalive parameters.
	// dopts holds the dial options; dopts.copts.KeepaliveParams is refreshed
	// from cc before every connect attempt.
	// acbw is the balancer.SubConn handle passed along with state changes.
	// scopts carries the per-SubConn options consulted here (CredsBundle,
	// HealthCheckEnabled).
	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the retry count fed to the backoff strategy. It is
	// incremented after a backoff wait runs to completion and reset to zero on
	// a successful connect or by resetConnectBackoff.
	// resetBackoff is closed (and then replaced with a fresh channel) by
	// resetConnectBackoff to cut short a backoff wait in progress.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// czData holds the call counters that are updated atomically by the
	// incrCalls* methods and reported by ChannelzMetric.
	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
977
978// Note: this requires a lock on ac.mu.
979func (ac *addrConn) updateConnectivityState(s connectivity.State) {
980	if ac.state == s {
981		return
982	}
983
984	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
985	ac.state = s
986	if channelz.IsOn() {
987		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
988			Desc:     updateMsg,
989			Severity: channelz.CtINFO,
990		})
991	}
992	ac.cc.handleSubConnStateChange(ac.acbw, s)
993}
994
995// adjustParams updates parameters used to create transports upon
996// receiving a GoAway.
997func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
998	switch r {
999	case transport.GoAwayTooManyPings:
1000		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1001		ac.cc.mu.Lock()
1002		if v > ac.cc.mkp.Time {
1003			ac.cc.mkp.Time = v
1004		}
1005		ac.cc.mu.Unlock()
1006	}
1007}
1008
// resetTransport runs the connect loop for ac. Each pass tries every address
// in ac.addrs via tryAllAddrs. If none connects, ac moves to TRANSIENT_FAILURE
// and the loop waits out a backoff before the next pass. If one connects, ac
// publishes the transport (moving to READY directly, or leaving that to the
// health checker when one is started) and blocks until that transport goes
// away, then starts over.
//
// The function returns only when ac is found in the Shutdown state or ac.ctx
// is done. It acquires ac.mu itself, so it must be called without that lock
// held.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// Every pass after the first follows either a failed round or the loss
		// of a transport, so poke the resolver before trying again.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOption{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure)

			// Backoff.
			// Capture the reset channel under the lock: resetConnectBackoff
			// closes the current channel and installs a new one.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// The full backoff elapsed; the next failure waits longer.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset (backoffIdx is already zeroed by
				// resetConnectBackoff); retry right away.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Torn down while connecting: discard the transport just created.
			newTr.Close()
			ac.mu.Unlock()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		// A successful connect resets the backoff progression.
		ac.backoffIdx = 0

		healthCheckConfig := ac.cc.healthCheckConfig()
		// LB channel health checking is only enabled when all the four requirements below are met:
		// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
		// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
		// 3. a service config with non-empty healthCheckConfig field is provided,
		// 4. the current load balancer allows it.
		hctx, hcancel := context.WithCancel(ac.ctx)
		healthcheckManagingState := false
		if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
			if ac.cc.dopts.healthCheckFunc == nil {
				// TODO: add a link to the health check doc in the error message.
				grpclog.Error("the client side LB channel health check function has not been set.")
			} else {
				// TODO(deklerk) refactor to just return transport
				go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
				healthcheckManagingState = true
			}
		}
		// Without a running health checker nothing else will report READY, so
		// do it here.
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready)
		}
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		// Cancel the health check context tied to the transport that just went
		// away.
		hcancel()

		// Need to reconnect after a READY, the addrConn enters
		// TRANSIENT_FAILURE.
		//
		// This will set addrConn to TRANSIENT_FAILURE for a very short period
		// of time, and turns CONNECTING. It seems reasonable to skip this, but
		// READY-CONNECTING is not a valid transition.
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}
		ac.updateConnectivityState(connectivity.TransientFailure)
		ac.mu.Unlock()
	}
}
1125
1126// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1127// first successful one. It returns the transport, the address and a Event in
1128// the successful case. The Event fires when the returned transport disconnects.
1129func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1130	for _, addr := range addrs {
1131		ac.mu.Lock()
1132		if ac.state == connectivity.Shutdown {
1133			ac.mu.Unlock()
1134			return nil, resolver.Address{}, nil, errConnClosing
1135		}
1136		ac.updateConnectivityState(connectivity.Connecting)
1137		ac.transport = nil
1138
1139		ac.cc.mu.RLock()
1140		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1141		ac.cc.mu.RUnlock()
1142
1143		copts := ac.dopts.copts
1144		if ac.scopts.CredsBundle != nil {
1145			copts.CredsBundle = ac.scopts.CredsBundle
1146		}
1147		ac.mu.Unlock()
1148
1149		if channelz.IsOn() {
1150			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1151				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1152				Severity: channelz.CtINFO,
1153			})
1154		}
1155
1156		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1157		if err == nil {
1158			return newTr, addr, reconnect, nil
1159		}
1160		ac.cc.blockingpicker.updateConnectionError(err)
1161	}
1162
1163	// Couldn't connect to any address.
1164	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
1165}
1166
1167// createTransport creates a connection to addr. It returns the transport and a
1168// Event in the successful case. The Event fires when the returned transport
1169// disconnects.
1170func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1171	prefaceReceived := make(chan struct{})
1172	onCloseCalled := make(chan struct{})
1173	reconnect := grpcsync.NewEvent()
1174
1175	target := transport.TargetInfo{
1176		Addr:      addr.Addr,
1177		Metadata:  addr.Metadata,
1178		Authority: ac.cc.authority,
1179	}
1180
1181	onGoAway := func(r transport.GoAwayReason) {
1182		ac.mu.Lock()
1183		ac.adjustParams(r)
1184		ac.mu.Unlock()
1185		reconnect.Fire()
1186	}
1187
1188	onClose := func() {
1189		close(onCloseCalled)
1190		reconnect.Fire()
1191	}
1192
1193	onPrefaceReceipt := func() {
1194		close(prefaceReceived)
1195	}
1196
1197	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1198	defer cancel()
1199	if channelz.IsOn() {
1200		copts.ChannelzParentID = ac.channelzID
1201	}
1202
1203	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1204	if err != nil {
1205		// newTr is either nil, or closed.
1206		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1207		return nil, nil, err
1208	}
1209
1210	if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1211		select {
1212		case <-time.After(connectDeadline.Sub(time.Now())):
1213			// We didn't get the preface in time.
1214			newTr.Close()
1215			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1216			return nil, nil, errors.New("timed out waiting for server handshake")
1217		case <-prefaceReceived:
1218			// We got the preface - huzzah! things are good.
1219		case <-onCloseCalled:
1220			// The transport has already closed - noop.
1221			return nil, nil, errors.New("connection closed")
1222			// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1223		}
1224	}
1225	return newTr, reconnect, nil
1226}
1227
1228func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1229	// Set up the health check helper functions
1230	newStream := func() (interface{}, error) {
1231		return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1232	}
1233	firstReady := true
1234	reportHealth := func(ok bool) {
1235		ac.mu.Lock()
1236		defer ac.mu.Unlock()
1237		if ac.transport != newTr {
1238			return
1239		}
1240		if ok {
1241			if firstReady {
1242				firstReady = false
1243				ac.curAddr = addr
1244			}
1245			ac.updateConnectivityState(connectivity.Ready)
1246		} else {
1247			ac.updateConnectivityState(connectivity.TransientFailure)
1248		}
1249	}
1250	err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
1251	if err != nil {
1252		if status.Code(err) == codes.Unimplemented {
1253			if channelz.IsOn() {
1254				channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1255					Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
1256					Severity: channelz.CtError,
1257				})
1258			}
1259			grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1260		} else {
1261			grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1262		}
1263	}
1264}
1265
1266func (ac *addrConn) resetConnectBackoff() {
1267	ac.mu.Lock()
1268	close(ac.resetBackoff)
1269	ac.backoffIdx = 0
1270	ac.resetBackoff = make(chan struct{})
1271	ac.mu.Unlock()
1272}
1273
1274// getReadyTransport returns the transport if ac's state is READY.
1275// Otherwise it returns nil, false.
1276// If ac's state is IDLE, it will trigger ac to connect.
1277func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1278	ac.mu.Lock()
1279	if ac.state == connectivity.Ready && ac.transport != nil {
1280		t := ac.transport
1281		ac.mu.Unlock()
1282		return t, true
1283	}
1284	var idle bool
1285	if ac.state == connectivity.Idle {
1286		idle = true
1287	}
1288	ac.mu.Unlock()
1289	// Trigger idle ac to connect.
1290	if idle {
1291		ac.connect()
1292	}
1293	return nil, false
1294}
1295
// tearDown starts to tear down the addrConn: it moves ac to the Shutdown
// state, clears its transport and current address, and cancels ac's context.
// Calling it on an addrConn that is already shut down is a no-op.
//
// err selects how the current transport is treated: only when err is
// errConnDrain (and a transport exists) is the transport closed gracefully
// here; for any other err this function merely drops its reference to it.
//
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancelation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		// NOTE(review): the onClose callback built in createTransport does not
		// take ac.mu in this version (onGoAway does); confirm whether dropping
		// the lock here is still required.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1339
1340func (ac *addrConn) getState() connectivity.State {
1341	ac.mu.Lock()
1342	defer ac.mu.Unlock()
1343	return ac.state
1344}
1345
1346func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1347	ac.mu.Lock()
1348	addr := ac.curAddr.Addr
1349	ac.mu.Unlock()
1350	return &channelz.ChannelInternalMetric{
1351		State:                    ac.getState(),
1352		Target:                   addr,
1353		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1354		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1355		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1356		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1357	}
1358}
1359
1360func (ac *addrConn) incrCallsStarted() {
1361	atomic.AddInt64(&ac.czData.callsStarted, 1)
1362	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1363}
1364
1365func (ac *addrConn) incrCallsSucceeded() {
1366	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1367}
1368
1369func (ac *addrConn) incrCallsFailed() {
1370	atomic.AddInt64(&ac.czData.callsFailed, 1)
1371}
1372
// retryThrottler is a token bucket that gates retries. The bucket holds
// between 0 and max tokens; throttle takes one token out, successfulRPC puts
// ratio tokens back, and retries are throttled while the bucket is at or below
// thresh.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero. A nil throttler never
// throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capping it at max. It is a
// no-op on a nil throttler.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1409
1410type channelzChannel struct {
1411	cc *ClientConn
1412}
1413
1414func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1415	return c.cc.channelzMetric()
1416}
1417
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout. Its message is
// "grpc: timed out when dialing".
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1424