1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/balancer/base"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/grpclog"
39	"google.golang.org/grpc/internal/backoff"
40	"google.golang.org/grpc/internal/channelz"
41	"google.golang.org/grpc/internal/grpcsync"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the balancer name used for grpclb. It must match
	// grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
59
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing. It is a status error with code Canceled.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection has started draining and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix prefixes the JSON parsing error
	// reported when the default service config cannot be parsed.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
77
// The following errors are returned from Dial and DialContext when the
// combination of security-related dial options is invalid.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a credentials bundle is used
	// together with individual transport credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that the user wants to transmit
	// security information (e.g., an OAuth2 token) that requires a secure
	// connection, but the connection is insecure.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
95
// Default client-side size limits and I/O buffer sizes.
// NOTE(review): the units are presumably bytes — confirm at the use sites.
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default write and
	// read buffer sizes (32 * 1024 each).
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
103
// Dial creates a client connection to the given target.
//
// It is equivalent to calling DialContext with context.Background(); see
// DialContext for the blocking behavior and the target name syntax.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any failure the partially constructed ClientConn is closed and a nil
// ClientConn is returned with the error. If ctx is already done when the
// function returns, ctx.Err() is returned even if setup otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so later loads of retryThrottler see a consistent type.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is derived from context.Background(), not from the caller's ctx,
	// so canceling ctx does not cancel cc.ctx.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Apply the caller's options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close cc if the dial ultimately fails. err is the named result, and
	// deferred calls run last-in-first-out, so this also sees an error set by
	// the ctx check deferred further below.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register the channel with channelz, as a nested channel when a parent ID
	// was supplied and as a top-level channel otherwise.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
			})
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the combination of security options.
	if !cc.dopts.insecure {
		// Secure dial: exactly one of TransportCredentials / CredsBundle is
		// required.
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		// Insecure dial: no transport credentials may be set, and no per-RPC
		// credentials may require transport security.
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config, if one was supplied as raw JSON.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Without a custom dialer, use net.Dialer wrapped by newProxyDialer.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append the gRPC user agent to any user-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// A dial timeout option bounds the remaining setup steps through ctx.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done by the time this function returns, override the results
	// with (nil, ctx.Err()), whatever was about to be returned.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}
	if cc.dopts.resolverBuilder == nil {
		// Only try to parse target when resolver builder is not already set.
		cc.parsedTarget = parseTarget(cc.target)
		grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
		cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		if cc.dopts.resolverBuilder == nil {
			// If resolver builder is still nil, the parsed target's scheme is
			// not registered. Fallback to default resolver and set Endpoint to
			// the original target.
			grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
			cc.parsedTarget = resolver.Target{
				Scheme:   resolver.GetDefaultScheme(),
				Endpoint: target,
			}
			cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
		}
	} else {
		// A resolver builder was supplied explicitly: the target is used
		// verbatim as the endpoint.
		cc.parsedTarget = resolver.Target{Endpoint: target}
	}
	// Authority precedence: the ServerName reported by the transport
	// credentials, then the authority dial option (insecure dials only), then
	// the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming later service config updates in the background; the
	// watcher exits when scChan is closed or cc.ctx is done.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Balancers get a clone of the transport credentials rather than the
	// instance held in the dial options.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}

	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()
	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// With FailOnNonTempDialError set, give up as soon as the
				// picker reports a connection error that declares itself
				// non-temporary; any other error keeps the loop waiting.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
333
334// chainUnaryClientInterceptors chains all unary client interceptors into one.
335func chainUnaryClientInterceptors(cc *ClientConn) {
336	interceptors := cc.dopts.chainUnaryInts
337	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
338	// be executed before any other chained interceptors.
339	if cc.dopts.unaryInt != nil {
340		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
341	}
342	var chainedInt UnaryClientInterceptor
343	if len(interceptors) == 0 {
344		chainedInt = nil
345	} else if len(interceptors) == 1 {
346		chainedInt = interceptors[0]
347	} else {
348		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
349			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
350		}
351	}
352	cc.dopts.unaryInt = chainedInt
353}
354
355// getChainUnaryInvoker recursively generate the chained unary invoker.
356func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
357	if curr == len(interceptors)-1 {
358		return finalInvoker
359	}
360	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
361		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
362	}
363}
364
365// chainStreamClientInterceptors chains all stream client interceptors into one.
366func chainStreamClientInterceptors(cc *ClientConn) {
367	interceptors := cc.dopts.chainStreamInts
368	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
369	// be executed before any other chained interceptors.
370	if cc.dopts.streamInt != nil {
371		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
372	}
373	var chainedInt StreamClientInterceptor
374	if len(interceptors) == 0 {
375		chainedInt = nil
376	} else if len(interceptors) == 1 {
377		chainedInt = interceptors[0]
378	} else {
379		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
380			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
381		}
382	}
383	cc.dopts.streamInt = chainedInt
384}
385
386// getChainStreamer recursively generate the chained client stream constructor.
387func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
388	if curr == len(interceptors)-1 {
389		return finalStreamer
390	}
391	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
392		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
393	}
394}
395
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu guards state and notifyChan.
	mu         sync.Mutex
	// state is the current connectivity state.
	state      connectivity.State
	// notifyChan is created on demand by getNotifyChan, then closed and reset
	// to nil by updateState when the state changes.
	notifyChan chan struct{}
	// channelzID identifies the owning channel in channelz trace events.
	channelzID int64
}
404
405// updateState updates the connectivity.State of ClientConn.
406// If there's a change it notifies goroutines waiting on state change to
407// happen.
408func (csm *connectivityStateManager) updateState(state connectivity.State) {
409	csm.mu.Lock()
410	defer csm.mu.Unlock()
411	if csm.state == connectivity.Shutdown {
412		return
413	}
414	if csm.state == state {
415		return
416	}
417	csm.state = state
418	if channelz.IsOn() {
419		channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
420			Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
421			Severity: channelz.CtINFO,
422		})
423	}
424	if csm.notifyChan != nil {
425		// There are other goroutines waiting on this channel.
426		close(csm.notifyChan)
427		csm.notifyChan = nil
428	}
429}
430
// getState returns the current connectivity.State, read under csm.mu.
func (csm *connectivityStateManager) getState() connectivity.State {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	return csm.state
}
436
437func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
438	csm.mu.Lock()
439	defer csm.mu.Unlock()
440	if csm.notifyChan == nil {
441		csm.notifyChan = make(chan struct{})
442	}
443	return csm.notifyChan
444}
445
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx is created in DialContext from context.Background(); cancel cancels
	// it. Contexts of addrConns are derived from ctx.
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string passed to Dial/DialContext; parsedTarget is what
	// DialContext derived from it.
	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// mu is held in the code visible here when accessing resolverWrapper, sc,
	// conns, curBalancerName and balancerWrapper after dialing.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is the set of live addrConns. Methods in this file treat a nil
	// map as "the ClientConn is closed".
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler always holds a *retryThrottler, possibly a typed nil.
	retryThrottler  atomic.Value

	// firstResolveEvent is fired when updateResolverState returns for the
	// first time.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
