1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"reflect"
27	"strings"
28	"sync"
29	"sync/atomic"
30	"time"
31
32	"google.golang.org/grpc/balancer"
33	"google.golang.org/grpc/balancer/base"
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/connectivity"
36	"google.golang.org/grpc/credentials"
37	"google.golang.org/grpc/internal/backoff"
38	"google.golang.org/grpc/internal/channelz"
39	"google.golang.org/grpc/internal/grpcsync"
40	"google.golang.org/grpc/internal/grpcutil"
41	iresolver "google.golang.org/grpc/internal/resolver"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51	_ "google.golang.org/grpc/internal/resolver/unix"        // To register unix resolver.
52)
53
const (
	// grpclbName is the name of the grpclb load-balancing policy; the current
	// balancer name is compared against it. Keep it in sync with grpclbName in
	// grpclb/grpclb.go.
	grpclbName = "grpclb"
	// minConnectTimeout is the least amount of time a connection attempt is
	// given to complete.
	minConnectTimeout = 20 * time.Second
)
60
61var (
62	// ErrClientConnClosing indicates that the operation is illegal because
63	// the ClientConn is closing.
64	//
65	// Deprecated: this error should not be relied upon by users; use the status
66	// code of Canceled instead.
67	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
68	// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
69	errConnDrain = errors.New("grpc: the connection is drained")
70	// errConnClosing indicates that the connection is closing.
71	errConnClosing = errors.New("grpc: the connection is closing")
72	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
73	// service config.
74	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
75)
76
// Errors reported by Dial and DialContext when the transport-security related
// dial options are missing or contradict each other.
var (
	// errNoTransportSecurity: the dial is not marked insecure, yet neither
	// transport credentials nor a credentials bundle were supplied. Callers
	// must either provide one or opt out explicitly with WithInsecure.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle: a credentials bundle was supplied together
	// with individual transport credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing: the dial is insecure, but a per-RPC
	// credential (e.g. an OAuth2 token) requires a secure transport.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict: the dial is insecure, yet transport credentials
	// (or a credentials bundle) were also supplied.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
94
const (
	// Default client-side message size limits: 4 * 1024 * 1024 (4 MiB,
	// assuming byte units) for received messages and math.MaxInt32 for sent
	// messages.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// Default sizes of the transport write buffer (used when sending frames)
	// and read buffer: 32 KiB each.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
102
103// Dial creates a client connection to the given target.
104func Dial(target string, opts ...DialOption) (*ClientConn, error) {
105	return DialContext(context.Background(), target, opts...)
106}
107
108type defaultConfigSelector struct {
109	sc *ServiceConfig
110}
111
112func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) *iresolver.RPCConfig {
113	return &iresolver.RPCConfig{
114		Context:      rpcInfo.Context,
115		MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
116	}
117}
118
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// Error handling: whenever DialContext returns a non-nil error, the partially
// built ClientConn is closed and the returned *ClientConn is nil. If ctx is
// done at the moment DialContext returns, the result is replaced by ctx.Err().
// The exception is when dopts.returnLastError is set and a different error
// occurred; then the error is reported as "<ctx.Err()>: <error>". This applies
// even if the dial had otherwise succeeded.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the atomic.Value with a typed nil so it always holds a
	// *retryThrottler (NOTE(review): presumably so later Loads can be
	// type-asserted unconditionally — confirm against the readers).
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is derived from context.Background(), not from ctx, so canceling
	// or expiring the dial ctx never cancels cc.ctx. cc.ctx.Done() is what the
	// rest of this file treats as "the ClientConn is closing".
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Caller-supplied options override the defaults set above.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Collapse the single and chained interceptors into dopts.unaryInt and
	// dopts.streamInt.
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Close the half-built ClientConn on any error return. This defer is
	// registered first, so it runs last. By then the ctx-check defer further
	// below may already have replaced err.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	// Register the channel with channelz, nested under a parent if one was
	// configured.
	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(logger, cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtInfo,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtInfo,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(logger, cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the security configuration.
	// - Secure: exactly one of TransportCredentials / CredsBundle must be set.
	// - Insecure: neither may be set, and no per-RPC credential may require
	//   transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config up front so malformed JSON fails the
	// dial. A parsed config that is not a *ServiceConfig leaves
	// defaultServiceConfig nil (the assertion's ok result is discarded).
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Append the gRPC user agent to any caller-supplied one.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// Bound the setup steps by dopts.timeout, if configured.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If ctx is done when DialContext returns, report that instead of the
	// result (see the function doc). This defer must be registered after
	// `defer cancel()` above. It then runs before cancel, so a timeout-derived
	// ctx is not seen as done merely because it is being canceled.
	defer func() {
		select {
		case <-ctx.Done():
			switch {
			case ctx.Err() == err:
				conn = nil
			case err == nil || !cc.dopts.returnLastError:
				conn, err = nil, ctx.Err()
			default:
				conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
			}
		default:
		}
	}()

	// Non-blocking attempt to pick up an initial service config from scChan.
	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
	channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(logger, cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Choose cc.authority. Candidates in priority order:
	// 1. The transport credentials' ServerName.
	// 2. dopts.authority, honored only for insecure dials.
	// 3. "localhost" for "unix:" targets.
	// 4. "localhost" + endpoint when the endpoint starts with ":".
	// 5. Otherwise, the endpoint itself.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else if strings.HasPrefix(cc.target, "unix:") {
		cc.authority = "localhost"
	} else if strings.HasPrefix(cc.parsedTarget.Endpoint, ":") {
		cc.authority = "localhost" + cc.parsedTarget.Endpoint
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	// Keep consuming service config updates in the background until scChan is
	// closed or cc.ctx is canceled.
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Balancers receive their own clone of the transport credentials.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		CustomUserAgent:  cc.dopts.copts.UserAgent,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// With FailOnNonTempDialError, give up as soon as the last
				// connection error reports itself as non-temporary. A
				// temporary error is kept in err and the loop keeps waiting.
				if err = cc.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				// With dopts.returnLastError, surface the last connection error.
				// The deferred ctx check then prefixes it with ctx.Err().
				if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
					return nil, err
				}
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
349
350// chainUnaryClientInterceptors chains all unary client interceptors into one.
351func chainUnaryClientInterceptors(cc *ClientConn) {
352	interceptors := cc.dopts.chainUnaryInts
353	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
354	// be executed before any other chained interceptors.
355	if cc.dopts.unaryInt != nil {
356		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
357	}
358	var chainedInt UnaryClientInterceptor
359	if len(interceptors) == 0 {
360		chainedInt = nil
361	} else if len(interceptors) == 1 {
362		chainedInt = interceptors[0]
363	} else {
364		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
365			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
366		}
367	}
368	cc.dopts.unaryInt = chainedInt
369}
370
371// getChainUnaryInvoker recursively generate the chained unary invoker.
372func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
373	if curr == len(interceptors)-1 {
374		return finalInvoker
375	}
376	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
377		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
378	}
379}
380
381// chainStreamClientInterceptors chains all stream client interceptors into one.
382func chainStreamClientInterceptors(cc *ClientConn) {
383	interceptors := cc.dopts.chainStreamInts
384	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
385	// be executed before any other chained interceptors.
386	if cc.dopts.streamInt != nil {
387		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
388	}
389	var chainedInt StreamClientInterceptor
390	if len(interceptors) == 0 {
391		chainedInt = nil
392	} else if len(interceptors) == 1 {
393		chainedInt = interceptors[0]
394	} else {
395		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
396			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
397		}
398	}
399	cc.dopts.streamInt = chainedInt
400}
401
402// getChainStreamer recursively generate the chained client stream constructor.
403func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
404	if curr == len(interceptors)-1 {
405		return finalStreamer
406	}
407	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
408		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
409	}
410}
411
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
//
// Fields:
//   - mu guards state and notifyChan.
//   - state is the current connectivity state. Once it is Shutdown,
//     updateState ignores further changes.
//   - notifyChan is created lazily by getNotifyChan. updateState closes it on
//     each state change, which wakes all waiters, and resets it to nil.
//   - channelzID is the channelz ID used when logging state transitions.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	channelzID int64
}
420
421// updateState updates the connectivity.State of ClientConn.
422// If there's a change it notifies goroutines waiting on state change to
423// happen.
424func (csm *connectivityStateManager) updateState(state connectivity.State) {
425	csm.mu.Lock()
426	defer csm.mu.Unlock()
427	if csm.state == connectivity.Shutdown {
428		return
429	}
430	if csm.state == state {
431		return
432	}
433	csm.state = state
434	channelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)
435	if csm.notifyChan != nil {
436		// There are other goroutines waiting on this channel.
437		close(csm.notifyChan)
438		csm.notifyChan = nil
439	}
440}
441
442func (csm *connectivityStateManager) getState() connectivity.State {
443	csm.mu.Lock()
444	defer csm.mu.Unlock()
445	return csm.state
446}
447
448func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
449	csm.mu.Lock()
450	defer csm.mu.Unlock()
451	if csm.notifyChan == nil {
452		csm.notifyChan = make(chan struct{})
453	}
454	return csm.notifyChan
455}
456
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
//
// Generated stubs depend only on this method set, so any type providing
// Invoke and NewStream with these signatures can stand in for *ClientConn.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Assert *ClientConn implements ClientConnInterface.
// This is a compile-time check only; the nil pointer is never used.
var _ ClientConnInterface = (*ClientConn)(nil)
470
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
//
// A ClientConn is created by DialContext (or Dial), which fills in the fields
// below.
type ClientConn struct {
	// ctx is derived from context.Background() in DialContext, independent of
	// the dial ctx. ctx.Done() is treated as "the ClientConn is closing" (see
	// scWatcher and waitForResolvedAddrs). cancel cancels it.
	ctx    context.Context
	cancel context.CancelFunc

	// Target, options and connectivity-state bookkeeping, set up in
	// DialContext.
	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	// balancerBuildOpts is passed to every balancer wrapper created by
	// switchBalancer. blockingpicker is what getTransport picks from.
	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	// safeConfigSelector holds the config selector used per RPC; it is updated
	// whenever a new service config is applied.
	safeConfigSelector iresolver.SafeConfigSelector

	// mu guards resolverWrapper, sc, conns, curBalancerName and
	// balancerWrapper (each is accessed under mu in this file). conns is nil
	// once the ClientConn is closed; several methods check for that.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	// firstResolveEvent fires when updateResolverState first returns;
	// waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData

	lceMu               sync.Mutex // protects lastConnectionError
	lastConnectionError error
}
516
517// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
518// ctx expires. A true value is returned in former case and false in latter.
519//
520// Experimental
521//
522// Notice: This API is EXPERIMENTAL and may be changed or removed in a
523// later release.
524func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
525	ch := cc.csMgr.getNotifyChan()
526	if cc.csMgr.getState() != sourceState {
527		return true
528	}
529	select {
530	case <-ctx.Done():
531		return false
532	case <-ch:
533		return true
534	}
535}
536
537// GetState returns the connectivity.State of ClientConn.
538//
539// Experimental
540//
541// Notice: This API is EXPERIMENTAL and may be changed or removed in a
542// later release.
543func (cc *ClientConn) GetState() connectivity.State {
544	return cc.csMgr.getState()
545}
546
547func (cc *ClientConn) scWatcher() {
548	for {
549		select {
550		case sc, ok := <-cc.dopts.scChan:
551			if !ok {
552				return
553			}
554			cc.mu.Lock()
555			// TODO: load balance policy runtime change is ignored.
556			// We may revisit this decision in the future.
557			cc.sc = &sc
558			cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{&sc})
559			cc.mu.Unlock()
560		case <-cc.ctx.Done():
561			return
562		}
563	}
564}
565
566// waitForResolvedAddrs blocks until the resolver has provided addresses or the
567// context expires.  Returns nil unless the context expires first; otherwise
568// returns a status error based on the context.
569func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
570	// This is on the RPC path, so we use a fast path to avoid the
571	// more-expensive "select" below after the resolver has returned once.
572	if cc.firstResolveEvent.HasFired() {
573		return nil
574	}
575	select {
576	case <-cc.firstResolveEvent.Done():
577		return nil
578	case <-ctx.Done():
579		return status.FromContextError(ctx.Err()).Err()
580	case <-cc.ctx.Done():
581		return ErrClientConnClosing
582	}
583}
584
585var emptyServiceConfig *ServiceConfig
586
587func init() {
588	cfg := parseServiceConfig("{}")
589	if cfg.Err != nil {
590		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
591	}
592	emptyServiceConfig = cfg.Config.(*ServiceConfig)
593}
594
595func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
596	if cc.sc != nil {
597		cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs)
598		return
599	}
600	if cc.dopts.defaultServiceConfig != nil {
601		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, addrs)
602	} else {
603		cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
604	}
605}
606
607func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
608	defer cc.firstResolveEvent.Fire()
609	cc.mu.Lock()
610	// Check if the ClientConn is already closed. Some fields (e.g.
611	// balancerWrapper) are set to nil when closing the ClientConn, and could
612	// cause nil pointer panic if we don't have this check.
613	if cc.conns == nil {
614		cc.mu.Unlock()
615		return nil
616	}
617
618	if err != nil {
619		// May need to apply the initial service config in case the resolver
620		// doesn't support service configs, or doesn't provide a service config
621		// with the new addresses.
622		cc.maybeApplyDefaultServiceConfig(nil)
623
624		if cc.balancerWrapper != nil {
625			cc.balancerWrapper.resolverError(err)
626		}
627
628		// No addresses are valid with err set; return early.
629		cc.mu.Unlock()
630		return balancer.ErrBadResolverState
631	}
632
633	var ret error
634	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
635		cc.maybeApplyDefaultServiceConfig(s.Addresses)
636		// TODO: do we need to apply a failing LB policy if there is no
637		// default, per the error handling design?
638	} else {
639		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
640			configSelector := iresolver.GetConfigSelector(s)
641			if configSelector != nil {
642				if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
643					channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector")
644				}
645			} else {
646				configSelector = &defaultConfigSelector{sc}
647			}
648			cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
649		} else {
650			ret = balancer.ErrBadResolverState
651			if cc.balancerWrapper == nil {
652				var err error
653				if s.ServiceConfig.Err != nil {
654					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
655				} else {
656					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
657				}
658				cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
659				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
660				cc.csMgr.updateState(connectivity.TransientFailure)
661				cc.mu.Unlock()
662				return ret
663			}
664		}
665	}
666
667	var balCfg serviceconfig.LoadBalancingConfig
668	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
669		balCfg = cc.sc.lbConfig.cfg
670	}
671
672	cbn := cc.curBalancerName
673	bw := cc.balancerWrapper
674	cc.mu.Unlock()
675	if cbn != grpclbName {
676		// Filter any grpclb addresses since we don't have the grpclb balancer.
677		for i := 0; i < len(s.Addresses); {
678			if s.Addresses[i].Type == resolver.GRPCLB {
679				copy(s.Addresses[i:], s.Addresses[i+1:])
680				s.Addresses = s.Addresses[:len(s.Addresses)-1]
681				continue
682			}
683			i++
684		}
685	}
686	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
687	if ret == nil {
688		ret = uccsErr // prefer ErrBadResolver state since any other error is
689		// currently meaningless to the caller.
690	}
691	return ret
692}
693
694// switchBalancer starts the switching from current balancer to the balancer
695// with the given name.
696//
697// It will NOT send the current address list to the new balancer. If needed,
698// caller of this function should send address list to the new balancer after
699// this function returns.
700//
701// Caller must hold cc.mu.
702func (cc *ClientConn) switchBalancer(name string) {
703	if strings.EqualFold(cc.curBalancerName, name) {
704		return
705	}
706
707	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
708	if cc.dopts.balancerBuilder != nil {
709		channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
710		return
711	}
712	if cc.balancerWrapper != nil {
713		cc.balancerWrapper.close()
714	}
715
716	builder := balancer.Get(name)
717	if builder == nil {
718		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
719		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
720		builder = newPickfirstBuilder()
721	} else {
722		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
723	}
724
725	cc.curBalancerName = builder.Name()
726	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
727}
728
729func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
730	cc.mu.Lock()
731	if cc.conns == nil {
732		cc.mu.Unlock()
733		return
734	}
735	// TODO(bar switching) send updates to all balancer wrappers when balancer
736	// gracefully switching is supported.
737	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
738	cc.mu.Unlock()
739}
740
741// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
742//
743// Caller needs to make sure len(addrs) > 0.
744func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
745	ac := &addrConn{
746		state:        connectivity.Idle,
747		cc:           cc,
748		addrs:        addrs,
749		scopts:       opts,
750		dopts:        cc.dopts,
751		czData:       new(channelzData),
752		resetBackoff: make(chan struct{}),
753	}
754	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
755	// Track ac in cc. This needs to be done before any getTransport(...) is called.
756	cc.mu.Lock()
757	if cc.conns == nil {
758		cc.mu.Unlock()
759		return nil, ErrClientConnClosing
760	}
761	if channelz.IsOn() {
762		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
763		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
764			Desc:     "Subchannel Created",
765			Severity: channelz.CtInfo,
766			Parent: &channelz.TraceEventDesc{
767				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
768				Severity: channelz.CtInfo,
769			},
770		})
771	}
772	cc.conns[ac] = struct{}{}
773	cc.mu.Unlock()
774	return ac, nil
775}
776
777// removeAddrConn removes the addrConn in the subConn from clientConn.
778// It also tears down the ac with the given error.
779func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
780	cc.mu.Lock()
781	if cc.conns == nil {
782		cc.mu.Unlock()
783		return
784	}
785	delete(cc.conns, ac)
786	cc.mu.Unlock()
787	ac.tearDown(err)
788}
789
790func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
791	return &channelz.ChannelInternalMetric{
792		State:                    cc.GetState(),
793		Target:                   cc.target,
794		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
795		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
796		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
797		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
798	}
799}
800
801// Target returns the target string of the ClientConn.
802//
803// Experimental
804//
805// Notice: This API is EXPERIMENTAL and may be changed or removed in a
806// later release.
807func (cc *ClientConn) Target() string {
808	return cc.target
809}
810
811func (cc *ClientConn) incrCallsStarted() {
812	atomic.AddInt64(&cc.czData.callsStarted, 1)
813	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
814}
815
816func (cc *ClientConn) incrCallsSucceeded() {
817	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
818}
819
820func (cc *ClientConn) incrCallsFailed() {
821	atomic.AddInt64(&cc.czData.callsFailed, 1)
822}
823
824// connect starts creating a transport.
825// It does nothing if the ac is not IDLE.
826// TODO(bar) Move this to the addrConn section.
827func (ac *addrConn) connect() error {
828	ac.mu.Lock()
829	if ac.state == connectivity.Shutdown {
830		ac.mu.Unlock()
831		return errConnClosing
832	}
833	if ac.state != connectivity.Idle {
834		ac.mu.Unlock()
835		return nil
836	}
837	// Update connectivity state within the lock to prevent subsequent or
838	// concurrent calls from resetting the transport more than once.
839	ac.updateConnectivityState(connectivity.Connecting, nil)
840	ac.mu.Unlock()
841
842	// Start a goroutine connecting to the server asynchronously.
843	go ac.resetTransport()
844	return nil
845}
846
847// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
848//
849// If ac is Connecting, it returns false. The caller should tear down the ac and
850// create a new one. Note that the backoff will be reset when this happens.
851//
852// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
853// addresses will be picked up by retry in the next iteration after backoff.
854//
855// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
856//
857// If ac is Ready, it checks whether current connected address of ac is in the
858// new addrs list.
859//  - If true, it updates ac.addrs and returns true. The ac will keep using
860//    the existing connection.
861//  - If false, it does nothing and returns false.
862func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
863	ac.mu.Lock()
864	defer ac.mu.Unlock()
865	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
866	if ac.state == connectivity.Shutdown ||
867		ac.state == connectivity.TransientFailure ||
868		ac.state == connectivity.Idle {
869		ac.addrs = addrs
870		return true
871	}
872
873	if ac.state == connectivity.Connecting {
874		return false
875	}
876
877	// ac.state is Ready, try to find the connected address.
878	var curAddrFound bool
879	for _, a := range addrs {
880		if reflect.DeepEqual(ac.curAddr, a) {
881			curAddrFound = true
882			break
883		}
884	}
885	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
886	if curAddrFound {
887		ac.addrs = addrs
888	}
889
890	return curAddrFound
891}
892
893func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
894	if sc == nil {
895		return MethodConfig{}
896	}
897	if m, ok := sc.Methods[method]; ok {
898		return m
899	}
900	i := strings.LastIndex(method, "/")
901	if m, ok := sc.Methods[method[:i+1]]; ok {
902		return m
903	}
904	return sc.Methods[""]
905}
906
907// GetMethodConfig gets the method config of the input method.
908// If there's an exact match for input method (i.e. /service/method), we return
909// the corresponding MethodConfig.
910// If there isn't an exact match for the input method, we look for the service's default
911// config under the service (i.e /service/) and then for the default for all services (empty string).
912//
913// If there is a default MethodConfig for the service, we return it.
914// Otherwise, we return an empty MethodConfig.
915func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
916	// TODO: Avoid the locking here.
917	cc.mu.RLock()
918	defer cc.mu.RUnlock()
919	return getMethodConfig(cc.sc, method)
920}
921
922func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
923	cc.mu.RLock()
924	defer cc.mu.RUnlock()
925	if cc.sc == nil {
926		return nil
927	}
928	return cc.sc.healthCheckConfig
929}
930
931func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
932	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
933		Ctx:            ctx,
934		FullMethodName: method,
935	})
936	if err != nil {
937		return nil, nil, toRPCErr(err)
938	}
939	return t, done, nil
940}
941
942func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
943	if sc == nil {
944		// should never reach here.
945		return
946	}
947	cc.sc = sc
948	if configSelector != nil {
949		cc.safeConfigSelector.UpdateConfigSelector(configSelector)
950	}
951
952	if cc.sc.retryThrottling != nil {
953		newThrottler := &retryThrottler{
954			tokens: cc.sc.retryThrottling.MaxTokens,
955			max:    cc.sc.retryThrottling.MaxTokens,
956			thresh: cc.sc.retryThrottling.MaxTokens / 2,
957			ratio:  cc.sc.retryThrottling.TokenRatio,
958		}
959		cc.retryThrottler.Store(newThrottler)
960	} else {
961		cc.retryThrottler.Store((*retryThrottler)(nil))
962	}
963
964	if cc.dopts.balancerBuilder == nil {
965		// Only look at balancer types and switch balancer if balancer dial
966		// option is not set.
967		var newBalancerName string
968		if cc.sc != nil && cc.sc.lbConfig != nil {
969			newBalancerName = cc.sc.lbConfig.name
970		} else {
971			var isGRPCLB bool
972			for _, a := range addrs {
973				if a.Type == resolver.GRPCLB {
974					isGRPCLB = true
975					break
976				}
977			}
978			if isGRPCLB {
979				newBalancerName = grpclbName
980			} else if cc.sc != nil && cc.sc.LB != nil {
981				newBalancerName = *cc.sc.LB
982			} else {
983				newBalancerName = PickFirstBalancerName
984			}
985		}
986		cc.switchBalancer(newBalancerName)
987	} else if cc.balancerWrapper == nil {
988		// Balancer dial option was set, and this is the first time handling
989		// resolved addresses. Build a balancer with dopts.balancerBuilder.
990		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
991		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
992	}
993}
994
995func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
996	cc.mu.RLock()
997	r := cc.resolverWrapper
998	cc.mu.RUnlock()
999	if r == nil {
1000		return
1001	}
1002	go r.resolveNow(o)
1003}
1004
1005// ResetConnectBackoff wakes up all subchannels in transient failure and causes
1006// them to attempt another connection immediately.  It also resets the backoff
1007// times used for subsequent attempts regardless of the current state.
1008//
1009// In general, this function should not be used.  Typical service or network
1010// outages result in a reasonable client reconnection strategy by default.
1011// However, if a previously unavailable network becomes available, this may be
1012// used to trigger an immediate reconnect.
1013//
1014// Experimental
1015//
1016// Notice: This API is EXPERIMENTAL and may be changed or removed in a
1017// later release.
1018func (cc *ClientConn) ResetConnectBackoff() {
1019	cc.mu.Lock()
1020	conns := cc.conns
1021	cc.mu.Unlock()
1022	for ac := range conns {
1023		ac.resetConnectBackoff()
1024	}
1025}
1026
// Close tears down the ClientConn and all underlying connections.
//
// It returns ErrClientConnClosing if the ClientConn was already closed (its
// conns map is nil), and nil otherwise. cc.cancel is deferred, so it runs on
// every call, including the already-closed path.
func (cc *ClientConn) Close() error {
	defer cc.cancel()

	cc.mu.Lock()
	// A nil conns map marks a ClientConn that has already been closed.
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	// Detach the resolver and balancer wrappers while holding cc.mu; they are
	// closed below, after the lock has been released.
	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// cc.conns was set to nil above, so this map is no longer reachable
	// through cc. Each addrConn is torn down with ErrClientConnClosing.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtInfo,
		}
		// A nested channel also records the deletion on its parent.
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtInfo,
			}
		}
		channelz.AddTraceEvent(logger, cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1076
// addrConn is a network connection to a given address.
//
// It tracks the addresses to try, the current transport (if any), the
// connectivity state reported to the ClientConn, and the backoff bookkeeping
// used between reconnect attempts.
type addrConn struct {
	// ctx is the parent of the per-attempt connect context and the
	// health-check context; cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// acbw is the SubConn handle passed along with state changes to cc;
	// scopts supplies the per-SubConn creds bundle and health-check opt-in.
	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	// In this file it is read and written while holding mu.
	transport transport.ClientTransport // The current transport.

	// mu guards the fields below (and, in practice, transport above).
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the attempt index passed to dopts.bs.Backoff; resetBackoff
	// is closed by resetConnectBackoff to cut a pending backoff wait short.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// czData holds the channelz call counters, updated atomically.
	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
1106
1107// Note: this requires a lock on ac.mu.
1108func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1109	if ac.state == s {
1110		return
1111	}
1112	ac.state = s
1113	channelz.Infof(logger, ac.channelzID, "Subchannel Connectivity change to %v", s)
1114	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1115}
1116
1117// adjustParams updates parameters used to create transports upon
1118// receiving a GoAway.
1119func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1120	switch r {
1121	case transport.GoAwayTooManyPings:
1122		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1123		ac.cc.mu.Lock()
1124		if v > ac.cc.mkp.Time {
1125			ac.cc.mkp.Time = v
1126		}
1127		ac.cc.mu.Unlock()
1128	}
1129}
1130
// resetTransport is the connection loop for ac; connect runs it on its own
// goroutine. Each iteration:
//   - asks the ClientConn to re-resolve (on every iteration but the first);
//   - returns if ac has been shut down;
//   - computes a dial deadline from the minimum connect timeout, extended to
//     the current backoff duration when that is longer;
//   - moves ac to CONNECTING, clears ac.transport, and tries every address in
//     ac.addrs via tryAllAddrs.
//
// If every address fails, ac moves to TRANSIENT_FAILURE and the loop sleeps
// for the backoff duration. The sleep is cut short by resetConnectBackoff,
// and the loop ends if ac.ctx is done. If an address succeeds, the transport
// is installed, the backoff index is reset, and health checking is started.
// The loop then blocks until the transport fires the reconnect event, and
// starts over.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		// Snapshot the address list and backoff for this attempt while
		// holding ac.mu.
		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			// Capture the current reset channel under the lock; resetConnectBackoff
			// closes it (and installs a new one) to wake this wait early.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// Only a full backoff wait advances the backoff index.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		// ac may have been torn down while the transport was being created;
		// in that case the new transport is discarded.
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// startHealthCheck sets ac to READY unless a health check stream
		// takes over managing the state; hcancel stops it once the transport
		// goes away.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1227
