1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/balancer/base"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/internal/backoff"
39	"google.golang.org/grpc/internal/channelz"
40	"google.golang.org/grpc/internal/grpcsync"
41	"google.golang.org/grpc/internal/grpcutil"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName is the name of the grpclb balancer. It must match grpclbName
	// in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
59
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection has started draining and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is the message prefix (a plain
	// string, not an error value) that DialContext puts in front of the JSON
	// parsing error for the default service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
77
// The following errors are returned from Dial and DialContext when the
// security-related dial options are missing or contradict each other.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that a creds bundle is used
	// together with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token), which requires a secure
	// connection, over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
95
const (
	// defaultClientMaxReceiveMessageSize is 4 MiB.
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultClientMaxSendMessageSize is the largest value an int32 can hold.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are 32 KiB each.
	// NOTE(review): the previous comment here described a non-existent
	// "http2IOBufSize" as "the buffer size for sending frames"; presumably
	// these are the default transport write/read buffer sizes — confirm
	// against the dial options that consume them.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106	return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
//
// On any error the partially constructed ClientConn is closed and a nil
// *ClientConn is returned together with the error.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Seed the throttler slot with a typed nil *retryThrottler; atomic.Value
	// requires every Store to use the same concrete type.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx is rooted at context.Background(), not at the caller's ctx, so
	// it is not affected by the caller's ctx being canceled.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	// Layer the caller's options on top of the defaults.
	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Reads the named result err, so it covers every error return below as
	// well as the ctx check in the later defer.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			// Nested channel: register under the parent and record the
			// creation on both the channel and its parent.
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the combination of security-related dial options.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	// Parse the default service config, if one was supplied as raw JSON.
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	// Install the default dialer only when the caller did not provide one;
	// the proxy wrapper is applied to the default dialer only.
	if cc.dopts.copts.Dialer == nil {
		cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
			network, addr := parseDialTarget(addr)
			return (&net.Dialer{}).DialContext(ctx, network, addr)
		}
		if cc.dopts.withProxy {
			cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
		}
	}

	// Always end the user agent with grpcUA, after any caller-supplied value.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// Deferred calls run last-in-first-out, so this runs before the
	// Close-on-error defer registered above: if ctx is already done when the
	// function returns, the result is replaced with (nil, ctx.Err()) and the
	// earlier defer then closes cc.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Pick the authority, in order of preference: the server name from the
	// transport credentials, the authority dial option (insecure dials
	// only), then the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep applying later service configs from the channel in the
		// background; scWatcher exits when the channel closes or cc.ctx ends.
		go cc.scWatcher()
	}

	// Balancers get a clone of the transport credentials rather than the
	// instance stored in the dial options.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Give up early only when the connection error reports
				// itself as non-temporary via a Temporary() method.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