486
487// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
488// ctx expires. A true value is returned in former case and false in latter.
489// This is an EXPERIMENTAL API.
490func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
491	ch := cc.csMgr.getNotifyChan()
492	if cc.csMgr.getState() != sourceState {
493		return true
494	}
495	select {
496	case <-ctx.Done():
497		return false
498	case <-ch:
499		return true
500	}
501}
502
// GetState returns the connectivity.State of ClientConn, as tracked by its
// connectivityStateManager.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}
508
509func (cc *ClientConn) scWatcher() {
510	for {
511		select {
512		case sc, ok := <-cc.dopts.scChan:
513			if !ok {
514				return
515			}
516			cc.mu.Lock()
517			// TODO: load balance policy runtime change is ignored.
518			// We may revisit this decision in the future.
519			cc.sc = &sc
520			cc.mu.Unlock()
521		case <-cc.ctx.Done():
522			return
523		}
524	}
525}
526
527// waitForResolvedAddrs blocks until the resolver has provided addresses or the
528// context expires.  Returns nil unless the context expires first; otherwise
529// returns a status error based on the context.
530func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
531	// This is on the RPC path, so we use a fast path to avoid the
532	// more-expensive "select" below after the resolver has returned once.
533	if cc.firstResolveEvent.HasFired() {
534		return nil
535	}
536	select {
537	case <-cc.firstResolveEvent.Done():
538		return nil
539	case <-ctx.Done():
540		return status.FromContextError(ctx.Err()).Err()
541	case <-cc.ctx.Done():
542		return ErrClientConnClosing
543	}
544}
545
546var emptyServiceConfig *ServiceConfig
547
548func init() {
549	cfg := parseServiceConfig("{}")
550	if cfg.Err != nil {
551		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
552	}
553	emptyServiceConfig = cfg.Config.(*ServiceConfig)
554}
555
556func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
557	if cc.sc != nil {
558		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
559		return
560	}
561	if cc.dopts.defaultServiceConfig != nil {
562		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
563	} else {
564		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
565	}
566}
567
// updateResolverState processes an update from the resolver: either a new
// resolver.State s, or a resolver error err.
//
// It applies the service config (from s, or a default), makes sure a balancer
// exists, and forwards the state to the balancer. firstResolveEvent is fired
// when the function returns, whatever the outcome.
//
// It returns nil if the ClientConn is already closed, and
// balancer.ErrBadResolverState if err is non-nil or the service config in s
// is unusable. Otherwise it returns the error from the balancer's
// updateClientConnState.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			// The resolver's service config is unusable. If a balancer already
			// exists, keep going with the current config and report the bad
			// state at the end; with no balancer yet, install an error picker
			// and move the channel to TransientFailure.
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// Pass the service config's balancer config along only when no balancer
	// builder was forced through a dial option.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Snapshot what is needed from cc under the lock; the balancer is called
	// after the lock is released.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// The filtering compacts s.Addresses in place.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
