1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/internal/backoff"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/grpcsync"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	_ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
46	_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
47	"google.golang.org/grpc/serviceconfig"
48	"google.golang.org/grpc/status"
49)
50
51const (
52	// minimum time to give a connection to complete
53	minConnectTimeout = 20 * time.Second
54	// must match grpclbName in grpclb/grpclb.go
55	grpclbName = "grpclb"
56)
57
58var (
59	// ErrClientConnClosing indicates that the operation is illegal because
60	// the ClientConn is closing.
61	//
62	// Deprecated: this error should not be relied upon by users; use the status
63	// code of Canceled instead.
64	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection is being drained and will not accept any new RPCs.
66	errConnDrain = errors.New("grpc: the connection is drained")
67	// errConnClosing indicates that the connection is closing.
68	errConnClosing = errors.New("grpc: the connection is closing")
69	// errBalancerClosed indicates that the balancer is closed.
70	errBalancerClosed = errors.New("grpc: balancer is closed")
71	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
72	// service config.
73	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
74)
75
76// The following errors are returned from Dial and DialContext
77var (
	// errNoTransportSecurity indicates that no transport security is set for
	// the ClientConn. Users should either set one or explicitly call the
	// WithInsecure DialOption to disable security.
81	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
82	// errTransportCredsAndBundle indicates that creds bundle is used together
83	// with other individual Transport Credentials.
84	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token) that requires a secure
	// connection, over an insecure connection.
88	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
89	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
90	// and grpc.WithInsecure() are both called for a connection.
91	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
92)
93
94const (
95	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
96	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize specify the default sizes of
	// the transport's write and read buffers for HTTP/2 frames.
98	defaultWriteBufSize = 32 * 1024
99	defaultReadBufSize  = 32 * 1024
100)
101
102// Dial creates a client connection to the given target.
103func Dial(target string, opts ...DialOption) (*ClientConn, error) {
104	return DialContext(context.Background(), target, opts...)
105}
106
107// DialContext creates a client connection to the given target. By default, it's
108// a non-blocking dial (the function won't wait for connections to be
109// established, and connecting happens in the background). To make it a blocking
110// dial, use WithBlock() dial option.
111//
// In the non-blocking case, ctx does not affect the connection; it only
// controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be a no-op. Users should call ClientConn.Close to terminate all
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g., to use the dns resolver, apply a "dns:///" prefix to the target.
123func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
124	cc := &ClientConn{
125		target:            target,
126		csMgr:             &connectivityStateManager{},
127		conns:             make(map[*addrConn]struct{}),
128		dopts:             defaultDialOptions(),
129		blockingpicker:    newPickerWrapper(),
130		czData:            new(channelzData),
131		firstResolveEvent: grpcsync.NewEvent(),
132	}
133	cc.retryThrottler.Store((*retryThrottler)(nil))
134	cc.ctx, cc.cancel = context.WithCancel(context.Background())
135
136	for _, opt := range opts {
137		opt.apply(&cc.dopts)
138	}
139
140	chainUnaryClientInterceptors(cc)
141	chainStreamClientInterceptors(cc)
142
143	defer func() {
144		if err != nil {
145			cc.Close()
146		}
147	}()
148
149	if channelz.IsOn() {
150		if cc.dopts.channelzParentID != 0 {
151			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
152			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
153				Desc:     "Channel Created",
154				Severity: channelz.CtINFO,
155				Parent: &channelz.TraceEventDesc{
156					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
157					Severity: channelz.CtINFO,
158				},
159			})
160		} else {
161			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
162			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
163				Desc:     "Channel Created",
164				Severity: channelz.CtINFO,
165			})
166		}
167		cc.csMgr.channelzID = cc.channelzID
168	}
169
170	if !cc.dopts.insecure {
171		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
172			return nil, errNoTransportSecurity
173		}
174		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
175			return nil, errTransportCredsAndBundle
176		}
177	} else {
178		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
179			return nil, errCredentialsConflict
180		}
181		for _, cd := range cc.dopts.copts.PerRPCCredentials {
182			if cd.RequireTransportSecurity() {
183				return nil, errTransportCredentialsMissing
184			}
185		}
186	}
187
188	if cc.dopts.defaultServiceConfigRawJSON != nil {
189		sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
190		if err != nil {
191			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
192		}
193		cc.dopts.defaultServiceConfig = sc
194	}
195	cc.mkp = cc.dopts.copts.KeepaliveParams
196
197	if cc.dopts.copts.Dialer == nil {
198		cc.dopts.copts.Dialer = newProxyDialer(
199			func(ctx context.Context, addr string) (net.Conn, error) {
200				network, addr := parseDialTarget(addr)
201				return (&net.Dialer{}).DialContext(ctx, network, addr)
202			},
203		)
204	}
205
206	if cc.dopts.copts.UserAgent != "" {
207		cc.dopts.copts.UserAgent += " " + grpcUA
208	} else {
209		cc.dopts.copts.UserAgent = grpcUA
210	}
211
212	if cc.dopts.timeout > 0 {
213		var cancel context.CancelFunc
214		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
215		defer cancel()
216	}
217	defer func() {
218		select {
219		case <-ctx.Done():
220			conn, err = nil, ctx.Err()
221		default:
222		}
223	}()
224
225	scSet := false
226	if cc.dopts.scChan != nil {
227		// Try to get an initial service config.
228		select {
229		case sc, ok := <-cc.dopts.scChan:
230			if ok {
231				cc.sc = &sc
232				scSet = true
233			}
234		default:
235		}
236	}
237	if cc.dopts.bs == nil {
238		cc.dopts.bs = backoff.Exponential{
239			MaxDelay: DefaultBackoffConfig.MaxDelay,
240		}
241	}
242	if cc.dopts.resolverBuilder == nil {
243		// Only try to parse target when resolver builder is not already set.
244		cc.parsedTarget = parseTarget(cc.target)
245		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
246		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
247		if cc.dopts.resolverBuilder == nil {
			// If the resolver builder is still nil, the parsed target's scheme
			// is not registered. Fall back to the default resolver and set
			// Endpoint to the original target.
251			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
252			cc.parsedTarget = resolver.Target{
253				Scheme:   resolver.GetDefaultScheme(),
254				Endpoint: target,
255			}
256			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
257		}
258	} else {
259		cc.parsedTarget = resolver.Target{Endpoint: target}
260	}
261	creds := cc.dopts.copts.TransportCredentials
262	if creds != nil && creds.Info().ServerName != "" {
263		cc.authority = creds.Info().ServerName
264	} else if cc.dopts.insecure && cc.dopts.authority != "" {
265		cc.authority = cc.dopts.authority
266	} else {
267		// Use endpoint from "scheme://authority/endpoint" as the default
268		// authority for ClientConn.
269		cc.authority = cc.parsedTarget.Endpoint
270	}
271
272	if cc.dopts.scChan != nil && !scSet {
273		// Blocking wait for the initial service config.
274		select {
275		case sc, ok := <-cc.dopts.scChan:
276			if ok {
277				cc.sc = &sc
278			}
279		case <-ctx.Done():
280			return nil, ctx.Err()
281		}
282	}
283	if cc.dopts.scChan != nil {
284		go cc.scWatcher()
285	}
286
287	var credsClone credentials.TransportCredentials
288	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
289		credsClone = creds.Clone()
290	}
291	cc.balancerBuildOpts = balancer.BuildOptions{
292		DialCreds:        credsClone,
293		CredsBundle:      cc.dopts.copts.CredsBundle,
294		Dialer:           cc.dopts.copts.Dialer,
295		ChannelzParentID: cc.channelzID,
296		Target:           cc.parsedTarget,
297	}
298
299	// Build the resolver.
300	rWrapper, err := newCCResolverWrapper(cc)
301	if err != nil {
302		return nil, fmt.Errorf("failed to build resolver: %v", err)
303	}
304
305	cc.mu.Lock()
306	cc.resolverWrapper = rWrapper
307	cc.mu.Unlock()
308	// A blocking dial blocks until the clientConn is ready.
309	if cc.dopts.block {
310		for {
311			s := cc.GetState()
312			if s == connectivity.Ready {
313				break
314			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
315				if err = cc.blockingpicker.connectionError(); err != nil {
316					terr, ok := err.(interface {
317						Temporary() bool
318					})
319					if ok && !terr.Temporary() {
320						return nil, err
321					}
322				}
323			}
324			if !cc.WaitForStateChange(ctx, s) {
325				// ctx got timeout or canceled.
326				return nil, ctx.Err()
327			}
328		}
329	}
330
331	return cc, nil
332}
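
// The following sketch is illustrative only and is not part of the gRPC API:
// it shows one way a caller might use DialContext as described above. The
// target "dns:///localhost:50051" and the use of plaintext credentials via
// WithInsecure are assumptions for illustration.
func exampleDialContextUsage() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// With WithBlock, ctx bounds how long the dial may take; without it,
	// DialContext returns immediately and connects in the background.
	conn, err := DialContext(ctx, "dns:///localhost:50051", WithInsecure(), WithBlock())
	if err != nil {
		return err
	}
	// Close tears down the ClientConn and all underlying connections.
	return conn.Close()
}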
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336	interceptors := cc.dopts.chainUnaryInts
337	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338	// be executed before any other chained interceptors.
339	if cc.dopts.unaryInt != nil {
340		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341	}
342	var chainedInt UnaryClientInterceptor
343	if len(interceptors) == 0 {
344		chainedInt = nil
345	} else if len(interceptors) == 1 {
346		chainedInt = interceptors[0]
347	} else {
348		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350		}
351	}
352	cc.dopts.unaryInt = chainedInt
353}
354
// getChainUnaryInvoker recursively generates the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357	if curr == len(interceptors)-1 {
358		return finalInvoker
359	}
360	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362	}
363}
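
// Illustrative sketch, not part of this file's implementation: a minimal
// UnaryClientInterceptor of the shape chained above, plus a hypothetical dial
// that installs it. Interceptors passed to WithChainUnaryInterceptor run in
// the order given, after any WithUnaryInterceptor interceptor and before the
// final invoker; the target and logging behavior here are assumptions.
func exampleUnaryLoggingInterceptor(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
	grpclog.Infof("invoking %s", method)
	// Delegate to the next interceptor in the chain (or the final invoker).
	err := invoker(ctx, method, req, reply, cc, opts...)
	grpclog.Infof("finished %s, err: %v", method, err)
	return err
}

func exampleChainedInterceptorDial() (*ClientConn, error) {
	return Dial("localhost:50051",
		WithInsecure(),
		WithChainUnaryInterceptor(exampleUnaryLoggingInterceptor),
	)
}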
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367	interceptors := cc.dopts.chainStreamInts
368	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369	// be executed before any other chained interceptors.
370	if cc.dopts.streamInt != nil {
371		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372	}
373	var chainedInt StreamClientInterceptor
374	if len(interceptors) == 0 {
375		chainedInt = nil
376	} else if len(interceptors) == 1 {
377		chainedInt = interceptors[0]
378	} else {
379		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381		}
382	}
383	cc.dopts.streamInt = chainedInt
384}
385
// getChainStreamer recursively generates the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388	if curr == len(interceptors)-1 {
389		return finalStreamer
390	}
391	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393	}
394}
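
// Illustrative sketch, not part of this file's implementation: a minimal
// StreamClientInterceptor of the shape chained above. It is hypothetical and
// only logs before delegating to the next streamer in the chain; it would be
// installed with WithChainStreamInterceptor, analogously to the unary case.
func exampleStreamLoggingInterceptor(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
	grpclog.Infof("opening stream for %s", method)
	return streamer(ctx, desc, cc, method, opts...)
}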
395
396// connectivityStateManager keeps the connectivity.State of ClientConn.
397// This struct will eventually be exported so the balancers can access it.
398type connectivityStateManager struct {
399	mu         sync.Mutex
400	state      connectivity.State
401	notifyChan chan struct{}
402	channelzID int64
403}
404
405// updateState updates the connectivity.State of ClientConn.
// If there is a change, it notifies goroutines waiting for a state change.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409	csm.mu.Lock()
410	defer csm.mu.Unlock()
411	if csm.state == connectivity.Shutdown {
412		return
413	}
414	if csm.state == state {
415		return
416	}
417	csm.state = state
418	if channelz.IsOn() {
419		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
421			Severity: channelz.CtINFO,
422		})
423	}
424	if csm.notifyChan != nil {
425		// There are other goroutines waiting on this channel.
426		close(csm.notifyChan)
427		csm.notifyChan = nil
428	}
429}
430
431func (csm *connectivityStateManager) getState() connectivity.State {
432	csm.mu.Lock()
433	defer csm.mu.Unlock()
434	return csm.state
435}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438	csm.mu.Lock()
439	defer csm.mu.Unlock()
440	if csm.notifyChan == nil {
441		csm.notifyChan = make(chan struct{})
442	}
443	return csm.notifyChan
444}
445
446// ClientConn represents a client connection to an RPC server.
447type ClientConn struct {
448	ctx    context.Context
449	cancel context.CancelFunc
450
451	target       string
452	parsedTarget resolver.Target
453	authority    string
454	dopts        dialOptions
455	csMgr        *connectivityStateManager
456
457	balancerBuildOpts balancer.BuildOptions
458	blockingpicker    *pickerWrapper
459
460	mu              sync.RWMutex
461	resolverWrapper *ccResolverWrapper
462	sc              *ServiceConfig
463	conns           map[*addrConn]struct{}
464	// Keepalive parameter can be updated if a GoAway is received.
465	mkp             keepalive.ClientParameters
466	curBalancerName string
467	balancerWrapper *ccBalancerWrapper
468	retryThrottler  atomic.Value
469
470	firstResolveEvent *grpcsync.Event
471
472	channelzID int64 // channelz unique identification number
473	czData     *channelzData
474}
475
476// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. It returns true in the former case and false in the latter.
478// This is an EXPERIMENTAL API.
479func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
480	ch := cc.csMgr.getNotifyChan()
481	if cc.csMgr.getState() != sourceState {
482		return true
483	}
484	select {
485	case <-ctx.Done():
486		return false
487	case <-ch:
488		return true
489	}
490}
491
492// GetState returns the connectivity.State of ClientConn.
493// This is an EXPERIMENTAL API.
494func (cc *ClientConn) GetState() connectivity.State {
495	return cc.csMgr.getState()
496}
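
// Illustrative sketch, not part of the gRPC API: one way a caller might use
// GetState and WaitForStateChange together to wait until a ClientConn becomes
// Ready or the supplied context expires. Both methods are experimental, as
// noted above.
func exampleWaitUntilReady(ctx context.Context, cc *ClientConn) bool {
	for {
		s := cc.GetState()
		if s == connectivity.Ready {
			return true
		}
		// Block until the state leaves s or ctx expires.
		if !cc.WaitForStateChange(ctx, s) {
			return false
		}
	}
}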
497
498func (cc *ClientConn) scWatcher() {
499	for {
500		select {
501		case sc, ok := <-cc.dopts.scChan:
502			if !ok {
503				return
504			}
505			cc.mu.Lock()
506			// TODO: load balance policy runtime change is ignored.
507			// We may revisit this decision in the future.
508			cc.sc = &sc
509			cc.mu.Unlock()
510		case <-cc.ctx.Done():
511			return
512		}
513	}
514}
515
516// waitForResolvedAddrs blocks until the resolver has provided addresses or the
517// context expires.  Returns nil unless the context expires first; otherwise
518// returns a status error based on the context.
519func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
520	// This is on the RPC path, so we use a fast path to avoid the
521	// more-expensive "select" below after the resolver has returned once.
522	if cc.firstResolveEvent.HasFired() {
523		return nil
524	}
525	select {
526	case <-cc.firstResolveEvent.Done():
527		return nil
528	case <-ctx.Done():
529		return status.FromContextError(ctx.Err()).Err()
530	case <-cc.ctx.Done():
531		return ErrClientConnClosing
532	}
533}
534
535func (cc *ClientConn) updateResolverState(s resolver.State) error {
536	cc.mu.Lock()
537	defer cc.mu.Unlock()
538	// Check if the ClientConn is already closed. Some fields (e.g.
539	// balancerWrapper) are set to nil when closing the ClientConn, and could
540	// cause nil pointer panic if we don't have this check.
541	if cc.conns == nil {
542		return nil
543	}
544
545	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
546		if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
547			cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
548		}
549	} else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
550		cc.applyServiceConfig(sc)
551	}
552
553	var balCfg serviceconfig.LoadBalancingConfig
554	if cc.dopts.balancerBuilder == nil {
555		// Only look at balancer types and switch balancer if balancer dial
556		// option is not set.
557		var newBalancerName string
558		if cc.sc != nil && cc.sc.lbConfig != nil {
559			newBalancerName = cc.sc.lbConfig.name
560			balCfg = cc.sc.lbConfig.cfg
561		} else {
562			var isGRPCLB bool
563			for _, a := range s.Addresses {
564				if a.Type == resolver.GRPCLB {
565					isGRPCLB = true
566					break
567				}
568			}
569			if isGRPCLB {
570				newBalancerName = grpclbName
571			} else if cc.sc != nil && cc.sc.LB != nil {
572				newBalancerName = *cc.sc.LB
573			} else {
574				newBalancerName = PickFirstBalancerName
575			}
576		}
577		cc.switchBalancer(newBalancerName)
578	} else if cc.balancerWrapper == nil {
579		// Balancer dial option was set, and this is the first time handling
580		// resolved addresses. Build a balancer with dopts.balancerBuilder.
581		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
582		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
583	}
584
585	cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
586	return nil
587}
588
589// switchBalancer starts the switching from current balancer to the balancer
590// with the given name.
591//
// It will NOT send the current address list to the new balancer. If needed,
// the caller should send the address list to the new balancer after this
// function returns.
595//
596// Caller must hold cc.mu.
597func (cc *ClientConn) switchBalancer(name string) {
598	if strings.EqualFold(cc.curBalancerName, name) {
599		return
600	}
601
602	grpclog.Infof("ClientConn switching balancer to %q", name)
603	if cc.dopts.balancerBuilder != nil {
604		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
605		return
606	}
607	if cc.balancerWrapper != nil {
608		cc.balancerWrapper.close()
609	}
610
611	builder := balancer.Get(name)
612	if channelz.IsOn() {
613		if builder == nil {
614			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
615				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
616				Severity: channelz.CtWarning,
617			})
618		} else {
619			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
620				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
621				Severity: channelz.CtINFO,
622			})
623		}
624	}
625	if builder == nil {
626		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
627		builder = newPickfirstBuilder()
628	}
629
630	cc.curBalancerName = builder.Name()
631	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
632}
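
// Illustrative sketch, not part of this file's implementation: as described in
// updateResolverState and switchBalancer above, when no balancer DialOption is
// set the LB policy is taken from the service config (or grpclb addresses),
// falling back to pick_first. A caller might select round_robin via a default
// service config; the target and JSON below are assumptions for illustration.
func exampleRoundRobinDial() (*ClientConn, error) {
	return Dial("dns:///localhost:50051",
		WithInsecure(),
		// The blank import of grpc/balancer/roundrobin at the top of this
		// file registers the "round_robin" balancer referenced here.
		WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
	)
}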
633
634func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
635	cc.mu.Lock()
636	if cc.conns == nil {
637		cc.mu.Unlock()
638		return
639	}
640	// TODO(bar switching) send updates to all balancer wrappers when balancer
641	// gracefully switching is supported.
642	cc.balancerWrapper.handleSubConnStateChange(sc, s)
643	cc.mu.Unlock()
644}
645
646// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
647//
648// Caller needs to make sure len(addrs) > 0.
649func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
650	ac := &addrConn{
651		cc:           cc,
652		addrs:        addrs,
653		scopts:       opts,
654		dopts:        cc.dopts,
655		czData:       new(channelzData),
656		resetBackoff: make(chan struct{}),
657	}
658	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
659	// Track ac in cc. This needs to be done before any getTransport(...) is called.
660	cc.mu.Lock()
661	if cc.conns == nil {
662		cc.mu.Unlock()
663		return nil, ErrClientConnClosing
664	}
665	if channelz.IsOn() {
666		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
667		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
668			Desc:     "Subchannel Created",
669			Severity: channelz.CtINFO,
670			Parent: &channelz.TraceEventDesc{
671				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
672				Severity: channelz.CtINFO,
673			},
674		})
675	}
676	cc.conns[ac] = struct{}{}
677	cc.mu.Unlock()
678	return ac, nil
679}
680
// removeAddrConn removes the addrConn from the ClientConn's conns map and
// tears it down with the given error.
683func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
684	cc.mu.Lock()
685	if cc.conns == nil {
686		cc.mu.Unlock()
687		return
688	}
689	delete(cc.conns, ac)
690	cc.mu.Unlock()
691	ac.tearDown(err)
692}
693
694func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
695	return &channelz.ChannelInternalMetric{
696		State:                    cc.GetState(),
697		Target:                   cc.target,
698		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
699		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
700		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
701		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
702	}
703}
704
705// Target returns the target string of the ClientConn.
706// This is an EXPERIMENTAL API.
707func (cc *ClientConn) Target() string {
708	return cc.target
709}
710
711func (cc *ClientConn) incrCallsStarted() {
712	atomic.AddInt64(&cc.czData.callsStarted, 1)
713	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
714}
715
716func (cc *ClientConn) incrCallsSucceeded() {
717	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
718}
719
720func (cc *ClientConn) incrCallsFailed() {
721	atomic.AddInt64(&cc.czData.callsFailed, 1)
722}
723
724// connect starts creating a transport.
725// It does nothing if the ac is not IDLE.
726// TODO(bar) Move this to the addrConn section.
727func (ac *addrConn) connect() error {
728	ac.mu.Lock()
729	if ac.state == connectivity.Shutdown {
730		ac.mu.Unlock()
731		return errConnClosing
732	}
733	if ac.state != connectivity.Idle {
734		ac.mu.Unlock()
735		return nil
736	}
737	// Update connectivity state within the lock to prevent subsequent or
738	// concurrent calls from resetting the transport more than once.
739	ac.updateConnectivityState(connectivity.Connecting)
740	ac.mu.Unlock()
741
742	// Start a goroutine connecting to the server asynchronously.
743	go ac.resetTransport()
744	return nil
745}
746
747// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
748//
749// If ac is Connecting, it returns false. The caller should tear down the ac and
750// create a new one. Note that the backoff will be reset when this happens.
751//
752// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
753// addresses will be picked up by retry in the next iteration after backoff.
754//
755// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
756//
757// If ac is Ready, it checks whether current connected address of ac is in the
758// new addrs list.
759//  - If true, it updates ac.addrs and returns true. The ac will keep using
760//    the existing connection.
761//  - If false, it does nothing and returns false.
762func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
763	ac.mu.Lock()
764	defer ac.mu.Unlock()
765	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
766	if ac.state == connectivity.Shutdown ||
767		ac.state == connectivity.TransientFailure ||
768		ac.state == connectivity.Idle {
769		ac.addrs = addrs
770		return true
771	}
772
773	if ac.state == connectivity.Connecting {
774		return false
775	}
776
777	// ac.state is Ready, try to find the connected address.
778	var curAddrFound bool
779	for _, a := range addrs {
780		if reflect.DeepEqual(ac.curAddr, a) {
781			curAddrFound = true
782			break
783		}
784	}
785	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
786	if curAddrFound {
787		ac.addrs = addrs
788	}
789
790	return curAddrFound
791}
792
793// GetMethodConfig gets the method config of the input method.
// If there's an exact match for the input method (i.e., /service/method), we return
// the corresponding MethodConfig.
// If there isn't an exact match for the input method, we look for the default config
// under the service (i.e., /service/). If there is a default MethodConfig for
// the service, we return it.
799// Otherwise, we return an empty MethodConfig.
800func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
801	// TODO: Avoid the locking here.
802	cc.mu.RLock()
803	defer cc.mu.RUnlock()
804	if cc.sc == nil {
805		return MethodConfig{}
806	}
807	m, ok := cc.sc.Methods[method]
808	if !ok {
809		i := strings.LastIndex(method, "/")
810		m = cc.sc.Methods[method[:i+1]]
811	}
812	return m
813}
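
// Illustrative sketch, not part of the gRPC API: how a caller holding a
// ClientConn might consult GetMethodConfig using the lookup rules described
// above (exact "/service/method" match first, then the "/service/" default).
// The method name used here is hypothetical.
func exampleInspectMethodConfig(cc *ClientConn) {
	mc := cc.GetMethodConfig("/helloworld.Greeter/SayHello")
	if mc.WaitForReady != nil {
		grpclog.Infof("waitForReady configured: %v", *mc.WaitForReady)
	}
	if mc.Timeout != nil {
		grpclog.Infof("per-RPC timeout configured: %v", *mc.Timeout)
	}
}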
814
815func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
816	cc.mu.RLock()
817	defer cc.mu.RUnlock()
818	if cc.sc == nil {
819		return nil
820	}
821	return cc.sc.healthCheckConfig
822}
823
824func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
825	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
826		FullMethodName: method,
827	})
828	if err != nil {
829		return nil, nil, toRPCErr(err)
830	}
831	return t, done, nil
832}
833
834func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig) error {
835	if sc == nil {
836		// should never reach here.
837		return fmt.Errorf("got nil pointer for service config")
838	}
839	cc.sc = sc
840
841	if cc.sc.retryThrottling != nil {
842		newThrottler := &retryThrottler{
843			tokens: cc.sc.retryThrottling.MaxTokens,
844			max:    cc.sc.retryThrottling.MaxTokens,
845			thresh: cc.sc.retryThrottling.MaxTokens / 2,
846			ratio:  cc.sc.retryThrottling.TokenRatio,
847		}
848		cc.retryThrottler.Store(newThrottler)
849	} else {
850		cc.retryThrottler.Store((*retryThrottler)(nil))
851	}
852
853	return nil
854}
855
856func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
857	cc.mu.RLock()
858	r := cc.resolverWrapper
859	cc.mu.RUnlock()
860	if r == nil {
861		return
862	}
863	go r.resolveNow(o)
864}
865
866// ResetConnectBackoff wakes up all subchannels in transient failure and causes
867// them to attempt another connection immediately.  It also resets the backoff
868// times used for subsequent attempts regardless of the current state.
869//
870// In general, this function should not be used.  Typical service or network
871// outages result in a reasonable client reconnection strategy by default.
872// However, if a previously unavailable network becomes available, this may be
873// used to trigger an immediate reconnect.
874//
875// This API is EXPERIMENTAL.
876func (cc *ClientConn) ResetConnectBackoff() {
877	cc.mu.Lock()
878	defer cc.mu.Unlock()
879	for ac := range cc.conns {
880		ac.resetConnectBackoff()
881	}
882}
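
// Illustrative sketch, not part of the gRPC API: per the caveats above,
// ResetConnectBackoff is only appropriate when the application learns that the
// network is available again, e.g. from a platform connectivity notification.
// The signal channel here is a hypothetical stand-in for such a notification.
func exampleResetBackoffOnNetworkChange(cc *ClientConn, networkBackUp <-chan struct{}) {
	for range networkBackUp {
		// Wake subchannels in transient failure and reset their backoff.
		cc.ResetConnectBackoff()
	}
}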
883
884// Close tears down the ClientConn and all underlying connections.
885func (cc *ClientConn) Close() error {
886	defer cc.cancel()
887
888	cc.mu.Lock()
889	if cc.conns == nil {
890		cc.mu.Unlock()
891		return ErrClientConnClosing
892	}
893	conns := cc.conns
894	cc.conns = nil
895	cc.csMgr.updateState(connectivity.Shutdown)
896
897	rWrapper := cc.resolverWrapper
898	cc.resolverWrapper = nil
899	bWrapper := cc.balancerWrapper
900	cc.balancerWrapper = nil
901	cc.mu.Unlock()
902
903	cc.blockingpicker.close()
904
905	if rWrapper != nil {
906		rWrapper.close()
907	}
908	if bWrapper != nil {
909		bWrapper.close()
910	}
911
912	for ac := range conns {
913		ac.tearDown(ErrClientConnClosing)
914	}
915	if channelz.IsOn() {
916		ted := &channelz.TraceEventDesc{
917			Desc:     "Channel Deleted",
918			Severity: channelz.CtINFO,
919		}
920		if cc.dopts.channelzParentID != 0 {
921			ted.Parent = &channelz.TraceEventDesc{
922				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
923				Severity: channelz.CtINFO,
924			}
925		}
926		channelz.AddTraceEvent(cc.channelzID, ted)
927		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
928		// the entity being deleted, and thus prevent it from being deleted right away.
929		channelz.RemoveEntry(cc.channelzID)
930	}
931	return nil
932}
933
934// addrConn is a network connection to a given address.
935type addrConn struct {
936	ctx    context.Context
937	cancel context.CancelFunc
938
939	cc     *ClientConn
940	dopts  dialOptions
941	acbw   balancer.SubConn
942	scopts balancer.NewSubConnOptions
943
944	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
945	// health checking may require server to report healthy to set ac to READY), and is reset
946	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
947	// is received, transport is closed, ac has been torn down).
948	transport transport.ClientTransport // The current transport.
949
950	mu      sync.Mutex
951	curAddr resolver.Address   // The current address.
952	addrs   []resolver.Address // All addresses that the resolver resolved to.
953
954	// Use updateConnectivityState for updating addrConn's connectivity state.
955	state connectivity.State
956
957	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
958	resetBackoff chan struct{}
959
960	channelzID int64 // channelz unique identification number.
961	czData     *channelzData
962}
963
964// Note: this requires a lock on ac.mu.
965func (ac *addrConn) updateConnectivityState(s connectivity.State) {
966	if ac.state == s {
967		return
968	}
969
970	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
971	ac.state = s
972	if channelz.IsOn() {
973		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
974			Desc:     updateMsg,
975			Severity: channelz.CtINFO,
976		})
977	}
978	ac.cc.handleSubConnStateChange(ac.acbw, s)
979}
980
981// adjustParams updates parameters used to create transports upon
982// receiving a GoAway.
983func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
984	switch r {
985	case transport.GoAwayTooManyPings:
986		v := 2 * ac.dopts.copts.KeepaliveParams.Time
987		ac.cc.mu.Lock()
988		if v > ac.cc.mkp.Time {
989			ac.cc.mkp.Time = v
990		}
991		ac.cc.mu.Unlock()
992	}
993}
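
// Illustrative sketch, not part of this file's implementation: the keepalive
// parameters adjusted above originate from the WithKeepaliveParams DialOption.
// The target and values below are assumptions for illustration; if the server
// sends a GoAway for too many pings, adjustParams raises the effective Time to
// at least twice the configured value for subsequent transports.
func exampleKeepaliveDial() (*ClientConn, error) {
	return Dial("localhost:50051",
		WithInsecure(),
		WithKeepaliveParams(keepalive.ClientParameters{
			Time:                30 * time.Second, // ping the server when idle this long
			Timeout:             10 * time.Second, // wait this long for a ping ack
			PermitWithoutStream: true,             // ping even with no active RPCs
		}),
	)
}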
994
995func (ac *addrConn) resetTransport() {
996	for i := 0; ; i++ {
997		if i > 0 {
998			ac.cc.resolveNow(resolver.ResolveNowOption{})
999		}
1000
1001		ac.mu.Lock()
1002		if ac.state == connectivity.Shutdown {
1003			ac.mu.Unlock()
1004			return
1005		}
1006
1007		addrs := ac.addrs
1008		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1009		// This will be the duration that dial gets to finish.
1010		dialDuration := minConnectTimeout
1011		if ac.dopts.minConnectTimeout != nil {
1012			dialDuration = ac.dopts.minConnectTimeout()
1013		}
1014
1015		if dialDuration < backoffFor {
1016			// Give dial more time as we keep failing to connect.
1017			dialDuration = backoffFor
1018		}
1019		// We can potentially spend all the time trying the first address, and
1020		// if the server accepts the connection and then hangs, the following
1021		// addresses will never be tried.
1022		//
1023		// The spec doesn't mention what should be done for multiple addresses.
1024		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
1025		connectDeadline := time.Now().Add(dialDuration)
1026
1027		ac.updateConnectivityState(connectivity.Connecting)
1028		ac.transport = nil
1029		ac.mu.Unlock()
1030
1031		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
1032		if err != nil {
1033			// After exhausting all addresses, the addrConn enters
1034			// TRANSIENT_FAILURE.
1035			ac.mu.Lock()
1036			if ac.state == connectivity.Shutdown {
1037				ac.mu.Unlock()
1038				return
1039			}
1040			ac.updateConnectivityState(connectivity.TransientFailure)
1041
1042			// Backoff.
1043			b := ac.resetBackoff
1044			ac.mu.Unlock()
1045
1046			timer := time.NewTimer(backoffFor)
1047			select {
1048			case <-timer.C:
1049				ac.mu.Lock()
1050				ac.backoffIdx++
1051				ac.mu.Unlock()
1052			case <-b:
1053				timer.Stop()
1054			case <-ac.ctx.Done():
1055				timer.Stop()
1056				return
1057			}
1058			continue
1059		}
1060
1061		ac.mu.Lock()
1062		if ac.state == connectivity.Shutdown {
1063			ac.mu.Unlock()
1064			newTr.Close()
1065			return
1066		}
1067		ac.curAddr = addr
1068		ac.transport = newTr
1069		ac.backoffIdx = 0
1070
1071		hctx, hcancel := context.WithCancel(ac.ctx)
1072		ac.startHealthCheck(hctx)
1073		ac.mu.Unlock()
1074
1075		// Block until the created transport is down. And when this happens,
1076		// we restart from the top of the addr list.
1077		<-reconnect.Done()
1078		hcancel()
1079		// restart connecting - the top of the loop will set state to
1080		// CONNECTING.  This is against the current connectivity semantics doc,
1081		// however it allows for graceful behavior for RPCs not yet dispatched
1082		// - unfortunate timing would otherwise lead to the RPC failing even
1083		// though the TRANSIENT_FAILURE state (called for by the doc) would be
1084		// instantaneous.
1085		//
1086		// Ideally we should transition to Idle here and block until there is
1087		// RPC activity that leads to the balancer requesting a reconnect of
1088		// the associated SubConn.
1089	}
1090}
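
// Illustrative sketch, not part of this file's implementation: the backoff
// used by resetTransport above comes from dopts.bs, which defaults to an
// exponential backoff capped at DefaultBackoffConfig.MaxDelay. A caller can
// adjust that cap with the WithBackoffConfig DialOption; the target and value
// here are assumptions for illustration only.
func exampleCustomBackoffDial() (*ClientConn, error) {
	return Dial("localhost:50051",
		WithInsecure(),
		WithBackoffConfig(BackoffConfig{MaxDelay: 30 * time.Second}),
	)
}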
1091
// tryAllAddrs tries to create a connection to each of the addresses, stopping at
// the first successful one. It returns the transport, the address, and an Event in
// the successful case. The Event fires when the returned transport disconnects.
1095func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1096	for _, addr := range addrs {
1097		ac.mu.Lock()
1098		if ac.state == connectivity.Shutdown {
1099			ac.mu.Unlock()
1100			return nil, resolver.Address{}, nil, errConnClosing
1101		}
1102
1103		ac.cc.mu.RLock()
1104		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1105		ac.cc.mu.RUnlock()
1106
1107		copts := ac.dopts.copts
1108		if ac.scopts.CredsBundle != nil {
1109			copts.CredsBundle = ac.scopts.CredsBundle
1110		}
1111		ac.mu.Unlock()
1112
1113		if channelz.IsOn() {
1114			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1115				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1116				Severity: channelz.CtINFO,
1117			})
1118		}
1119
1120		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1121		if err == nil {
1122			return newTr, addr, reconnect, nil
1123		}
1124		ac.cc.blockingpicker.updateConnectionError(err)
1125	}
1126
1127	// Couldn't connect to any address.
1128	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
1129}
1130
1131// createTransport creates a connection to addr. It returns the transport and a
1132// Event in the successful case. The Event fires when the returned transport
1133// disconnects.
1134func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1135	prefaceReceived := make(chan struct{})
1136	onCloseCalled := make(chan struct{})
1137	reconnect := grpcsync.NewEvent()
1138
1139	target := transport.TargetInfo{
1140		Addr:      addr.Addr,
1141		Metadata:  addr.Metadata,
1142		Authority: ac.cc.authority,
1143	}
1144
1145	once := sync.Once{}
1146	onGoAway := func(r transport.GoAwayReason) {
1147		ac.mu.Lock()
1148		ac.adjustParams(r)
1149		once.Do(func() {
1150			if ac.state == connectivity.Ready {
1151				// Prevent this SubConn from being used for new RPCs by setting its
1152				// state to Connecting.
1153				//
1154				// TODO: this should be Idle when grpc-go properly supports it.
1155				ac.updateConnectivityState(connectivity.Connecting)
1156			}
1157		})
1158		ac.mu.Unlock()
1159		reconnect.Fire()
1160	}
1161
1162	onClose := func() {
1163		ac.mu.Lock()
1164		once.Do(func() {
1165			if ac.state == connectivity.Ready {
1166				// Prevent this SubConn from being used for new RPCs by setting its
1167				// state to Connecting.
1168				//
1169				// TODO: this should be Idle when grpc-go properly supports it.
1170				ac.updateConnectivityState(connectivity.Connecting)
1171			}
1172		})
1173		ac.mu.Unlock()
1174		close(onCloseCalled)
1175		reconnect.Fire()
1176	}
1177
1178	onPrefaceReceipt := func() {
1179		close(prefaceReceived)
1180	}
1181
1182	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1183	defer cancel()
1184	if channelz.IsOn() {
1185		copts.ChannelzParentID = ac.channelzID
1186	}
1187
1188	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1189	if err != nil {
1190		// newTr is either nil, or closed.
		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
1192		return nil, nil, err
1193	}
1194
1195	select {
	case <-time.After(time.Until(connectDeadline)):
1197		// We didn't get the preface in time.
1198		newTr.Close()
1199		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1200		return nil, nil, errors.New("timed out waiting for server handshake")
1201	case <-prefaceReceived:
1202		// We got the preface - huzzah! things are good.
1203	case <-onCloseCalled:
1204		// The transport has already closed - noop.
1205		return nil, nil, errors.New("connection closed")
1206		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1207	}
1208	return newTr, reconnect, nil
1209}
1210
1211// startHealthCheck starts the health checking stream (RPC) to watch the health
1212// stats of this connection if health checking is requested and configured.
1213//
1214// LB channel health checking is enabled when all requirements below are met:
1215// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1216// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
1217// 3. a service config with non-empty healthCheckConfig field is provided
1218// 4. the load balancer requests it
1219//
1220// It sets addrConn to READY if the health checking stream is not started.
1221//
1222// Caller must hold ac.mu.
1223func (ac *addrConn) startHealthCheck(ctx context.Context) {
1224	var healthcheckManagingState bool
1225	defer func() {
1226		if !healthcheckManagingState {
1227			ac.updateConnectivityState(connectivity.Ready)
1228		}
1229	}()
1230
1231	if ac.cc.dopts.disableHealthCheck {
1232		return
1233	}
1234	healthCheckConfig := ac.cc.healthCheckConfig()
1235	if healthCheckConfig == nil {
1236		return
1237	}
1238	if !ac.scopts.HealthCheckEnabled {
1239		return
1240	}
1241	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1242	if healthCheckFunc == nil {
		// The health check function was not set: the health package has not
		// been imported to register it.
1244		//
1245		// TODO: add a link to the health check doc in the error message.
1246		grpclog.Error("Health check is requested but health check function is not set.")
1247		return
1248	}
1249
1250	healthcheckManagingState = true
1251
1252	// Set up the health check helper functions.
1253	currentTr := ac.transport
1254	newStream := func(method string) (interface{}, error) {
1255		ac.mu.Lock()
1256		if ac.transport != currentTr {
1257			ac.mu.Unlock()
1258			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1259		}
1260		ac.mu.Unlock()
1261		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1262	}
1263	setConnectivityState := func(s connectivity.State) {
1264		ac.mu.Lock()
1265		defer ac.mu.Unlock()
1266		if ac.transport != currentTr {
1267			return
1268		}
1269		ac.updateConnectivityState(s)
1270	}
1271	// Start the health checking stream.
1272	go func() {
1273		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1274		if err != nil {
1275			if status.Code(err) == codes.Unimplemented {
1276				if channelz.IsOn() {
1277					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1278						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
1279						Severity: channelz.CtError,
1280					})
1281				}
1282				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1283			} else {
1284				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1285			}
1286		}
1287	}()
1288}
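
// Illustrative sketch, not part of this file's implementation: of the
// requirements listed above, the caller-visible one is a service config with a
// non-empty healthCheckConfig. A client might opt in as below; the target and
// service name are assumptions, and the health check function itself is only
// set when the application also imports the package mentioned in requirement 2
// of the startHealthCheck documentation.
func exampleHealthCheckedDial() (*ClientConn, error) {
	return Dial("localhost:50051",
		WithInsecure(),
		WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": "helloworld.Greeter"}}`),
	)
}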
1289
1290func (ac *addrConn) resetConnectBackoff() {
1291	ac.mu.Lock()
1292	close(ac.resetBackoff)
1293	ac.backoffIdx = 0
1294	ac.resetBackoff = make(chan struct{})
1295	ac.mu.Unlock()
1296}
1297
1298// getReadyTransport returns the transport if ac's state is READY.
1299// Otherwise it returns nil, false.
1300// If ac's state is IDLE, it will trigger ac to connect.
1301func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1302	ac.mu.Lock()
1303	if ac.state == connectivity.Ready && ac.transport != nil {
1304		t := ac.transport
1305		ac.mu.Unlock()
1306		return t, true
1307	}
1308	var idle bool
1309	if ac.state == connectivity.Idle {
1310		idle = true
1311	}
1312	ac.mu.Unlock()
1313	// Trigger idle ac to connect.
1314	if idle {
1315		ac.connect()
1316	}
1317	return nil, false
1318}
1319
1320// tearDown starts to tear down the addrConn.
1321// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1322// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
1324// tearDown doesn't remove ac from ac.cc.conns.
1325func (ac *addrConn) tearDown(err error) {
1326	ac.mu.Lock()
1327	if ac.state == connectivity.Shutdown {
1328		ac.mu.Unlock()
1329		return
1330	}
1331	curTr := ac.transport
1332	ac.transport = nil
1333	// We have to set the state to Shutdown before anything else to prevent races
1334	// between setting the state and logic that waits on context cancelation / etc.
1335	ac.updateConnectivityState(connectivity.Shutdown)
1336	ac.cancel()
1337	ac.curAddr = resolver.Address{}
1338	if err == errConnDrain && curTr != nil {
1339		// GracefulClose(...) may be executed multiple times when
1340		// i) receiving multiple GoAway frames from the server; or
1341		// ii) there are concurrent name resolver/Balancer triggered
1342		// address removal and GoAway.
1343		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1344		ac.mu.Unlock()
1345		curTr.GracefulClose()
1346		ac.mu.Lock()
1347	}
1348	if channelz.IsOn() {
1349		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1350			Desc:     "Subchannel Deleted",
1351			Severity: channelz.CtINFO,
1352			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelzID),
1354				Severity: channelz.CtINFO,
1355			},
1356		})
1357		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
1359		channelz.RemoveEntry(ac.channelzID)
1360	}
1361	ac.mu.Unlock()
1362}
1363
1364func (ac *addrConn) getState() connectivity.State {
1365	ac.mu.Lock()
1366	defer ac.mu.Unlock()
1367	return ac.state
1368}
1369
1370func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1371	ac.mu.Lock()
1372	addr := ac.curAddr.Addr
1373	ac.mu.Unlock()
1374	return &channelz.ChannelInternalMetric{
1375		State:                    ac.getState(),
1376		Target:                   addr,
1377		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1378		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1379		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1380		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1381	}
1382}
1383
1384func (ac *addrConn) incrCallsStarted() {
1385	atomic.AddInt64(&ac.czData.callsStarted, 1)
1386	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1387}
1388
1389func (ac *addrConn) incrCallsSucceeded() {
1390	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1391}
1392
1393func (ac *addrConn) incrCallsFailed() {
1394	atomic.AddInt64(&ac.czData.callsFailed, 1)
1395}
1396
1397type retryThrottler struct {
1398	max    float64
1399	thresh float64
1400	ratio  float64
1401
1402	mu     sync.Mutex
1403	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1404}
1405
1406// throttle subtracts a retry token from the pool and returns whether a retry
1407// should be throttled (disallowed) based upon the retry throttling policy in
1408// the service config.
1409func (rt *retryThrottler) throttle() bool {
1410	if rt == nil {
1411		return false
1412	}
1413	rt.mu.Lock()
1414	defer rt.mu.Unlock()
1415	rt.tokens--
1416	if rt.tokens < 0 {
1417		rt.tokens = 0
1418	}
1419	return rt.tokens <= rt.thresh
1420}
1421
1422func (rt *retryThrottler) successfulRPC() {
1423	if rt == nil {
1424		return
1425	}
1426	rt.mu.Lock()
1427	defer rt.mu.Unlock()
1428	rt.tokens += rt.ratio
1429	if rt.tokens > rt.max {
1430		rt.tokens = rt.max
1431	}
1432}
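
// Illustrative sketch, not part of this file's implementation: the token
// arithmetic implemented above, with hypothetical numbers. Each retry attempt
// (throttle call) costs one token and each successful RPC refunds ratio
// tokens, so with max=10 and thresh=5, retries start being disallowed once
// five attempts have drained the pool to the threshold.
func exampleRetryThrottlerArithmetic() {
	rt := &retryThrottler{max: 10, thresh: 5, tokens: 10, ratio: 0.5}
	for i := 0; i < 6; i++ {
		// The 5th and 6th calls report true (throttled) because the token
		// count has reached the threshold.
		grpclog.Infof("attempt %d throttled: %v", i+1, rt.throttle())
	}
	rt.successfulRPC() // refunds ratio (0.5) tokens, capped at max
}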
1433
1434type channelzChannel struct {
1435	cc *ClientConn
1436}
1437
1438func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1439	return c.cc.channelzMetric()
1440}
1441
1442// ErrClientConnTimeout indicates that the ClientConn cannot establish the
1443// underlying connections within the specified timeout.
1444//
1445// Deprecated: This error is never returned by grpc and should not be
1446// referenced by users.
1447var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1448