332
333// chainUnaryClientInterceptors chains all unary client interceptors into one.
334func chainUnaryClientInterceptors(cc *ClientConn) {
335	interceptors := cc.dopts.chainUnaryInts
336	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
337	// be executed before any other chained interceptors.
338	if cc.dopts.unaryInt != nil {
339		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
340	}
341	var chainedInt UnaryClientInterceptor
342	if len(interceptors) == 0 {
343		chainedInt = nil
344	} else if len(interceptors) == 1 {
345		chainedInt = interceptors[0]
346	} else {
347		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
348			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
349		}
350	}
351	cc.dopts.unaryInt = chainedInt
352}
353
354// getChainUnaryInvoker recursively generate the chained unary invoker.
355func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
356	if curr == len(interceptors)-1 {
357		return finalInvoker
358	}
359	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
360		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
361	}
362}
363
364// chainStreamClientInterceptors chains all stream client interceptors into one.
365func chainStreamClientInterceptors(cc *ClientConn) {
366	interceptors := cc.dopts.chainStreamInts
367	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
368	// be executed before any other chained interceptors.
369	if cc.dopts.streamInt != nil {
370		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
371	}
372	var chainedInt StreamClientInterceptor
373	if len(interceptors) == 0 {
374		chainedInt = nil
375	} else if len(interceptors) == 1 {
376		chainedInt = interceptors[0]
377	} else {
378		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
379			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
380		}
381	}
382	cc.dopts.streamInt = chainedInt
383}
384
385// getChainStreamer recursively generate the chained client stream constructor.
386func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
387	if curr == len(interceptors)-1 {
388		return finalStreamer
389	}
390	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
391		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
392	}
393}
394
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	// mu is held by updateState, getState and getNotifyChan while they
	// access state and notifyChan.
	mu sync.Mutex
	// state is the current connectivity state; once it is Shutdown,
	// updateState ignores further updates.
	state connectivity.State
	// notifyChan is created lazily by getNotifyChan and is closed and reset
	// to nil by updateState when the state changes.
	notifyChan chan struct{}
	// channelzID identifies the owning channel in channelz log entries.
	channelzID int64
}
403
404// updateState updates the connectivity.State of ClientConn.
405// If there's a change it notifies goroutines waiting on state change to
406// happen.
407func (csm *connectivityStateManager) updateState(state connectivity.State) {
408	csm.mu.Lock()
409	defer csm.mu.Unlock()
410	if csm.state == connectivity.Shutdown {
411		return
412	}
413	if csm.state == state {
414		return
415	}
416	csm.state = state
417	channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
418	if csm.notifyChan != nil {
419		// There are other goroutines waiting on this channel.
420		close(csm.notifyChan)
421		csm.notifyChan = nil
422	}
423}
424
425func (csm *connectivityStateManager) getState() connectivity.State {
426	csm.mu.Lock()
427	defer csm.mu.Unlock()
428	return csm.state
429}
430
431func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
432	csm.mu.Lock()
433	defer csm.mu.Unlock()
434	if csm.notifyChan == nil {
435		csm.notifyChan = make(chan struct{})
436	}
437	return csm.notifyChan
438}
439
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

// Compile-time assertion that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
453
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	// ctx and cancel are created in DialContext from context.Background();
	// ctx is the parent of every addrConn's context (see newAddrConn).
	ctx    context.Context
	cancel context.CancelFunc

	// target is the string passed to Dial/DialContext; parsedTarget is its
	// parsed form (or the default-scheme fallback chosen in DialContext).
	target       string
	parsedTarget resolver.Target
	// authority is chosen in DialContext from the credentials' server name,
	// the authority dial option, or the parsed endpoint.
	authority string
	// dopts holds the dial options after defaults and user options are applied.
	dopts dialOptions
	// csMgr tracks the connectivity state reported by GetState.
	csMgr *connectivityStateManager

	// balancerBuildOpts is filled in by DialContext and passed to every
	// balancer wrapper that is created.
	balancerBuildOpts balancer.BuildOptions
	// blockingpicker is used by getTransport to pick a transport for an RPC.
	blockingpicker *pickerWrapper

	// NOTE(review): the fields below appear to be the ones guarded by mu
	// (retryThrottler is an atomic.Value and is accessed without it) —
	// confirm before relying on this.
	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	// conns is the set of live addrConns; a nil map is treated as "the
	// ClientConn is closed" throughout this file.
	conns map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	// retryThrottler always stores a *retryThrottler, possibly nil.
	retryThrottler atomic.Value

	// firstResolveEvent fires when updateResolverState returns for the first
	// time; waitForResolvedAddrs blocks on it.
	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	// czData holds the call counters reported by channelzMetric.
	czData *channelzData
}
494
495// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
496// ctx expires. A true value is returned in former case and false in latter.
497// This is an EXPERIMENTAL API.
498func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
499	ch := cc.csMgr.getNotifyChan()
500	if cc.csMgr.getState() != sourceState {
501		return true
502	}
503	select {
504	case <-ctx.Done():
505		return false
506	case <-ch:
507		return true
508	}
509}
510
511// GetState returns the connectivity.State of ClientConn.
512// This is an EXPERIMENTAL API.
513func (cc *ClientConn) GetState() connectivity.State {
514	return cc.csMgr.getState()
515}
516
517func (cc *ClientConn) scWatcher() {
518	for {
519		select {
520		case sc, ok := <-cc.dopts.scChan:
521			if !ok {
522				return
523			}
524			cc.mu.Lock()
525			// TODO: load balance policy runtime change is ignored.
526			// We may revisit this decision in the future.
527			cc.sc = &sc
528			cc.mu.Unlock()
529		case <-cc.ctx.Done():
530			return
531		}
532	}
533}
534
535// waitForResolvedAddrs blocks until the resolver has provided addresses or the
536// context expires.  Returns nil unless the context expires first; otherwise
537// returns a status error based on the context.
538func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
539	// This is on the RPC path, so we use a fast path to avoid the
540	// more-expensive "select" below after the resolver has returned once.
541	if cc.firstResolveEvent.HasFired() {
542		return nil
543	}
544	select {
545	case <-cc.firstResolveEvent.Done():
546		return nil
547	case <-ctx.Done():
548		return status.FromContextError(ctx.Err()).Err()
549	case <-cc.ctx.Done():
550		return ErrClientConnClosing
551	}
552}
553
554var emptyServiceConfig *ServiceConfig
555
556func init() {
557	cfg := parseServiceConfig("{}")
558	if cfg.Err != nil {
559		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
560	}
561	emptyServiceConfig = cfg.Config.(*ServiceConfig)
562}
563
564func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
565	if cc.sc != nil {
566		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
567		return
568	}
569	if cc.dopts.defaultServiceConfig != nil {
570		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
571	} else {
572		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
573	}
574}
575
// updateResolverState handles a new result (or error) from the resolver: it
// applies the appropriate service config and balancer, then forwards the
// resolver state to the balancer.
//
// It returns nil if the ClientConn is already closed,
// balancer.ErrBadResolverState when err is non-nil or the service config in s
// is unusable, and otherwise whatever the balancer returns from
// updateClientConnState.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	// Whatever the outcome, the first-resolve event is fired when this call
	// returns; waitForResolvedAddrs blocks on that event.
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			// The resolver's service config is unusable. With an existing
			// balancer we keep going (and still report the bad state); with
			// none, install an error picker, go to TransientFailure and stop.
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				// This err deliberately shadows the (nil) err parameter.
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	// Pass the service config's LB config along only when no balancer
	// builder was forced through the dial options.
	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Snapshot the balancer name and wrapper under the lock; the balancer is
	// notified below after the lock has been released.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// Removal is in place: the tail is shifted left over element i and the
		// slice shrinks by one; i is not advanced, so the element moved into
		// slot i is examined next.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