645
646// switchBalancer starts the switching from current balancer to the balancer
647// with the given name.
648//
649// It will NOT send the current address list to the new balancer. If needed,
650// caller of this function should send address list to the new balancer after
651// this function returns.
652//
653// Caller must hold cc.mu.
654func (cc *ClientConn) switchBalancer(name string) {
655	if strings.EqualFold(cc.curBalancerName, name) {
656		return
657	}
658
659	grpclog.Infof("ClientConn switching balancer to %q", name)
660	if cc.dopts.balancerBuilder != nil {
661		grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
662		return
663	}
664	if cc.balancerWrapper != nil {
665		cc.balancerWrapper.close()
666	}
667
668	builder := balancer.Get(name)
669	if channelz.IsOn() {
670		if builder == nil {
671			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
672				Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
673				Severity: channelz.CtWarning,
674			})
675		} else {
676			channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
677				Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
678				Severity: channelz.CtINFO,
679			})
680		}
681	}
682	if builder == nil {
683		grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
684		builder = newPickfirstBuilder()
685	}
686
687	cc.curBalancerName = builder.Name()
688	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
689}
690
691func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
692	cc.mu.Lock()
693	if cc.conns == nil {
694		cc.mu.Unlock()
695		return
696	}
697	// TODO(bar switching) send updates to all balancer wrappers when balancer
698	// gracefully switching is supported.
699	cc.balancerWrapper.handleSubConnStateChange(sc, s)
700	cc.mu.Unlock()
701}
702
703// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
704//
705// Caller needs to make sure len(addrs) > 0.
706func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
707	ac := &addrConn{
708		cc:           cc,
709		addrs:        addrs,
710		scopts:       opts,
711		dopts:        cc.dopts,
712		czData:       new(channelzData),
713		resetBackoff: make(chan struct{}),
714	}
715	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
716	// Track ac in cc. This needs to be done before any getTransport(...) is called.
717	cc.mu.Lock()
718	if cc.conns == nil {
719		cc.mu.Unlock()
720		return nil, ErrClientConnClosing
721	}
722	if channelz.IsOn() {
723		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
724		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
725			Desc:     "Subchannel Created",
726			Severity: channelz.CtINFO,
727			Parent: &channelz.TraceEventDesc{
728				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
729				Severity: channelz.CtINFO,
730			},
731		})
732	}
733	cc.conns[ac] = struct{}{}
734	cc.mu.Unlock()
735	return ac, nil
736}
737
738// removeAddrConn removes the addrConn in the subConn from clientConn.
739// It also tears down the ac with the given error.
740func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
741	cc.mu.Lock()
742	if cc.conns == nil {
743		cc.mu.Unlock()
744		return
745	}
746	delete(cc.conns, ac)
747	cc.mu.Unlock()
748	ac.tearDown(err)
749}
750
751func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
752	return &channelz.ChannelInternalMetric{
753		State:                    cc.GetState(),
754		Target:                   cc.target,
755		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
756		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
757		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
758		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
759	}
760}
761
// Target returns the target string of the ClientConn, exactly as it was
// passed to Dial or DialContext.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) Target() string {
	return cc.target
}
767
// incrCallsStarted atomically increments the started-calls counter and
// records the current time, in Unix nanoseconds, as the last call start time.
func (cc *ClientConn) incrCallsStarted() {
	atomic.AddInt64(&cc.czData.callsStarted, 1)
	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}
772
// incrCallsSucceeded atomically increments the succeeded-calls counter.
func (cc *ClientConn) incrCallsSucceeded() {
	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}