1228// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1229// first successful one. It returns the transport, the address and a Event in
1230// the successful case. The Event fires when the returned transport disconnects.
1231func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1232	var firstConnErr error
1233	for _, addr := range addrs {
1234		ac.mu.Lock()
1235		if ac.state == connectivity.Shutdown {
1236			ac.mu.Unlock()
1237			return nil, resolver.Address{}, nil, errConnClosing
1238		}
1239
1240		ac.cc.mu.RLock()
1241		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1242		ac.cc.mu.RUnlock()
1243
1244		copts := ac.dopts.copts
1245		if ac.scopts.CredsBundle != nil {
1246			copts.CredsBundle = ac.scopts.CredsBundle
1247		}
1248		ac.mu.Unlock()
1249
1250		channelz.Infof(logger, ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1251
1252		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1253		if err == nil {
1254			return newTr, addr, reconnect, nil
1255		}
1256		if firstConnErr == nil {
1257			firstConnErr = err
1258		}
1259		ac.cc.updateConnectionError(err)
1260	}
1261
1262	// Couldn't connect to any address.
1263	return nil, resolver.Address{}, nil, firstConnErr
1264}
1265
1266// createTransport creates a connection to addr. It returns the transport and a
1267// Event in the successful case. The Event fires when the returned transport
1268// disconnects.
1269func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1270	prefaceReceived := make(chan struct{})
1271	onCloseCalled := make(chan struct{})
1272	reconnect := grpcsync.NewEvent()
1273
1274	// addr.ServerName takes precedent over ClientConn authority, if present.
1275	if addr.ServerName == "" {
1276		addr.ServerName = ac.cc.authority
1277	}
1278
1279	once := sync.Once{}
1280	onGoAway := func(r transport.GoAwayReason) {
1281		ac.mu.Lock()
1282		ac.adjustParams(r)
1283		once.Do(func() {
1284			if ac.state == connectivity.Ready {
1285				// Prevent this SubConn from being used for new RPCs by setting its
1286				// state to Connecting.
1287				//
1288				// TODO: this should be Idle when grpc-go properly supports it.
1289				ac.updateConnectivityState(connectivity.Connecting, nil)
1290			}
1291		})
1292		ac.mu.Unlock()
1293		reconnect.Fire()
1294	}
1295
1296	onClose := func() {
1297		ac.mu.Lock()
1298		once.Do(func() {
1299			if ac.state == connectivity.Ready {
1300				// Prevent this SubConn from being used for new RPCs by setting its
1301				// state to Connecting.
1302				//
1303				// TODO: this should be Idle when grpc-go properly supports it.
1304				ac.updateConnectivityState(connectivity.Connecting, nil)
1305			}
1306		})
1307		ac.mu.Unlock()
1308		close(onCloseCalled)
1309		reconnect.Fire()
1310	}
1311
1312	onPrefaceReceipt := func() {
1313		close(prefaceReceived)
1314	}
1315
1316	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1317	defer cancel()
1318	if channelz.IsOn() {
1319		copts.ChannelzParentID = ac.channelzID
1320	}
1321
1322	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onPrefaceReceipt, onGoAway, onClose)
1323	if err != nil {
1324		// newTr is either nil, or closed.
1325		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
1326		return nil, nil, err
1327	}
1328
1329	select {
1330	case <-time.After(time.Until(connectDeadline)):
1331		// We didn't get the preface in time.
1332		newTr.Close()
1333		channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1334		return nil, nil, errors.New("timed out waiting for server handshake")
1335	case <-prefaceReceived:
1336		// We got the preface - huzzah! things are good.
1337	case <-onCloseCalled:
1338		// The transport has already closed - noop.
1339		return nil, nil, errors.New("connection closed")
1340		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1341	}
1342	return newTr, reconnect, nil
1343}
1344
1345// startHealthCheck starts the health checking stream (RPC) to watch the health
1346// stats of this connection if health checking is requested and configured.
1347//
1348// LB channel health checking is enabled when all requirements below are met:
1349// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1350// 2. internal.HealthCheckFunc is set by importing the grpc/health package
1351// 3. a service config with non-empty healthCheckConfig field is provided
1352// 4. the load balancer requests it
1353//
1354// It sets addrConn to READY if the health checking stream is not started.
1355//
1356// Caller must hold ac.mu.
1357func (ac *addrConn) startHealthCheck(ctx context.Context) {
1358	var healthcheckManagingState bool
1359	defer func() {
1360		if !healthcheckManagingState {
1361			ac.updateConnectivityState(connectivity.Ready, nil)
1362		}
1363	}()
1364
1365	if ac.cc.dopts.disableHealthCheck {
1366		return
1367	}
1368	healthCheckConfig := ac.cc.healthCheckConfig()
1369	if healthCheckConfig == nil {
1370		return
1371	}
1372	if !ac.scopts.HealthCheckEnabled {
1373		return
1374	}
1375	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1376	if healthCheckFunc == nil {
1377		// The health package is not imported to set health check function.
1378		//
1379		// TODO: add a link to the health check doc in the error message.
1380		channelz.Error(logger, ac.channelzID, "Health check is requested but health check function is not set.")
1381		return
1382	}
1383
1384	healthcheckManagingState = true
1385
1386	// Set up the health check helper functions.
1387	currentTr := ac.transport
1388	newStream := func(method string) (interface{}, error) {
1389		ac.mu.Lock()
1390		if ac.transport != currentTr {
1391			ac.mu.Unlock()
1392			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1393		}
1394		ac.mu.Unlock()
1395		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1396	}
1397	setConnectivityState := func(s connectivity.State, lastErr error) {
1398		ac.mu.Lock()
1399		defer ac.mu.Unlock()
1400		if ac.transport != currentTr {
1401			return
1402		}
1403		ac.updateConnectivityState(s, lastErr)
1404	}
1405	// Start the health checking stream.
1406	go func() {
1407		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1408		if err != nil {
1409			if status.Code(err) == codes.Unimplemented {
1410				channelz.Error(logger, ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1411			} else {
1412				channelz.Errorf(logger, ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
1413			}
1414		}
1415	}()
1416}
1417
1418func (ac *addrConn) resetConnectBackoff() {
1419	ac.mu.Lock()
1420	close(ac.resetBackoff)
1421	ac.backoffIdx = 0
1422	ac.resetBackoff = make(chan struct{})
1423	ac.mu.Unlock()
1424}
1425
1426// getReadyTransport returns the transport if ac's state is READY.
1427// Otherwise it returns nil, false.
1428// If ac's state is IDLE, it will trigger ac to connect.
1429func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1430	ac.mu.Lock()
1431	if ac.state == connectivity.Ready && ac.transport != nil {
1432		t := ac.transport
1433		ac.mu.Unlock()
1434		return t, true
1435	}
1436	var idle bool
1437	if ac.state == connectivity.Idle {
1438		idle = true
1439	}
1440	ac.mu.Unlock()
1441	// Trigger idle ac to connect.
1442	if idle {
1443		ac.connect()
1444	}
1445	return nil, false
1446}
1447
1448// tearDown starts to tear down the addrConn.
1449// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1450// some edge cases (e.g., the caller opens and closes many addrConn's in a
1451// tight loop.
1452// tearDown doesn't remove ac from ac.cc.conns.
1453func (ac *addrConn) tearDown(err error) {
1454	ac.mu.Lock()
1455	if ac.state == connectivity.Shutdown {
1456		ac.mu.Unlock()
1457		return
1458	}
1459	curTr := ac.transport
1460	ac.transport = nil
1461	// We have to set the state to Shutdown before anything else to prevent races
1462	// between setting the state and logic that waits on context cancellation / etc.
1463	ac.updateConnectivityState(connectivity.Shutdown, nil)
1464	ac.cancel()
1465	ac.curAddr = resolver.Address{}
1466	if err == errConnDrain && curTr != nil {
1467		// GracefulClose(...) may be executed multiple times when
1468		// i) receiving multiple GoAway frames from the server; or
1469		// ii) there are concurrent name resolver/Balancer triggered
1470		// address removal and GoAway.
1471		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1472		ac.mu.Unlock()
1473		curTr.GracefulClose()
1474		ac.mu.Lock()
1475	}
1476	if channelz.IsOn() {
1477		channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
1478			Desc:     "Subchannel Deleted",
1479			Severity: channelz.CtInfo,
1480			Parent: &channelz.TraceEventDesc{
1481				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1482				Severity: channelz.CtInfo,
1483			},
1484		})
1485		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1486		// the entity being deleted, and thus prevent it from being deleted right away.
1487		channelz.RemoveEntry(ac.channelzID)
1488	}
1489	ac.mu.Unlock()
1490}
1491
1492func (ac *addrConn) getState() connectivity.State {
1493	ac.mu.Lock()
1494	defer ac.mu.Unlock()
1495	return ac.state
1496}
1497
1498func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1499	ac.mu.Lock()
1500	addr := ac.curAddr.Addr
1501	ac.mu.Unlock()
1502	return &channelz.ChannelInternalMetric{
1503		State:                    ac.getState(),
1504		Target:                   addr,
1505		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1506		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1507		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1508		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1509	}
1510}
1511
1512func (ac *addrConn) incrCallsStarted() {
1513	atomic.AddInt64(&ac.czData.callsStarted, 1)
1514	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1515}
1516
1517func (ac *addrConn) incrCallsSucceeded() {
1518	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1519}
1520
1521func (ac *addrConn) incrCallsFailed() {
1522	atomic.AddInt64(&ac.czData.callsFailed, 1)
1523}
1524
// retryThrottler is a token bucket that throttles retries. Each failure
// removes one token (never going below zero). Each success adds ratio tokens
// (never going above max). Retries are disallowed while the token count is
// at or below thresh. A nil *retryThrottler never throttles.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	// mu guards tokens.
	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.tokens = math.Max(rt.tokens-1, 0)
	return rt.tokens <= rt.thresh
}

// successfulRPC credits the pool with ratio tokens, capped at max. It is a
// no-op on a nil receiver.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	if refilled := rt.tokens + rt.ratio; refilled > rt.max {
		rt.tokens = rt.max
	} else {
		rt.tokens = refilled
	}
	rt.mu.Unlock()
}
1561
1562type channelzChannel struct {
1563	cc *ClientConn
1564}
1565
1566func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1567	return c.cc.channelzMetric()
1568}
1569
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// It remains exported, presumably so existing references keep compiling.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1576
1577func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1578	for _, rb := range cc.dopts.resolvers {
1579		if scheme == rb.Scheme() {
1580			return rb
1581		}
1582	}
1583	return resolver.Get(scheme)
1584}
1585
1586func (cc *ClientConn) updateConnectionError(err error) {
1587	cc.lceMu.Lock()
1588	cc.lastConnectionError = err
1589	cc.lceMu.Unlock()
1590}
1591
1592func (cc *ClientConn) connectionError() error {
1593	cc.lceMu.Lock()
1594	defer cc.lceMu.Unlock()
1595	return cc.lastConnectionError
1596}
1597