653
654// switchBalancer starts the switching from current balancer to the balancer
655// with the given name.
656//
657// It will NOT send the current address list to the new balancer. If needed,
658// caller of this function should send address list to the new balancer after
659// this function returns.
660//
661// Caller must hold cc.mu.
662func (cc *ClientConn) switchBalancer(name string) {
663	if strings.EqualFold(cc.curBalancerName, name) {
664		return
665	}
666
667	channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
668	if cc.dopts.balancerBuilder != nil {
669		channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
670		return
671	}
672	if cc.balancerWrapper != nil {
673		cc.balancerWrapper.close()
674	}
675
676	builder := balancer.Get(name)
677	if builder == nil {
678		channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
679		channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
680		builder = newPickfirstBuilder()
681	} else {
682		channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
683	}
684
685	cc.curBalancerName = builder.Name()
686	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
687}
688
689func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
690	cc.mu.Lock()
691	if cc.conns == nil {
692		cc.mu.Unlock()
693		return
694	}
695	// TODO(bar switching) send updates to all balancer wrappers when balancer
696	// gracefully switching is supported.
697	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
698	cc.mu.Unlock()
699}
700
701// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
702//
703// Caller needs to make sure len(addrs) > 0.
704func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
705	ac := &addrConn{
706		state:        connectivity.Idle,
707		cc:           cc,
708		addrs:        addrs,
709		scopts:       opts,
710		dopts:        cc.dopts,
711		czData:       new(channelzData),
712		resetBackoff: make(chan struct{}),
713	}
714	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
715	// Track ac in cc. This needs to be done before any getTransport(...) is called.
716	cc.mu.Lock()
717	if cc.conns == nil {
718		cc.mu.Unlock()
719		return nil, ErrClientConnClosing
720	}
721	if channelz.IsOn() {
722		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
723		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
724			Desc:     "Subchannel Created",
725			Severity: channelz.CtINFO,
726			Parent: &channelz.TraceEventDesc{
727				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
728				Severity: channelz.CtINFO,
729			},
730		})
731	}
732	cc.conns[ac] = struct{}{}
733	cc.mu.Unlock()
734	return ac, nil
735}
736
737// removeAddrConn removes the addrConn in the subConn from clientConn.
738// It also tears down the ac with the given error.
739func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
740	cc.mu.Lock()
741	if cc.conns == nil {
742		cc.mu.Unlock()
743		return
744	}
745	delete(cc.conns, ac)
746	cc.mu.Unlock()
747	ac.tearDown(err)
748}
749
750func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
751	return &channelz.ChannelInternalMetric{
752		State:                    cc.GetState(),
753		Target:                   cc.target,
754		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
755		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
756		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
757		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
758	}
759}
760
761// Target returns the target string of the ClientConn.
762// This is an EXPERIMENTAL API.
763func (cc *ClientConn) Target() string {
764	return cc.target
765}
766
767func (cc *ClientConn) incrCallsStarted() {
768	atomic.AddInt64(&cc.czData.callsStarted, 1)
769	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
770}
771
772func (cc *ClientConn) incrCallsSucceeded() {
773	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
774}
775
776func (cc *ClientConn) incrCallsFailed() {
777	atomic.AddInt64(&cc.czData.callsFailed, 1)
778}
779
780// connect starts creating a transport.
781// It does nothing if the ac is not IDLE.
782// TODO(bar) Move this to the addrConn section.
783func (ac *addrConn) connect() error {
784	ac.mu.Lock()
785	if ac.state == connectivity.Shutdown {
786		ac.mu.Unlock()
787		return errConnClosing
788	}
789	if ac.state != connectivity.Idle {
790		ac.mu.Unlock()
791		return nil
792	}
793	// Update connectivity state within the lock to prevent subsequent or
794	// concurrent calls from resetting the transport more than once.
795	ac.updateConnectivityState(connectivity.Connecting, nil)
796	ac.mu.Unlock()
797
798	// Start a goroutine connecting to the server asynchronously.
799	go ac.resetTransport()
800	return nil
801}
802
803// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
804//
805// If ac is Connecting, it returns false. The caller should tear down the ac and
806// create a new one. Note that the backoff will be reset when this happens.
807//
808// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
809// addresses will be picked up by retry in the next iteration after backoff.
810//
811// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
812//
813// If ac is Ready, it checks whether current connected address of ac is in the
814// new addrs list.
815//  - If true, it updates ac.addrs and returns true. The ac will keep using
816//    the existing connection.
817//  - If false, it does nothing and returns false.
818func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
819	ac.mu.Lock()
820	defer ac.mu.Unlock()
821	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
822	if ac.state == connectivity.Shutdown ||
823		ac.state == connectivity.TransientFailure ||
824		ac.state == connectivity.Idle {
825		ac.addrs = addrs
826		return true
827	}
828
829	if ac.state == connectivity.Connecting {
830		return false
831	}
832
833	// ac.state is Ready, try to find the connected address.
834	var curAddrFound bool
835	for _, a := range addrs {
836		if reflect.DeepEqual(ac.curAddr, a) {
837			curAddrFound = true
838			break
839		}
840	}
841	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
842	if curAddrFound {
843		ac.addrs = addrs
844	}
845
846	return curAddrFound
847}
848
849// GetMethodConfig gets the method config of the input method.
850// If there's an exact match for input method (i.e. /service/method), we return
851// the corresponding MethodConfig.
852// If there isn't an exact match for the input method, we look for the default config
853// under the service (i.e /service/). If there is a default MethodConfig for
854// the service, we return it.
855// Otherwise, we return an empty MethodConfig.
856func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
857	// TODO: Avoid the locking here.
858	cc.mu.RLock()
859	defer cc.mu.RUnlock()
860	if cc.sc == nil {
861		return MethodConfig{}
862	}
863	m, ok := cc.sc.Methods[method]
864	if !ok {
865		i := strings.LastIndex(method, "/")
866		m = cc.sc.Methods[method[:i+1]]
867	}
868	return m
869}
870
871func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
872	cc.mu.RLock()
873	defer cc.mu.RUnlock()
874	if cc.sc == nil {
875		return nil
876	}
877	return cc.sc.healthCheckConfig
878}
879
880func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
881	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
882		Ctx:            ctx,
883		FullMethodName: method,
884	})
885	if err != nil {
886		return nil, nil, toRPCErr(err)
887	}
888	return t, done, nil
889}
890
891func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
892	if sc == nil {
893		// should never reach here.
894		return
895	}
896	cc.sc = sc
897
898	if cc.sc.retryThrottling != nil {
899		newThrottler := &retryThrottler{
900			tokens: cc.sc.retryThrottling.MaxTokens,
901			max:    cc.sc.retryThrottling.MaxTokens,
902			thresh: cc.sc.retryThrottling.MaxTokens / 2,
903			ratio:  cc.sc.retryThrottling.TokenRatio,
904		}
905		cc.retryThrottler.Store(newThrottler)
906	} else {
907		cc.retryThrottler.Store((*retryThrottler)(nil))
908	}
909
910	if cc.dopts.balancerBuilder == nil {
911		// Only look at balancer types and switch balancer if balancer dial
912		// option is not set.
913		var newBalancerName string
914		if cc.sc != nil && cc.sc.lbConfig != nil {
915			newBalancerName = cc.sc.lbConfig.name
916		} else {
917			var isGRPCLB bool
918			for _, a := range addrs {
919				if a.Type == resolver.GRPCLB {
920					isGRPCLB = true
921					break
922				}
923			}
924			if isGRPCLB {
925				newBalancerName = grpclbName
926			} else if cc.sc != nil && cc.sc.LB != nil {
927				newBalancerName = *cc.sc.LB
928			} else {
929				newBalancerName = PickFirstBalancerName
930			}
931		}
932		cc.switchBalancer(newBalancerName)
933	} else if cc.balancerWrapper == nil {
934		// Balancer dial option was set, and this is the first time handling
935		// resolved addresses. Build a balancer with dopts.balancerBuilder.
936		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
937		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
938	}
939}
940
941func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
942	cc.mu.RLock()
943	r := cc.resolverWrapper
944	cc.mu.RUnlock()
945	if r == nil {
946		return
947	}
948	go r.resolveNow(o)
949}
950
951// ResetConnectBackoff wakes up all subchannels in transient failure and causes
952// them to attempt another connection immediately.  It also resets the backoff
953// times used for subsequent attempts regardless of the current state.
954//
955// In general, this function should not be used.  Typical service or network
956// outages result in a reasonable client reconnection strategy by default.
957// However, if a previously unavailable network becomes available, this may be
958// used to trigger an immediate reconnect.
959//
960// This API is EXPERIMENTAL.
961func (cc *ClientConn) ResetConnectBackoff() {
962	cc.mu.Lock()
963	conns := cc.conns
964	cc.mu.Unlock()
965	for ac := range conns {
966		ac.resetConnectBackoff()
967	}
968}
969
970// Close tears down the ClientConn and all underlying connections.
971func (cc *ClientConn) Close() error {
972	defer cc.cancel()
973
974	cc.mu.Lock()
975	if cc.conns == nil {
976		cc.mu.Unlock()
977		return ErrClientConnClosing
978	}
979	conns := cc.conns
980	cc.conns = nil
981	cc.csMgr.updateState(connectivity.Shutdown)
982
983	rWrapper := cc.resolverWrapper
984	cc.resolverWrapper = nil
985	bWrapper := cc.balancerWrapper
986	cc.balancerWrapper = nil
987	cc.mu.Unlock()
988
989	cc.blockingpicker.close()
990
991	if rWrapper != nil {
992		rWrapper.close()
993	}
994	if bWrapper != nil {
995		bWrapper.close()
996	}
997
998	for ac := range conns {
999		ac.tearDown(ErrClientConnClosing)
1000	}
1001	if channelz.IsOn() {
1002		ted := &channelz.TraceEventDesc{
1003			Desc:     "Channel Deleted",
1004			Severity: channelz.CtINFO,
1005		}
1006		if cc.dopts.channelzParentID != 0 {
1007			ted.Parent = &channelz.TraceEventDesc{
1008				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
1009				Severity: channelz.CtINFO,
1010			}
1011		}
1012		channelz.AddTraceEvent(cc.channelzID, 0, ted)
1013		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1014		// the entity being deleted, and thus prevent it from being deleted right away.
1015		channelz.RemoveEntry(cc.channelzID)
1016	}
1017	return nil
1018}
1019
// addrConn is a network connection to a given address.
//
// It holds the list of candidate addresses, the address and transport that
// are currently in use (if any), its connectivity state, and the bookkeeping
// needed for connection backoff and channelz reporting.
type addrConn struct {
	// ctx is the parent of the per-attempt connect context and of the health
	// check context. cancel is invoked by tearDown.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the ClientConn this addrConn reports state changes to. dopts holds
	// the dial options used when connecting (backoff strategy, connect
	// options, minimum connect timeout). acbw is the SubConn passed to cc
	// along with every connectivity state change. scopts holds the SubConn
	// options (credentials bundle, whether health checking is enabled).
	cc     *ClientConn
	dopts  dialOptions
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu is held by the methods of addrConn while they read or write
	// transport, curAddr, addrs, state, backoffIdx and resetBackoff.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	// backoffIdx is the retry count handed to the backoff strategy; it is
	// incremented after a full backoff wait and zeroed on a successful
	// connection or by resetConnectBackoff. resetBackoff is closed (and then
	// replaced) by resetConnectBackoff to cut a backoff wait short.
	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	resetBackoff chan struct{}

	// czData holds the call counters reported through ChannelzMetric; they are
	// accessed with atomic operations.
	channelzID int64 // channelz unique identification number.
	czData     *channelzData
}
1049
1050// Note: this requires a lock on ac.mu.
1051func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1052	if ac.state == s {
1053		return
1054	}
1055	ac.state = s
1056	channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
1057	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1058}
1059
1060// adjustParams updates parameters used to create transports upon
1061// receiving a GoAway.
1062func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1063	switch r {
1064	case transport.GoAwayTooManyPings:
1065		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1066		ac.cc.mu.Lock()
1067		if v > ac.cc.mkp.Time {
1068			ac.cc.mkp.Time = v
1069		}
1070		ac.cc.mu.Unlock()
1071	}
1072}
1073
// resetTransport is the connection loop of an addrConn. Each iteration moves
// ac to CONNECTING, tries every address in ac.addrs, and then either
//
//   - on failure: moves ac to TRANSIENT_FAILURE and waits for the backoff
//     period to elapse (or for resetConnectBackoff to cut it short) before
//     trying again, or
//   - on success: records the new transport, starts health checking, and
//     blocks until that transport goes away before connecting again.
//
// The loop returns once ac is observed in the Shutdown state or ac.ctx is done
// during a backoff wait.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		// On every iteration but the first, ask the resolver for fresh
		// addresses before reconnecting.
		if i > 0 {
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		// Snapshot the address list under the lock; the connection attempts
		// below run without it.
		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			// Capture the current reset channel under the lock:
			// resetConnectBackoff closes it and installs a new one.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// The full backoff elapsed; back off longer next time.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// Backoff was reset; retry right away.
				timer.Stop()
			case <-ac.ctx.Done():
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// ac was torn down while connecting; discard the new transport.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// startHealthCheck must be called with ac.mu held. hctx is canceled
		// below once the transport is gone.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1170
1171// tryAllAddrs tries to creates a connection to the addresses, and stop when at the
1172// first successful one. It returns the transport, the address and a Event in
1173// the successful case. The Event fires when the returned transport disconnects.
1174func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
1175	var firstConnErr error
1176	for _, addr := range addrs {
1177		ac.mu.Lock()
1178		if ac.state == connectivity.Shutdown {
1179			ac.mu.Unlock()
1180			return nil, resolver.Address{}, nil, errConnClosing
1181		}
1182
1183		ac.cc.mu.RLock()
1184		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1185		ac.cc.mu.RUnlock()
1186
1187		copts := ac.dopts.copts
1188		if ac.scopts.CredsBundle != nil {
1189			copts.CredsBundle = ac.scopts.CredsBundle
1190		}
1191		ac.mu.Unlock()
1192
1193		channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)
1194
1195		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
1196		if err == nil {
1197			return newTr, addr, reconnect, nil
1198		}
1199		if firstConnErr == nil {
1200			firstConnErr = err
1201		}
1202		ac.cc.blockingpicker.updateConnectionError(err)
1203	}
1204
1205	// Couldn't connect to any address.
1206	return nil, resolver.Address{}, nil, firstConnErr
1207}
1208
1209// createTransport creates a connection to addr. It returns the transport and a
1210// Event in the successful case. The Event fires when the returned transport
1211// disconnects.
1212func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
1213	prefaceReceived := make(chan struct{})
1214	onCloseCalled := make(chan struct{})
1215	reconnect := grpcsync.NewEvent()
1216
1217	authority := ac.cc.authority
1218	// addr.ServerName takes precedent over ClientConn authority, if present.
1219	if addr.ServerName != "" {
1220		authority = addr.ServerName
1221	}
1222
1223	target := transport.TargetInfo{
1224		Addr:      addr.Addr,
1225		Metadata:  addr.Metadata,
1226		Authority: authority,
1227	}
1228
1229	once := sync.Once{}
1230	onGoAway := func(r transport.GoAwayReason) {
1231		ac.mu.Lock()
1232		ac.adjustParams(r)
1233		once.Do(func() {
1234			if ac.state == connectivity.Ready {
1235				// Prevent this SubConn from being used for new RPCs by setting its
1236				// state to Connecting.
1237				//
1238				// TODO: this should be Idle when grpc-go properly supports it.
1239				ac.updateConnectivityState(connectivity.Connecting, nil)
1240			}
1241		})
1242		ac.mu.Unlock()
1243		reconnect.Fire()
1244	}
1245
1246	onClose := func() {
1247		ac.mu.Lock()
1248		once.Do(func() {
1249			if ac.state == connectivity.Ready {
1250				// Prevent this SubConn from being used for new RPCs by setting its
1251				// state to Connecting.
1252				//
1253				// TODO: this should be Idle when grpc-go properly supports it.
1254				ac.updateConnectivityState(connectivity.Connecting, nil)
1255			}
1256		})
1257		ac.mu.Unlock()
1258		close(onCloseCalled)
1259		reconnect.Fire()
1260	}
1261
1262	onPrefaceReceipt := func() {
1263		close(prefaceReceived)
1264	}
1265
1266	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1267	defer cancel()
1268	if channelz.IsOn() {
1269		copts.ChannelzParentID = ac.channelzID
1270	}
1271
1272	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1273	if err != nil {
1274		// newTr is either nil, or closed.
1275		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
1276		return nil, nil, err
1277	}
1278
1279	select {
1280	case <-time.After(time.Until(connectDeadline)):
1281		// We didn't get the preface in time.
1282		newTr.Close()
1283		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1284		return nil, nil, errors.New("timed out waiting for server handshake")
1285	case <-prefaceReceived:
1286		// We got the preface - huzzah! things are good.
1287	case <-onCloseCalled:
1288		// The transport has already closed - noop.
1289		return nil, nil, errors.New("connection closed")
1290		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1291	}
1292	return newTr, reconnect, nil
1293}
1294
1295// startHealthCheck starts the health checking stream (RPC) to watch the health
1296// stats of this connection if health checking is requested and configured.
1297//
1298// LB channel health checking is enabled when all requirements below are met:
1299// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
1300// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
1301// 3. a service config with non-empty healthCheckConfig field is provided
1302// 4. the load balancer requests it
1303//
1304// It sets addrConn to READY if the health checking stream is not started.
1305//
1306// Caller must hold ac.mu.
1307func (ac *addrConn) startHealthCheck(ctx context.Context) {
1308	var healthcheckManagingState bool
1309	defer func() {
1310		if !healthcheckManagingState {
1311			ac.updateConnectivityState(connectivity.Ready, nil)
1312		}
1313	}()
1314
1315	if ac.cc.dopts.disableHealthCheck {
1316		return
1317	}
1318	healthCheckConfig := ac.cc.healthCheckConfig()
1319	if healthCheckConfig == nil {
1320		return
1321	}
1322	if !ac.scopts.HealthCheckEnabled {
1323		return
1324	}
1325	healthCheckFunc := ac.cc.dopts.healthCheckFunc
1326	if healthCheckFunc == nil {
1327		// The health package is not imported to set health check function.
1328		//
1329		// TODO: add a link to the health check doc in the error message.
1330		channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
1331		return
1332	}
1333
1334	healthcheckManagingState = true
1335
1336	// Set up the health check helper functions.
1337	currentTr := ac.transport
1338	newStream := func(method string) (interface{}, error) {
1339		ac.mu.Lock()
1340		if ac.transport != currentTr {
1341			ac.mu.Unlock()
1342			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1343		}
1344		ac.mu.Unlock()
1345		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1346	}
1347	setConnectivityState := func(s connectivity.State, lastErr error) {
1348		ac.mu.Lock()
1349		defer ac.mu.Unlock()
1350		if ac.transport != currentTr {
1351			return
1352		}
1353		ac.updateConnectivityState(s, lastErr)
1354	}
1355	// Start the health checking stream.
1356	go func() {
1357		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1358		if err != nil {
1359			if status.Code(err) == codes.Unimplemented {
1360				channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1361			} else {
1362				channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
1363			}
1364		}
1365	}()
1366}
1367
1368func (ac *addrConn) resetConnectBackoff() {
1369	ac.mu.Lock()
1370	close(ac.resetBackoff)
1371	ac.backoffIdx = 0
1372	ac.resetBackoff = make(chan struct{})
1373	ac.mu.Unlock()
1374}
1375
1376// getReadyTransport returns the transport if ac's state is READY.
1377// Otherwise it returns nil, false.
1378// If ac's state is IDLE, it will trigger ac to connect.
1379func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1380	ac.mu.Lock()
1381	if ac.state == connectivity.Ready && ac.transport != nil {
1382		t := ac.transport
1383		ac.mu.Unlock()
1384		return t, true
1385	}
1386	var idle bool
1387	if ac.state == connectivity.Idle {
1388		idle = true
1389	}
1390	ac.mu.Unlock()
1391	// Trigger idle ac to connect.
1392	if idle {
1393		ac.connect()
1394	}
1395	return nil, false
1396}
1397
// tearDown starts to tear down the addrConn: it moves ac to the Shutdown
// state, cancels ac.ctx, clears the current transport and address, and
// removes ac's channelz entry. When err is errConnDrain and ac has a
// transport, that transport is closed gracefully. Calling tearDown on an
// addrConn that is already in the Shutdown state is a no-op.
//
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop).
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	// Detach the transport from ac first; curTr is only used below for the
	// graceful-close case.
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancellation / etc.
	ac.updateConnectivityState(connectivity.Shutdown, nil)
	ac.cancel()
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1441
1442func (ac *addrConn) getState() connectivity.State {
1443	ac.mu.Lock()
1444	defer ac.mu.Unlock()
1445	return ac.state
1446}
1447
1448func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1449	ac.mu.Lock()
1450	addr := ac.curAddr.Addr
1451	ac.mu.Unlock()
1452	return &channelz.ChannelInternalMetric{
1453		State:                    ac.getState(),
1454		Target:                   addr,
1455		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1456		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1457		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1458		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1459	}
1460}
1461
1462func (ac *addrConn) incrCallsStarted() {
1463	atomic.AddInt64(&ac.czData.callsStarted, 1)
1464	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1465}
1466
1467func (ac *addrConn) incrCallsSucceeded() {
1468	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1469}
1470
1471func (ac *addrConn) incrCallsFailed() {
1472	atomic.AddInt64(&ac.czData.callsFailed, 1)
1473}
1474
1475type retryThrottler struct {
1476	max    float64
1477	thresh float64
1478	ratio  float64
1479
1480	mu     sync.Mutex
1481	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1482}
1483
1484// throttle subtracts a retry token from the pool and returns whether a retry
1485// should be throttled (disallowed) based upon the retry throttling policy in
1486// the service config.
1487func (rt *retryThrottler) throttle() bool {
1488	if rt == nil {
1489		return false
1490	}
1491	rt.mu.Lock()
1492	defer rt.mu.Unlock()
1493	rt.tokens--
1494	if rt.tokens < 0 {
1495		rt.tokens = 0
1496	}
1497	return rt.tokens <= rt.thresh
1498}
1499
1500func (rt *retryThrottler) successfulRPC() {
1501	if rt == nil {
1502		return
1503	}
1504	rt.mu.Lock()
1505	defer rt.mu.Unlock()
1506	rt.tokens += rt.ratio
1507	if rt.tokens > rt.max {
1508		rt.tokens = rt.max
1509	}
1510}
1511
1512type channelzChannel struct {
1513	cc *ClientConn
1514}
1515
1516func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1517	return c.cc.channelzMetric()
1518}
1519
// ErrClientConnTimeout indicates that the ClientConn could not establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1526
1527func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1528	for _, rb := range cc.dopts.resolvers {
1529		if scheme == rb.Scheme() {
1530			return rb
1531		}
1532	}
1533	return resolver.Get(scheme)
1534}
1535