776
// incrCallsFailed atomically increments the failed-calls counter.
func (cc *ClientConn) incrCallsFailed() {
	atomic.AddInt64(&cc.czData.callsFailed, 1)
}
780
781// connect starts creating a transport.
782// It does nothing if the ac is not IDLE.
783// TODO(bar) Move this to the addrConn section.
784func (ac *addrConn) connect() error {
785	ac.mu.Lock()
786	if ac.state == connectivity.Shutdown {
787		ac.mu.Unlock()
788		return errConnClosing
789	}
790	if ac.state != connectivity.Idle {
791		ac.mu.Unlock()
792		return nil
793	}
794	// Update connectivity state within the lock to prevent subsequent or
795	// concurrent calls from resetting the transport more than once.
796	ac.updateConnectivityState(connectivity.Connecting)
797	ac.mu.Unlock()
798
799	// Start a goroutine connecting to the server asynchronously.
800	go ac.resetTransport()
801	return nil
802}
803
804// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
805//
806// If ac is Connecting, it returns false. The caller should tear down the ac and
807// create a new one. Note that the backoff will be reset when this happens.
808//
809// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
810// addresses will be picked up by retry in the next iteration after backoff.
811//
812// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
813//
814// If ac is Ready, it checks whether current connected address of ac is in the
815// new addrs list.
816//  - If true, it updates ac.addrs and returns true. The ac will keep using
817//    the existing connection.
818//  - If false, it does nothing and returns false.
819func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
820	ac.mu.Lock()
821	defer ac.mu.Unlock()
822	grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
823	if ac.state == connectivity.Shutdown ||
824		ac.state == connectivity.TransientFailure ||
825		ac.state == connectivity.Idle {
826		ac.addrs = addrs
827		return true
828	}
829
830	if ac.state == connectivity.Connecting {
831		return false
832	}
833
834	// ac.state is Ready, try to find the connected address.
835	var curAddrFound bool
836	for _, a := range addrs {
837		if reflect.DeepEqual(ac.curAddr, a) {
838			curAddrFound = true
839			break
840		}
841	}
842	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
843	if curAddrFound {
844		ac.addrs = addrs
845	}
846
847	return curAddrFound
848}
849
850// GetMethodConfig gets the method config of the input method.
851// If there's an exact match for input method (i.e. /service/method), we return
852// the corresponding MethodConfig.
853// If there isn't an exact match for the input method, we look for the default config
854// under the service (i.e /service/). If there is a default MethodConfig for
855// the service, we return it.
856// Otherwise, we return an empty MethodConfig.
857func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
858	// TODO: Avoid the locking here.
859	cc.mu.RLock()
860	defer cc.mu.RUnlock()
861	if cc.sc == nil {
862		return MethodConfig{}
863	}
864	m, ok := cc.sc.Methods[method]
865	if !ok {
866		i := strings.LastIndex(method, "/")
867		m = cc.sc.Methods[method[:i+1]]
868	}
869	return m
870}
871
872func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
873	cc.mu.RLock()
874	defer cc.mu.RUnlock()
875	if cc.sc == nil {
876		return nil
877	}
878	return cc.sc.healthCheckConfig
879}
880
881func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
882	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
883		FullMethodName: method,
884	})
885	if err != nil {
886		return nil, nil, toRPCErr(err)
887	}
888	return t, done, nil
889}
890
891func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
892	if sc == nil {
893		// should never reach here.
894		return
895	}
896	cc.sc = sc
897
898	if cc.sc.retryThrottling != nil {
899		newThrottler := &retryThrottler{
900			tokens: cc.sc.retryThrottling.MaxTokens,
901			max:    cc.sc.retryThrottling.MaxTokens,
902			thresh: cc.sc.retryThrottling.MaxTokens / 2,
903			ratio:  cc.sc.retryThrottling.TokenRatio,
904		}
905		cc.retryThrottler.Store(newThrottler)
906	} else {
907		cc.retryThrottler.Store((*retryThrottler)(nil))
908	}
909
910	if cc.dopts.balancerBuilder == nil {
911		// Only look at balancer types and switch balancer if balancer dial
912		// option is not set.
913		var newBalancerName string
914		if cc.sc != nil && cc.sc.lbConfig != nil {
915			newBalancerName = cc.sc.lbConfig.name
916		} else {
917			var isGRPCLB bool
918			for _, a := range addrs {
919				if a.Type == resolver.GRPCLB {
920					isGRPCLB = true
921					break
922				}
923			}
924			if isGRPCLB {
925				newBalancerName = grpclbName
926			} else if cc.sc != nil && cc.sc.LB != nil {
927				newBalancerName = *cc.sc.LB
928			} else {
929				newBalancerName = PickFirstBalancerName
930			}
931		}
932		cc.switchBalancer(newBalancerName)
933	} else if cc.balancerWrapper == nil {
934		// Balancer dial option was set, and this is the first time handling
935		// resolved addresses. Build a balancer with dopts.balancerBuilder.
936		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
937		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
938	}
939}
940
941func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
942	cc.mu.RLock()
943	r := cc.resolverWrapper
944	cc.mu.RUnlock()
945	if r == nil {
946		return
947	}
948	go r.resolveNow(o)
949}
950
951// ResetConnectBackoff wakes up all subchannels in transient failure and causes
952// them to attempt another connection immediately.  It also resets the backoff
953// times used for subsequent attempts regardless of the current state.
954//
955// In general, this function should not be used.  Typical service or network
956// outages result in a reasonable client reconnection strategy by default.
957// However, if a previously unavailable network becomes available, this may be
958// used to trigger an immediate reconnect.
959//
960// This API is EXPERIMENTAL.
961func (cc *ClientConn) ResetConnectBackoff() {
962	cc.mu.Lock()
963	conns := cc.conns
964	cc.mu.Unlock()
965	for ac := range conns {
966		ac.resetConnectBackoff()
967	}
968}
969
970// Close tears down the ClientConn and all underlying connections.
971func (cc *ClientConn) Close() error {
972	defer cc.cancel()
973
974	cc.mu.Lock()
975	if cc.conns == nil {
976		cc.mu.Unlock()
977		return ErrClientConnClosing
978	}
979	conns := cc.conns
980	cc.conns = nil
981	cc.csMgr.updateState(connectivity.Shutdown)
982
983	rWrapper := cc.resolverWrapper
984	cc.resolverWrapper = nil
985	bWrapper := cc.balancerWrapper
986	cc.balancerWrapper = nil
987	cc.mu.Unlock()
988
989	cc.blockingpicker.close()
990
991	if rWrapper != nil {
992		rWrapper.close()
993	}
994	if bWrapper != nil {
995		bWrapper.close()
996	}
997
998	for ac := range conns {
999		ac.tearDown(ErrClientConnClosing)
1000	}
1001	if channelz.IsOn() {
1002		ted := &channelz.TraceEventDesc{
1003			Desc:     "Channel Deleted",
1004			Severity: channelz.CtINFO,
1005		}
1006		if cc.dopts.channelzParentID != 0 {
1007			ted.Parent = &channelz.TraceEventDesc{
1008				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1009				Severity: channelz.CtINFO,
1010			}
1011		}
1012		channelz.AddTraceEvent(cc.channelzID, ted)
1013		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1014		// the entity being deleted, and thus prevent it from being deleted right away.
1015		channelz.RemoveEntry(cc.channelzID)
1016	}
1017	return nil
1018}
1019
// addrConn is a network connection to a given address.
//
// It owns the connect/reconnect loop (resetTransport) for one subchannel and
// tracks that subchannel's connectivity state.
type addrConn struct {
	// ctx is the parent of the per-attempt connect context and of the health
	// check context, and is watched while waiting out a backoff. cancel is
	// invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn this addrConn reports state changes to.
	cc *ClientConn
	// dopts holds the dial options used for this addrConn.
	// dopts.copts.KeepaliveParams is overwritten from cc.mkp before every
	// connection attempt.
	dopts dialOptions
	// acbw is the SubConn handle passed to cc.handleSubConnStateChange.
	acbw balancer.SubConn
	// scopts are the SubConn options; CredsBundle and HealthCheckEnabled are
	// consulted when connecting and when starting health checking.
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the retry count handed to dopts.bs.Backoff; it is
	// incremented after a full backoff wait and zeroed on a successful connect
	// or by resetConnectBackoff.
	backoffIdx int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced with a fresh channel) by
	// resetConnectBackoff to cut short a backoff wait in resetTransport.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	// czData holds the call counters reported by ChannelzMetric; its fields
	// are accessed with sync/atomic.
	czData *channelzData
}
1049
1050// Note: this requires a lock on ac.mu.
1051func (ac *addrConn) updateConnectivityState(s connectivity.State) {
1052	if ac.state == s {
1053		return
1054	}
1055
1056	updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
1057	ac.state = s
1058	if channelz.IsOn() {
1059		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1060			Desc:     updateMsg,
1061			Severity: channelz.CtINFO,
1062		})
1063	}
1064	ac.cc.handleSubConnStateChange(ac.acbw, s)
1065}
1066
1067// adjustParams updates parameters used to create transports upon
1068// receiving a GoAway.
1069func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1070	switch r {
1071	case transport.GoAwayTooManyPings:
1072		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1073		ac.cc.mu.Lock()
1074		if v > ac.cc.mkp.Time {
1075			ac.cc.mkp.Time = v
1076		}
1077		ac.cc.mu.Unlock()
1078	}
1079}
1080
1081func (ac *addrConn) resetTransport() {
1082	for i := 0; ; i++ {
1083		if i > 0 {
1084			ac.cc.resolveNow(resolver.ResolveNowOption{})
1085		}
1086
1087		ac.mu.Lock()
1088		if ac.state == connectivity.Shutdown {
1089			ac.mu.Unlock()
1090			return
1091		}
1092
1093		addrs := ac.addrs
1094		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1095		// This will be the duration that dial gets to finish.
1096		dialDuration := minConnectTimeout
1097		if ac.dopts.minConnectTimeout != nil {
1098			dialDuration = ac.dopts.minConnectTimeout()
1099		}
1100
1101		if dialDuration < backoffFor {
1102			// Give dial more time as we keep failing to connect.
1103			dialDuration = backoffFor
1104		}
1105		// We can potentially spend all the time trying the first address, and
1106		// if the server accepts the connection and then hangs, the following
1107		// addresses will never be tried.
1108		//
1109		// The spec doesn't mention what should be done for multiple addresses.
1110		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
1111		connectDeadline := time.Now().Add(dialDuration)
1112
1113		ac.updateConnectivityState(connectivity.Connecting)
1114		ac.transport = nil
1115		ac.mu.Unlock()
1116
1117		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
1118		if err != nil {
1119			// After exhausting all addresses, the addrConn enters
1120			// TRANSIENT_FAILURE.
1121			ac.mu.Lock()
1122			if ac.state == connectivity.Shutdown {
1123				ac.mu.Unlock()
1124				return
1125			}
1126			ac.updateConnectivityState(connectivity.TransientFailure)
1127
1128			// Backoff.
1129			b := ac.resetBackoff
1130			ac.mu.Unlock()
1131
1132			timer := time.NewTimer(backoffFor)
1133			select {
1134			case <-timer.C:
1135				ac.mu.Lock()
1136				ac.backoffIdx++
1137				ac.mu.Unlock()
1138			case <-b:
1139				timer.Stop()
1140			case <-ac.ctx.Done():
1141				timer.Stop()
1142				return
1143			}
1144			continue
1145		}
1146
1147		ac.mu.Lock()
1148		if ac.state == connectivity.Shutdown {
1149			ac.mu.Unlock()
1150			newTr.Close()
1151			return
1152		}
1153		ac.curAddr = addr
1154		ac.transport = newTr
1155		ac.backoffIdx = 0
1156
1157		hctx, hcancel := context.WithCancel(ac.ctx)
1158		ac.startHealthCheck(hctx)
1159		ac.mu.Unlock()
1160
1161		// Block until the created transport is down. And when this happens,
1162		// we restart from the top of the addr list.
1163		<-reconnect.Done()
1164		hcancel()
1165		// restart connecting - the top of the loop will set state to
1166		// CONNECTING.  This is against the current connectivity semantics doc,
1167		// however it allows for graceful behavior for RPCs not yet dispatched
1168		// - unfortunate timing would otherwise lead to the RPC failing even
1169		// though the TRANSIENT_FAILURE state (called for by the doc) would be
1170		// instantaneous.
1171		//
1172		// Ideally we should transition to Idle here and block until there is
1173		// RPC activity that leads to the balancer requesting a reconnect of
1174		// the associated SubConn.
1175	}
1176}
1177
1178// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1179// first successful one. It returns the transport, the address and a Event in
1180// the successful case. The Event fires when the returned transport disconnects.
1181func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1182	for _, addr := range addrs {
1183		ac.mu.Lock()
1184		if ac.state == connectivity.Shutdown {
1185			ac.mu.Unlock()
1186			return nil, resolver.Address{}, nil, errConnClosing
1187		}
1188
1189		ac.cc.mu.RLock()
1190		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1191		ac.cc.mu.RUnlock()
1192
1193		copts := ac.dopts.copts
1194		if ac.scopts.CredsBundle != nil {
1195			copts.CredsBundle = ac.scopts.CredsBundle
1196		}
1197		ac.mu.Unlock()
1198
1199		if channelz.IsOn() {
1200			channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1201				Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
1202				Severity: channelz.CtINFO,
1203			})
1204		}
1205
1206		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1207		if err == nil {
1208			return newTr, addr, reconnect, nil
1209		}
1210		ac.cc.blockingpicker.updateConnectionError(err)
1211	}
1212
1213	// Couldn't connect to any address.
1214	return nil, resolver.Address{}, nil, fmt.Errorf("couldn't connect to any address")
1215}
1216
1217// createTransport creates a connection to addr. It returns the transport and a
1218// Event in the successful case. The Event fires when the returned transport
1219// disconnects.
1220func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1221	prefaceReceived := make(chan struct{})
1222	onCloseCalled := make(chan struct{})
1223	reconnect := grpcsync.NewEvent()
1224
1225	authority := ac.cc.authority
1226	// addr.ServerName takes precedent over ClientConn authority, if present.
1227	if addr.ServerName != "" {
1228		authority = addr.ServerName
1229	}
1230
1231	target := transport.TargetInfo{
1232		Addr:      addr.Addr,
1233		Metadata:  addr.Metadata,
1234		Authority: authority,
1235	}
1236
1237	once := sync.Once{}
1238	onGoAway := func(r transport.GoAwayReason) {
1239		ac.mu.Lock()
1240		ac.adjustParams(r)
1241		once.Do(func() {
1242			if ac.state == connectivity.Ready {
1243				// Prevent this SubConn from being used for new RPCs by setting its
1244				// state to Connecting.
1245				//
1246				// TODO: this should be Idle when grpc-go properly supports it.
1247				ac.updateConnectivityState(connectivity.Connecting)
1248			}
1249		})
1250		ac.mu.Unlock()
1251		reconnect.Fire()
1252	}
1253
1254	onClose := func() {
1255		ac.mu.Lock()
1256		once.Do(func() {
1257			if ac.state == connectivity.Ready {
1258				// Prevent this SubConn from being used for new RPCs by setting its
1259				// state to Connecting.
1260				//
1261				// TODO: this should be Idle when grpc-go properly supports it.
1262				ac.updateConnectivityState(connectivity.Connecting)
1263			}
1264		})
1265		ac.mu.Unlock()
1266		close(onCloseCalled)
1267		reconnect.Fire()
1268	}
1269
1270	onPrefaceReceipt := func() {
1271		close(prefaceReceived)
1272	}
1273
1274	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1275	defer cancel()
1276	if channelz.IsOn() {
1277		copts.ChannelzParentID = ac.channelzID
1278	}
1279
1280	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1281	if err != nil {
1282		// newTr is either nil, or closed.
1283		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1284		return nil, nil, err
1285	}
1286
1287	select {
1288	case <-time.After(connectDeadline.Sub(time.Now())):
1289		// We didn't get the preface in time.
1290		newTr.Close()
1291		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1292		return nil, nil, errors.New("timed out waiting for server handshake")
1293	case <-prefaceReceived:
1294		// We got the preface - huzzah! things are good.
1295	case <-onCloseCalled:
1296		// The transport has already closed - noop.
1297		return nil, nil, errors.New("connection closed")
1298		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1299	}
1300	return newTr, reconnect, nil
1301}
1302
1303// startHealthCheck starts the health checking stream (RPC) to watch the health
1304// stats of this connection if health checking is requested and configured.
1305//
1306// LB channel health checking is enabled when all requirements below are met:
1307// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1308// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
1309// 3. a service config with non-empty healthCheckConfig field is provided
1310// 4. the load balancer requests it
1311//
1312// It sets addrConn to READY if the health checking stream is not started.
1313//
1314// Caller must hold ac.mu.
1315func (ac *addrConn) startHealthCheck(ctx context.Context) {
1316	var healthcheckManagingState bool
1317	defer func() {
1318		if !healthcheckManagingState {
1319			ac.updateConnectivityState(connectivity.Ready)
1320		}
1321	}()
1322
1323	if ac.cc.dopts.disableHealthCheck {
1324		return
1325	}
1326	healthCheckConfig := ac.cc.healthCheckConfig()
1327	if healthCheckConfig == nil {
1328		return
1329	}
1330	if !ac.scopts.HealthCheckEnabled {
1331		return
1332	}
1333	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1334	if healthCheckFunc == nil {
1335		// The health package is not imported to set health check function.
1336		//
1337		// TODO: add a link to the health check doc in the error message.
1338		grpclog.Error("Health check is requested but health check function is not set.")
1339		return
1340	}
1341
1342	healthcheckManagingState = true
1343
1344	// Set up the health check helper functions.
1345	currentTr := ac.transport
1346	newStream := func(method string) (interface{}, error) {
1347		ac.mu.Lock()
1348		if ac.transport != currentTr {
1349			ac.mu.Unlock()
1350			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1351		}
1352		ac.mu.Unlock()
1353		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1354	}
1355	setConnectivityState := func(s connectivity.State) {
1356		ac.mu.Lock()
1357		defer ac.mu.Unlock()
1358		if ac.transport != currentTr {
1359			return
1360		}
1361		ac.updateConnectivityState(s)
1362	}
1363	// Start the health checking stream.
1364	go func() {
1365		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1366		if err != nil {
1367			if status.Code(err) == codes.Unimplemented {
1368				if channelz.IsOn() {
1369					channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1370						Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
1371						Severity: channelz.CtError,
1372					})
1373				}
1374				grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1375			} else {
1376				grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1377			}
1378		}
1379	}()
1380}
1381
1382func (ac *addrConn) resetConnectBackoff() {
1383	ac.mu.Lock()
1384	close(ac.resetBackoff)
1385	ac.backoffIdx = 0
1386	ac.resetBackoff = make(chan struct{})
1387	ac.mu.Unlock()
1388}
1389
1390// getReadyTransport returns the transport if ac's state is READY.
1391// Otherwise it returns nil, false.
1392// If ac's state is IDLE, it will trigger ac to connect.
1393func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1394	ac.mu.Lock()
1395	if ac.state == connectivity.Ready && ac.transport != nil {
1396		t := ac.transport
1397		ac.mu.Unlock()
1398		return t, true
1399	}
1400	var idle bool
1401	if ac.state == connectivity.Idle {
1402		idle = true
1403	}
1404	ac.mu.Unlock()
1405	// Trigger idle ac to connect.
1406	if idle {
1407		ac.connect()
1408	}
1409	return nil, false
1410}
1411
1412// tearDown starts to tear down the addrConn.
1413// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1414// some edge cases (e.g., the caller opens and closes many addrConn's in a
1415// tight loop.
1416// tearDown doesn't remove ac from ac.cc.conns.
1417func (ac *addrConn) tearDown(err error) {
1418	ac.mu.Lock()
1419	if ac.state == connectivity.Shutdown {
1420		ac.mu.Unlock()
1421		return
1422	}
1423	curTr := ac.transport
1424	ac.transport = nil
1425	// We have to set the state to Shutdown before anything else to prevent races
1426	// between setting the state and logic that waits on context cancellation / etc.
1427	ac.updateConnectivityState(connectivity.Shutdown)
1428	ac.cancel()
1429	ac.curAddr = resolver.Address{}
1430	if err == errConnDrain && curTr != nil {
1431		// GracefulClose(...) may be executed multiple times when
1432		// i) receiving multiple GoAway frames from the server; or
1433		// ii) there are concurrent name resolver/Balancer triggered
1434		// address removal and GoAway.
1435		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1436		ac.mu.Unlock()
1437		curTr.GracefulClose()
1438		ac.mu.Lock()
1439	}
1440	if channelz.IsOn() {
1441		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1442			Desc:     "Subchannel Deleted",
1443			Severity: channelz.CtINFO,
1444			Parent: &channelz.TraceEventDesc{
1445				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1446				Severity: channelz.CtINFO,
1447			},
1448		})
1449		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1450		// the entity being deleted, and thus prevent it from being deleted right away.
1451		channelz.RemoveEntry(ac.channelzID)
1452	}
1453	ac.mu.Unlock()
1454}
1455
1456func (ac *addrConn) getState() connectivity.State {
1457	ac.mu.Lock()
1458	defer ac.mu.Unlock()
1459	return ac.state
1460}
1461
1462func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1463	ac.mu.Lock()
1464	addr := ac.curAddr.Addr
1465	ac.mu.Unlock()
1466	return &channelz.ChannelInternalMetric{
1467		State:                    ac.getState(),
1468		Target:                   addr,
1469		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1470		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1471		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1472		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1473	}
1474}
1475
1476func (ac *addrConn) incrCallsStarted() {
1477	atomic.AddInt64(&ac.czData.callsStarted, 1)
1478	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1479}
1480
1481func (ac *addrConn) incrCallsSucceeded() {
1482	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1483}
1484
1485func (ac *addrConn) incrCallsFailed() {
1486	atomic.AddInt64(&ac.czData.callsFailed, 1)
1487}
1488
// retryThrottler is a token bucket used to decide whether retries are
// currently allowed. Every throttle call removes one token and every
// successful RPC adds ratio tokens; the balance is kept within [0, max].
type retryThrottler struct {
	max    float64 // upper bound for tokens
	thresh float64 // retries are throttled while tokens <= thresh
	ratio  float64 // tokens credited per successful RPC

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config. The pool never drops below zero. A nil receiver never
// throttles.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	remaining := rt.tokens - 1
	if remaining < 0 {
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC credits ratio tokens back to the pool, capping the pool at
// max. It is a no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()

	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1525
1526type channelzChannel struct {
1527	cc *ClientConn
1528}
1529
1530func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1531	return c.cc.channelzMetric()
1532}
1533
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout. Its message is
// "grpc: timed out when dialing".
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1540