1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22	"context"
23	"errors"
24	"fmt"
25	"math"
26	"net"
27	"reflect"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"google.golang.org/grpc/balancer"
34	"google.golang.org/grpc/balancer/base"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/connectivity"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/internal/backoff"
39	"google.golang.org/grpc/internal/channelz"
40	"google.golang.org/grpc/internal/grpcsync"
41	"google.golang.org/grpc/internal/grpcutil"
42	"google.golang.org/grpc/internal/transport"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/resolver"
45	"google.golang.org/grpc/serviceconfig"
46	"google.golang.org/grpc/status"
47
48	_ "google.golang.org/grpc/balancer/roundrobin"           // To register roundrobin.
49	_ "google.golang.org/grpc/internal/resolver/dns"         // To register dns resolver.
50	_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
51)
52
const (
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
	// grpclbName must match grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
59
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	//
	// Deprecated: this error should not be relied upon by users; use the status
	// code of Canceled instead.
	ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
	// errConnDrain indicates that the connection starts to be drained and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
	// service config.
	invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
)
77
// The following errors are returned from Dial and DialContext.
var (
	// errNoTransportSecurity indicates that there is no transport security
	// being set for ClientConn. Users should either set one or explicitly
	// call WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredsAndBundle indicates that creds bundle is used together
	// with other individual Transport Credentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
	// errTransportCredentialsMissing indicates that users want to transmit security
	// information (e.g., OAuth2 token) which requires secure connection on an insecure
	// connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both called for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
95
const (
	defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
	defaultClientMaxSendMessageSize    = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize specify the default buffer
	// sizes used by the transport for sending and receiving frames.
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
103
104// Dial creates a client connection to the given target.
105func Dial(target string, opts ...DialOption) (*ClientConn, error) {
106	return DialContext(context.Background(), target, opts...)
107}
108
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	// Store a typed nil so Load always yields a *retryThrottler.
	cc.retryThrottler.Store((*retryThrottler)(nil))
	// cc.ctx governs the connection's lifetime (canceled only by Close); it
	// is deliberately detached from the caller's dial ctx.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	// Tear down the partially-built conn on any failure below.
	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		if cc.dopts.channelzParentID != 0 {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
			channelz.AddTraceEvent(cc.channelzID, 0, &channelz.TraceEventDesc{
				Desc:     "Channel Created",
				Severity: channelz.CtINFO,
				Parent: &channelz.TraceEventDesc{
					Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
					Severity: channelz.CtINFO,
				},
			})
		} else {
			cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
			channelz.Info(cc.channelzID, "Channel Created")
		}
		cc.csMgr.channelzID = cc.channelzID
	}

	// Validate the credentials configuration: with security enabled exactly
	// one of TransportCredentials / CredsBundle must be set; with
	// WithInsecure, neither may be set and no PerRPCCredentials may require
	// transport security.
	if !cc.dopts.insecure {
		if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
			return nil, errNoTransportSecurity
		}
		if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
			return nil, errTransportCredsAndBundle
		}
	} else {
		if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
			return nil, errCredentialsConflict
		}
		for _, cd := range cc.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return nil, errTransportCredentialsMissing
			}
		}
	}

	if cc.dopts.defaultServiceConfigRawJSON != nil {
		scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
		if scpr.Err != nil {
			return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
		}
		cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		// Default dialer, wrapped by newProxyDialer (proxy handling).
		cc.dopts.copts.Dialer = newProxyDialer(
			func(ctx context.Context, addr string) (net.Conn, error) {
				network, addr := parseDialTarget(addr)
				return (&net.Dialer{}).DialContext(ctx, network, addr)
			},
		)
	}

	// Append (or set) the gRPC user agent string.
	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// If the dial ctx expired/was canceled during setup, report that rather
	// than returning a conn.
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config without blocking.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Determine the resolver to use.
	cc.parsedTarget = grpcutil.ParseTarget(cc.target)
	channelz.Infof(cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// If resolver builder is still nil, the parsed target's scheme is
		// not registered. Fallback to default resolver and set Endpoint to
		// the original target.
		channelz.Infof(cc.channelzID, "scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
		cc.parsedTarget = resolver.Target{
			Scheme:   resolver.GetDefaultScheme(),
			Endpoint: target,
		}
		resolverBuilder = cc.getResolver(cc.parsedTarget.Scheme)
		if resolverBuilder == nil {
			return nil, fmt.Errorf("could not get resolver for default scheme: %q", cc.parsedTarget.Scheme)
		}
	}

	// Choose the default :authority for RPCs on this channel: the
	// credentials' ServerName, an explicit authority dial option (insecure
	// only), or the parsed target's endpoint.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.authority != "" {
		cc.authority = cc.dopts.authority
	} else {
		// Use endpoint from "scheme://authority/endpoint" as the default
		// authority for ClientConn.
		cc.authority = cc.parsedTarget.Endpoint
	}

	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		// Keep watching for service config updates in the background.
		go cc.scWatcher()
	}

	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				// Surface permanent (non-temporary) dial errors immediately
				// instead of waiting for the deadline.
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}
331
332// chainUnaryClientInterceptors chains all unary client interceptors into one.
333func chainUnaryClientInterceptors(cc *ClientConn) {
334	interceptors := cc.dopts.chainUnaryInts
335	// Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
336	// be executed before any other chained interceptors.
337	if cc.dopts.unaryInt != nil {
338		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
339	}
340	var chainedInt UnaryClientInterceptor
341	if len(interceptors) == 0 {
342		chainedInt = nil
343	} else if len(interceptors) == 1 {
344		chainedInt = interceptors[0]
345	} else {
346		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
347			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
348		}
349	}
350	cc.dopts.unaryInt = chainedInt
351}
352
353// getChainUnaryInvoker recursively generate the chained unary invoker.
354func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
355	if curr == len(interceptors)-1 {
356		return finalInvoker
357	}
358	return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
359		return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
360	}
361}
362
363// chainStreamClientInterceptors chains all stream client interceptors into one.
364func chainStreamClientInterceptors(cc *ClientConn) {
365	interceptors := cc.dopts.chainStreamInts
366	// Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
367	// be executed before any other chained interceptors.
368	if cc.dopts.streamInt != nil {
369		interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
370	}
371	var chainedInt StreamClientInterceptor
372	if len(interceptors) == 0 {
373		chainedInt = nil
374	} else if len(interceptors) == 1 {
375		chainedInt = interceptors[0]
376	} else {
377		chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
378			return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
379		}
380	}
381	cc.dopts.streamInt = chainedInt
382}
383
384// getChainStreamer recursively generate the chained client stream constructor.
385func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
386	if curr == len(interceptors)-1 {
387		return finalStreamer
388	}
389	return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
390		return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
391	}
392}
393
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{} // closed (and replaced with nil) on each state change
	channelzID int64         // id used for channelz logging
}
402
403// updateState updates the connectivity.State of ClientConn.
404// If there's a change it notifies goroutines waiting on state change to
405// happen.
406func (csm *connectivityStateManager) updateState(state connectivity.State) {
407	csm.mu.Lock()
408	defer csm.mu.Unlock()
409	if csm.state == connectivity.Shutdown {
410		return
411	}
412	if csm.state == state {
413		return
414	}
415	csm.state = state
416	channelz.Infof(csm.channelzID, "Channel Connectivity change to %v", state)
417	if csm.notifyChan != nil {
418		// There are other goroutines waiting on this channel.
419		close(csm.notifyChan)
420		csm.notifyChan = nil
421	}
422}
423
424func (csm *connectivityStateManager) getState() connectivity.State {
425	csm.mu.Lock()
426	defer csm.mu.Unlock()
427	return csm.state
428}
429
430func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
431	csm.mu.Lock()
432	defer csm.mu.Unlock()
433	if csm.notifyChan == nil {
434		csm.notifyChan = make(chan struct{})
435	}
436	return csm.notifyChan
437}
438
// ClientConnInterface defines the functions clients need to perform unary and
// streaming RPCs.  It is implemented by *ClientConn, and is only intended to
// be referenced by generated code.
type ClientConnInterface interface {
	// Invoke performs a unary RPC and returns after the response is received
	// into reply.
	Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
	// NewStream begins a streaming RPC.
	NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}
449
// Assert at compile time that *ClientConn implements ClientConnInterface.
var _ ClientConnInterface = (*ClientConn)(nil)
452
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context    // lifetime of the ClientConn; canceled by Close
	cancel context.CancelFunc // cancels ctx

	target       string          // dial target as passed to Dial/DialContext
	parsedTarget resolver.Target // target parsed per the naming spec
	authority    string          // default :authority value for RPCs
	dopts        dialOptions     // effective dial options
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig         // active service config; nil until one is applied
	conns           map[*addrConn]struct{} // nil signals the ClientConn is closed
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value // holds a *retryThrottler (possibly a typed nil)

	firstResolveEvent *grpcsync.Event // fired on the first resolver update

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}
493
494// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
495// ctx expires. A true value is returned in former case and false in latter.
496// This is an EXPERIMENTAL API.
497func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
498	ch := cc.csMgr.getNotifyChan()
499	if cc.csMgr.getState() != sourceState {
500		return true
501	}
502	select {
503	case <-ctx.Done():
504		return false
505	case <-ch:
506		return true
507	}
508}
509
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
//
// Access is synchronized by the connectivity state manager.
func (cc *ClientConn) GetState() connectivity.State {
	return cc.csMgr.getState()
}
515
516func (cc *ClientConn) scWatcher() {
517	for {
518		select {
519		case sc, ok := <-cc.dopts.scChan:
520			if !ok {
521				return
522			}
523			cc.mu.Lock()
524			// TODO: load balance policy runtime change is ignored.
525			// We may revisit this decision in the future.
526			cc.sc = &sc
527			cc.mu.Unlock()
528		case <-cc.ctx.Done():
529			return
530		}
531	}
532}
533
534// waitForResolvedAddrs blocks until the resolver has provided addresses or the
535// context expires.  Returns nil unless the context expires first; otherwise
536// returns a status error based on the context.
537func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
538	// This is on the RPC path, so we use a fast path to avoid the
539	// more-expensive "select" below after the resolver has returned once.
540	if cc.firstResolveEvent.HasFired() {
541		return nil
542	}
543	select {
544	case <-cc.firstResolveEvent.Done():
545		return nil
546	case <-ctx.Done():
547		return status.FromContextError(ctx.Err()).Err()
548	case <-cc.ctx.Done():
549		return ErrClientConnClosing
550	}
551}
552
// emptyServiceConfig is the fallback service config applied when neither the
// resolver nor the dial options supply one.
var emptyServiceConfig *ServiceConfig

func init() {
	// Parse "{}" once at startup; a parse failure here is a programming
	// error, so panic rather than continue without a fallback config.
	cfg := parseServiceConfig("{}")
	if cfg.Err != nil {
		panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
	}
	emptyServiceConfig = cfg.Config.(*ServiceConfig)
}
562
563func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
564	if cc.sc != nil {
565		cc.applyServiceConfigAndBalancer(cc.sc, addrs)
566		return
567	}
568	if cc.dopts.defaultServiceConfig != nil {
569		cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, addrs)
570	} else {
571		cc.applyServiceConfigAndBalancer(emptyServiceConfig, addrs)
572	}
573}
574
// updateResolverState applies a resolver update (addresses and optional
// service config) to the channel and forwards the result to the balancer.
// A non-nil err means the resolver reported an error and s carries no usable
// addresses. Returning balancer.ErrBadResolverState asks the resolver to
// retry with backoff.
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
	// Unblock RPCs waiting in waitForResolvedAddrs regardless of outcome.
	defer cc.firstResolveEvent.Fire()
	cc.mu.Lock()
	// Check if the ClientConn is already closed. Some fields (e.g.
	// balancerWrapper) are set to nil when closing the ClientConn, and could
	// cause nil pointer panic if we don't have this check.
	if cc.conns == nil {
		cc.mu.Unlock()
		return nil
	}

	if err != nil {
		// May need to apply the initial service config in case the resolver
		// doesn't support service configs, or doesn't provide a service config
		// with the new addresses.
		cc.maybeApplyDefaultServiceConfig(nil)

		if cc.balancerWrapper != nil {
			cc.balancerWrapper.resolverError(err)
		}

		// No addresses are valid with err set; return early.
		cc.mu.Unlock()
		return balancer.ErrBadResolverState
	}

	var ret error
	if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
		cc.maybeApplyDefaultServiceConfig(s.Addresses)
		// TODO: do we need to apply a failing LB policy if there is no
		// default, per the error handling design?
	} else {
		if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
			cc.applyServiceConfigAndBalancer(sc, s.Addresses)
		} else {
			// Bad or unparseable service config. If no balancer exists yet,
			// fail RPCs via an erroring picker; otherwise keep the previous
			// config and still report the bad state to the resolver.
			ret = balancer.ErrBadResolverState
			if cc.balancerWrapper == nil {
				var err error
				if s.ServiceConfig.Err != nil {
					err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
				} else {
					err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
				}
				cc.blockingpicker.updatePicker(base.NewErrPicker(err))
				cc.csMgr.updateState(connectivity.TransientFailure)
				cc.mu.Unlock()
				return ret
			}
		}
	}

	var balCfg serviceconfig.LoadBalancingConfig
	if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
		balCfg = cc.sc.lbConfig.cfg
	}

	// Snapshot what we need, then release the lock before calling into the
	// balancer wrapper.
	cbn := cc.curBalancerName
	bw := cc.balancerWrapper
	cc.mu.Unlock()
	if cbn != grpclbName {
		// Filter any grpclb addresses since we don't have the grpclb balancer.
		// In-place removal, preserving the order of the remaining addresses.
		for i := 0; i < len(s.Addresses); {
			if s.Addresses[i].Type == resolver.GRPCLB {
				copy(s.Addresses[i:], s.Addresses[i+1:])
				s.Addresses = s.Addresses[:len(s.Addresses)-1]
				continue
			}
			i++
		}
	}
	uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
	if ret == nil {
		ret = uccsErr // prefer ErrBadResolver state since any other error is
		// currently meaningless to the caller.
	}
	return ret
}
652
653// switchBalancer starts the switching from current balancer to the balancer
654// with the given name.
655//
656// It will NOT send the current address list to the new balancer. If needed,
657// caller of this function should send address list to the new balancer after
658// this function returns.
659//
660// Caller must hold cc.mu.
661func (cc *ClientConn) switchBalancer(name string) {
662	if strings.EqualFold(cc.curBalancerName, name) {
663		return
664	}
665
666	channelz.Infof(cc.channelzID, "ClientConn switching balancer to %q", name)
667	if cc.dopts.balancerBuilder != nil {
668		channelz.Info(cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
669		return
670	}
671	if cc.balancerWrapper != nil {
672		cc.balancerWrapper.close()
673	}
674
675	builder := balancer.Get(name)
676	if builder == nil {
677		channelz.Warningf(cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
678		channelz.Infof(cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
679		builder = newPickfirstBuilder()
680	} else {
681		channelz.Infof(cc.channelzID, "Channel switches to new LB policy %q", name)
682	}
683
684	cc.curBalancerName = builder.Name()
685	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
686}
687
688func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
689	cc.mu.Lock()
690	if cc.conns == nil {
691		cc.mu.Unlock()
692		return
693	}
694	// TODO(bar switching) send updates to all balancer wrappers when balancer
695	// gracefully switching is supported.
696	cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
697	cc.mu.Unlock()
698}
699
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
//
// Returns ErrClientConnClosing if the ClientConn was closed before the
// addrConn could be registered.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
	ac := &addrConn{
		state:        connectivity.Idle,
		cc:           cc,
		addrs:        addrs,
		scopts:       opts,
		dopts:        cc.dopts,
		czData:       new(channelzData),
		resetBackoff: make(chan struct{}),
	}
	// The addrConn's lifetime is bounded by the ClientConn's context.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		// ClientConn was closed while we were constructing ac.
		cc.mu.Unlock()
		return nil, ErrClientConnClosing
	}
	if channelz.IsOn() {
		ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
			Desc:     "Subchannel Created",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
	}
	cc.conns[ac] = struct{}{}
	cc.mu.Unlock()
	return ac, nil
}
735
736// removeAddrConn removes the addrConn in the subConn from clientConn.
737// It also tears down the ac with the given error.
738func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
739	cc.mu.Lock()
740	if cc.conns == nil {
741		cc.mu.Unlock()
742		return
743	}
744	delete(cc.conns, ac)
745	cc.mu.Unlock()
746	ac.tearDown(err)
747}
748
749func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
750	return &channelz.ChannelInternalMetric{
751		State:                    cc.GetState(),
752		Target:                   cc.target,
753		CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
754		CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
755		CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
756		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
757	}
758}
759
// Target returns the target string of the ClientConn.
// This is an EXPERIMENTAL API.
//
// The value is fixed at dial time and never mutated, so no locking is needed.
func (cc *ClientConn) Target() string {
	return cc.target
}
765
// incrCallsStarted bumps the channelz calls-started counter and records the
// start time of the most recent call.
func (cc *ClientConn) incrCallsStarted() {
	atomic.AddInt64(&cc.czData.callsStarted, 1)
	atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
}
770
// incrCallsSucceeded bumps the channelz calls-succeeded counter.
func (cc *ClientConn) incrCallsSucceeded() {
	atomic.AddInt64(&cc.czData.callsSucceeded, 1)
}
774
// incrCallsFailed bumps the channelz calls-failed counter.
func (cc *ClientConn) incrCallsFailed() {
	atomic.AddInt64(&cc.czData.callsFailed, 1)
}
778
779// connect starts creating a transport.
780// It does nothing if the ac is not IDLE.
781// TODO(bar) Move this to the addrConn section.
782func (ac *addrConn) connect() error {
783	ac.mu.Lock()
784	if ac.state == connectivity.Shutdown {
785		ac.mu.Unlock()
786		return errConnClosing
787	}
788	if ac.state != connectivity.Idle {
789		ac.mu.Unlock()
790		return nil
791	}
792	// Update connectivity state within the lock to prevent subsequent or
793	// concurrent calls from resetting the transport more than once.
794	ac.updateConnectivityState(connectivity.Connecting, nil)
795	ac.mu.Unlock()
796
797	// Start a goroutine connecting to the server asynchronously.
798	go ac.resetTransport()
799	return nil
800}
801
802// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
803//
804// If ac is Connecting, it returns false. The caller should tear down the ac and
805// create a new one. Note that the backoff will be reset when this happens.
806//
807// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
808// addresses will be picked up by retry in the next iteration after backoff.
809//
810// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
811//
812// If ac is Ready, it checks whether current connected address of ac is in the
813// new addrs list.
814//  - If true, it updates ac.addrs and returns true. The ac will keep using
815//    the existing connection.
816//  - If false, it does nothing and returns false.
817func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
818	ac.mu.Lock()
819	defer ac.mu.Unlock()
820	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
821	if ac.state == connectivity.Shutdown ||
822		ac.state == connectivity.TransientFailure ||
823		ac.state == connectivity.Idle {
824		ac.addrs = addrs
825		return true
826	}
827
828	if ac.state == connectivity.Connecting {
829		return false
830	}
831
832	// ac.state is Ready, try to find the connected address.
833	var curAddrFound bool
834	for _, a := range addrs {
835		if reflect.DeepEqual(ac.curAddr, a) {
836			curAddrFound = true
837			break
838		}
839	}
840	channelz.Infof(ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
841	if curAddrFound {
842		ac.addrs = addrs
843	}
844
845	return curAddrFound
846}
847
848// GetMethodConfig gets the method config of the input method.
849// If there's an exact match for input method (i.e. /service/method), we return
850// the corresponding MethodConfig.
851// If there isn't an exact match for the input method, we look for the default config
852// under the service (i.e /service/). If there is a default MethodConfig for
853// the service, we return it.
854// Otherwise, we return an empty MethodConfig.
855func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
856	// TODO: Avoid the locking here.
857	cc.mu.RLock()
858	defer cc.mu.RUnlock()
859	if cc.sc == nil {
860		return MethodConfig{}
861	}
862	m, ok := cc.sc.Methods[method]
863	if !ok {
864		i := strings.LastIndex(method, "/")
865		m = cc.sc.Methods[method[:i+1]]
866	}
867	return m
868}
869
870func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
871	cc.mu.RLock()
872	defer cc.mu.RUnlock()
873	if cc.sc == nil {
874		return nil
875	}
876	return cc.sc.healthCheckConfig
877}
878
879func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
880	t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
881		Ctx:            ctx,
882		FullMethodName: method,
883	})
884	if err != nil {
885		return nil, nil, toRPCErr(err)
886	}
887	return t, done, nil
888}
889
// applyServiceConfigAndBalancer installs sc as the channel's active service
// config, updates the retry throttler accordingly, and — unless a Balancer
// DialOption was given — switches to the LB policy implied by the config and
// the resolved addresses.
//
// NOTE(review): callers appear to hold cc.mu (see updateResolverState) —
// confirm before relying on this elsewhere.
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, addrs []resolver.Address) {
	if sc == nil {
		// should never reach here.
		return
	}
	cc.sc = sc

	if cc.sc.retryThrottling != nil {
		newThrottler := &retryThrottler{
			tokens: cc.sc.retryThrottling.MaxTokens,
			max:    cc.sc.retryThrottling.MaxTokens,
			// Retries are blocked once tokens fall below half the maximum.
			thresh: cc.sc.retryThrottling.MaxTokens / 2,
			ratio:  cc.sc.retryThrottling.TokenRatio,
		}
		cc.retryThrottler.Store(newThrottler)
	} else {
		// Typed nil keeps Load() type assertions to *retryThrottler valid.
		cc.retryThrottler.Store((*retryThrottler)(nil))
	}

	if cc.dopts.balancerBuilder == nil {
		// Only look at balancer types and switch balancer if balancer dial
		// option is not set.
		var newBalancerName string
		// NOTE(review): cc.sc is always non-nil here (assigned from a
		// non-nil sc above), so these cc.sc != nil checks are redundant but
		// harmless.
		if cc.sc != nil && cc.sc.lbConfig != nil {
			newBalancerName = cc.sc.lbConfig.name
		} else {
			// No explicit LB config: grpclb addresses force the grpclb
			// policy; otherwise use the config's LB field or pick_first.
			var isGRPCLB bool
			for _, a := range addrs {
				if a.Type == resolver.GRPCLB {
					isGRPCLB = true
					break
				}
			}
			if isGRPCLB {
				newBalancerName = grpclbName
			} else if cc.sc != nil && cc.sc.LB != nil {
				newBalancerName = *cc.sc.LB
			} else {
				newBalancerName = PickFirstBalancerName
			}
		}
		cc.switchBalancer(newBalancerName)
	} else if cc.balancerWrapper == nil {
		// Balancer dial option was set, and this is the first time handling
		// resolved addresses. Build a balancer with dopts.balancerBuilder.
		cc.curBalancerName = cc.dopts.balancerBuilder.Name()
		cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
	}
}
939
940func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
941	cc.mu.RLock()
942	r := cc.resolverWrapper
943	cc.mu.RUnlock()
944	if r == nil {
945		return
946	}
947	go r.resolveNow(o)
948}
949
950// ResetConnectBackoff wakes up all subchannels in transient failure and causes
951// them to attempt another connection immediately.  It also resets the backoff
952// times used for subsequent attempts regardless of the current state.
953//
954// In general, this function should not be used.  Typical service or network
955// outages result in a reasonable client reconnection strategy by default.
956// However, if a previously unavailable network becomes available, this may be
957// used to trigger an immediate reconnect.
958//
959// This API is EXPERIMENTAL.
960func (cc *ClientConn) ResetConnectBackoff() {
961	cc.mu.Lock()
962	conns := cc.conns
963	cc.mu.Unlock()
964	for ac := range conns {
965		ac.resetConnectBackoff()
966	}
967}
968
// Close tears down the ClientConn and all underlying connections.
// It is idempotent in effect: a second call returns ErrClientConnClosing
// because cc.conns has already been cleared.
func (cc *ClientConn) Close() error {
	// Canceling cc's context happens on return, after all state below has
	// been updated.
	defer cc.cancel()

	cc.mu.Lock()
	if cc.conns == nil {
		// Already closed.
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	// Detach everything under the lock; the actual teardown below runs
	// without holding cc.mu.
	conns := cc.conns
	cc.conns = nil
	cc.csMgr.updateState(connectivity.Shutdown)

	rWrapper := cc.resolverWrapper
	cc.resolverWrapper = nil
	bWrapper := cc.balancerWrapper
	cc.balancerWrapper = nil
	cc.mu.Unlock()

	// Close the picker first so operations blocked on a pick can observe
	// the shutdown, then the resolver and balancer wrappers.
	cc.blockingpicker.close()

	if rWrapper != nil {
		rWrapper.close()
	}
	if bWrapper != nil {
		bWrapper.close()
	}

	// Tear down every subchannel owned by this ClientConn.
	for ac := range conns {
		ac.tearDown(ErrClientConnClosing)
	}
	if channelz.IsOn() {
		ted := &channelz.TraceEventDesc{
			Desc:     "Channel Deleted",
			Severity: channelz.CtINFO,
		}
		if cc.dopts.channelzParentID != 0 {
			ted.Parent = &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
				Severity: channelz.CtINFO,
			}
		}
		channelz.AddTraceEvent(cc.channelzID, 0, ted)
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(cc.channelzID)
	}
	return nil
}
1018
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is canceled by tearDown (via cancel) and parents the dial and
	// health-check contexts created for this subchannel.
	ctx    context.Context
	cancel context.CancelFunc

	// cc is the owning ClientConn.
	cc     *ClientConn
	dopts  dialOptions
	// acbw is the balancer's handle for this subchannel; it is passed back
	// to the balancer on every connectivity state change.
	acbw   balancer.SubConn
	scopts balancer.NewSubConnOptions

	// transport is set when there's a viable transport (note: ac state may not be READY as LB channel
	// health checking may require server to report healthy to set ac to READY), and is reset
	// to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
	// is received, transport is closed, ac has been torn down).
	transport transport.ClientTransport // The current transport.

	// mu guards the fields below.
	mu      sync.Mutex
	curAddr resolver.Address   // The current address.
	addrs   []resolver.Address // All addresses that the resolver resolved to.

	// Use updateConnectivityState for updating addrConn's connectivity state.
	state connectivity.State

	backoffIdx   int // Needs to be stateful for resetConnectBackoff.
	// resetBackoff is closed (and replaced) by resetConnectBackoff to abort
	// an in-progress backoff sleep in resetTransport.
	resetBackoff chan struct{}

	channelzID int64 // channelz unique identification number.
	czData     *channelzData // atomic call counters reported via channelz.
}
1048
1049// Note: this requires a lock on ac.mu.
1050func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1051	if ac.state == s {
1052		return
1053	}
1054	ac.state = s
1055	channelz.Infof(ac.channelzID, "Subchannel Connectivity change to %v", s)
1056	ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
1057}
1058
1059// adjustParams updates parameters used to create transports upon
1060// receiving a GoAway.
1061func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1062	switch r {
1063	case transport.GoAwayTooManyPings:
1064		v := 2 * ac.dopts.copts.KeepaliveParams.Time
1065		ac.cc.mu.Lock()
1066		if v > ac.cc.mkp.Time {
1067			ac.cc.mkp.Time = v
1068		}
1069		ac.cc.mu.Unlock()
1070	}
1071}
1072
// resetTransport is the subchannel's connect loop.  Each iteration tries all
// resolved addresses; on total failure it enters TRANSIENT_FAILURE and sleeps
// for the backoff period, and on success it installs the transport and blocks
// until that transport dies, then starts over.  It returns only when the
// addrConn has been shut down.
func (ac *addrConn) resetTransport() {
	for i := 0; ; i++ {
		if i > 0 {
			// Ask for re-resolution before every retry round after the first.
			ac.cc.resolveNow(resolver.ResolveNowOptions{})
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return
		}

		addrs := ac.addrs
		backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
		// This will be the duration that dial gets to finish.
		dialDuration := minConnectTimeout
		if ac.dopts.minConnectTimeout != nil {
			dialDuration = ac.dopts.minConnectTimeout()
		}

		if dialDuration < backoffFor {
			// Give dial more time as we keep failing to connect.
			dialDuration = backoffFor
		}
		// We can potentially spend all the time trying the first address, and
		// if the server accepts the connection and then hangs, the following
		// addresses will never be tried.
		//
		// The spec doesn't mention what should be done for multiple addresses.
		// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
		connectDeadline := time.Now().Add(dialDuration)

		ac.updateConnectivityState(connectivity.Connecting, nil)
		ac.transport = nil
		ac.mu.Unlock()

		newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
		if err != nil {
			// After exhausting all addresses, the addrConn enters
			// TRANSIENT_FAILURE.
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				ac.mu.Unlock()
				return
			}
			ac.updateConnectivityState(connectivity.TransientFailure, err)

			// Backoff.
			b := ac.resetBackoff
			ac.mu.Unlock()

			timer := time.NewTimer(backoffFor)
			select {
			case <-timer.C:
				// Backoff elapsed: advance the backoff schedule and retry.
				ac.mu.Lock()
				ac.backoffIdx++
				ac.mu.Unlock()
			case <-b:
				// resetConnectBackoff was called: retry immediately.
				// backoffIdx was already reset to 0 there.
				timer.Stop()
			case <-ac.ctx.Done():
				// addrConn torn down while backing off.
				timer.Stop()
				return
			}
			continue
		}

		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// Shut down while we were connecting: discard the new transport.
			ac.mu.Unlock()
			newTr.Close()
			return
		}
		ac.curAddr = addr
		ac.transport = newTr
		ac.backoffIdx = 0

		// startHealthCheck either hands state management to the health
		// checker or marks the subchannel READY itself; hctx is canceled
		// below once the transport goes away.
		hctx, hcancel := context.WithCancel(ac.ctx)
		ac.startHealthCheck(hctx)
		ac.mu.Unlock()

		// Block until the created transport is down. And when this happens,
		// we restart from the top of the addr list.
		<-reconnect.Done()
		hcancel()
		// restart connecting - the top of the loop will set state to
		// CONNECTING.  This is against the current connectivity semantics doc,
		// however it allows for graceful behavior for RPCs not yet dispatched
		// - unfortunate timing would otherwise lead to the RPC failing even
		// though the TRANSIENT_FAILURE state (called for by the doc) would be
		// instantaneous.
		//
		// Ideally we should transition to Idle here and block until there is
		// RPC activity that leads to the balancer requesting a reconnect of
		// the associated SubConn.
	}
}
1169
// tryAllAddrs tries to create a connection to the addresses, and stops at the
// first successful one. It returns the transport, the address and an Event in
// the successful case. The Event fires when the returned transport disconnects.
// If every address fails, the error from the first attempt is returned.
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
	var firstConnErr error
	for _, addr := range addrs {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			ac.mu.Unlock()
			return nil, resolver.Address{}, nil, errConnClosing
		}

		// Pick up any keepalive adjustment made in response to GoAway
		// (see adjustParams) before dialing.
		ac.cc.mu.RLock()
		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
		ac.cc.mu.RUnlock()

		copts := ac.dopts.copts
		if ac.scopts.CredsBundle != nil {
			copts.CredsBundle = ac.scopts.CredsBundle
		}
		ac.mu.Unlock()

		channelz.Infof(ac.channelzID, "Subchannel picks a new address %q to connect", addr.Addr)

		newTr, reconnect, err := ac.createTransport(addr, copts, connectDeadline)
		if err == nil {
			return newTr, addr, reconnect, nil
		}
		// Keep the first error for the return value; every error is also
		// reported to the picker so failing RPCs see the connection error.
		if firstConnErr == nil {
			firstConnErr = err
		}
		ac.cc.blockingpicker.updateConnectionError(err)
	}

	// Couldn't connect to any address.
	return nil, resolver.Address{}, nil, firstConnErr
}
1207
// createTransport creates a connection to addr. It returns the transport and a
// Event in the successful case. The Event fires when the returned transport
// disconnects.  The transport is only returned after the server's preface is
// received (or an error/timeout occurs first).
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
	// prefaceReceived is closed by onPrefaceReceipt; onCloseCalled by onClose.
	prefaceReceived := make(chan struct{})
	onCloseCalled := make(chan struct{})
	reconnect := grpcsync.NewEvent()

	authority := ac.cc.authority
	// addr.ServerName takes precedent over ClientConn authority, if present.
	if addr.ServerName != "" {
		authority = addr.ServerName
	}

	target := transport.TargetInfo{
		Addr:      addr.Addr,
		Metadata:  addr.Metadata,
		Authority: authority,
	}

	// once guards the READY->CONNECTING downgrade so only the first of
	// onGoAway / onClose performs it.
	once := sync.Once{}
	onGoAway := func(r transport.GoAwayReason) {
		ac.mu.Lock()
		ac.adjustParams(r)
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		reconnect.Fire()
	}

	onClose := func() {
		ac.mu.Lock()
		once.Do(func() {
			if ac.state == connectivity.Ready {
				// Prevent this SubConn from being used for new RPCs by setting its
				// state to Connecting.
				//
				// TODO: this should be Idle when grpc-go properly supports it.
				ac.updateConnectivityState(connectivity.Connecting, nil)
			}
		})
		ac.mu.Unlock()
		close(onCloseCalled)
		reconnect.Fire()
	}

	onPrefaceReceipt := func() {
		close(prefaceReceived)
	}

	// Bound the dial by connectDeadline; ac.ctx is the parent, so tearDown
	// (which calls ac.cancel) aborts an in-flight dial.
	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
	defer cancel()
	if channelz.IsOn() {
		copts.ChannelzParentID = ac.channelzID
	}

	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
	if err != nil {
		// newTr is either nil, or closed.
		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v. Err: %v. Reconnecting...", addr, err)
		return nil, nil, err
	}

	// Wait for the server preface before declaring the transport usable.
	select {
	case <-time.After(time.Until(connectDeadline)):
		// We didn't get the preface in time.
		newTr.Close()
		channelz.Warningf(ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
		return nil, nil, errors.New("timed out waiting for server handshake")
	case <-prefaceReceived:
		// We got the preface - huzzah! things are good.
	case <-onCloseCalled:
		// The transport has already closed - noop.
		return nil, nil, errors.New("connection closed")
		// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
	}
	return newTr, reconnect, nil
}
1293
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
	var healthcheckManagingState bool
	// If any precondition below fails, this defer marks the subchannel READY
	// immediately instead of delegating state management to health checking.
	defer func() {
		if !healthcheckManagingState {
			ac.updateConnectivityState(connectivity.Ready, nil)
		}
	}()

	if ac.cc.dopts.disableHealthCheck {
		return
	}
	healthCheckConfig := ac.cc.healthCheckConfig()
	if healthCheckConfig == nil {
		return
	}
	if !ac.scopts.HealthCheckEnabled {
		return
	}
	healthCheckFunc := ac.cc.dopts.healthCheckFunc
	if healthCheckFunc == nil {
		// The health package is not imported to set health check function.
		//
		// TODO: add a link to the health check doc in the error message.
		channelz.Error(ac.channelzID, "Health check is requested but health check function is not set.")
		return
	}

	healthcheckManagingState = true

	// Set up the health check helper functions.
	// Both helpers capture the transport active right now; once ac.transport
	// changes they refuse to act, so a stale health-check goroutine cannot
	// affect a newer connection.
	currentTr := ac.transport
	newStream := func(method string) (interface{}, error) {
		ac.mu.Lock()
		if ac.transport != currentTr {
			ac.mu.Unlock()
			return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
		}
		ac.mu.Unlock()
		return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
	}
	setConnectivityState := func(s connectivity.State, lastErr error) {
		ac.mu.Lock()
		defer ac.mu.Unlock()
		if ac.transport != currentTr {
			return
		}
		ac.updateConnectivityState(s, lastErr)
	}
	// Start the health checking stream.
	go func() {
		err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
		if err != nil {
			if status.Code(err) == codes.Unimplemented {
				channelz.Error(ac.channelzID, "Subchannel health check is unimplemented at server side, thus health check is disabled")
			} else {
				channelz.Errorf(ac.channelzID, "HealthCheckFunc exits with unexpected error %v", err)
			}
		}
	}()
}
1366
1367func (ac *addrConn) resetConnectBackoff() {
1368	ac.mu.Lock()
1369	close(ac.resetBackoff)
1370	ac.backoffIdx = 0
1371	ac.resetBackoff = make(chan struct{})
1372	ac.mu.Unlock()
1373}
1374
1375// getReadyTransport returns the transport if ac's state is READY.
1376// Otherwise it returns nil, false.
1377// If ac's state is IDLE, it will trigger ac to connect.
1378func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1379	ac.mu.Lock()
1380	if ac.state == connectivity.Ready && ac.transport != nil {
1381		t := ac.transport
1382		ac.mu.Unlock()
1383		return t, true
1384	}
1385	var idle bool
1386	if ac.state == connectivity.Idle {
1387		idle = true
1388	}
1389	ac.mu.Unlock()
1390	// Trigger idle ac to connect.
1391	if idle {
1392		ac.connect()
1393	}
1394	return nil, false
1395}
1396
1397// tearDown starts to tear down the addrConn.
1398// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1399// some edge cases (e.g., the caller opens and closes many addrConn's in a
1400// tight loop.
1401// tearDown doesn't remove ac from ac.cc.conns.
1402func (ac *addrConn) tearDown(err error) {
1403	ac.mu.Lock()
1404	if ac.state == connectivity.Shutdown {
1405		ac.mu.Unlock()
1406		return
1407	}
1408	curTr := ac.transport
1409	ac.transport = nil
1410	// We have to set the state to Shutdown before anything else to prevent races
1411	// between setting the state and logic that waits on context cancellation / etc.
1412	ac.updateConnectivityState(connectivity.Shutdown, nil)
1413	ac.cancel()
1414	ac.curAddr = resolver.Address{}
1415	if err == errConnDrain && curTr != nil {
1416		// GracefulClose(...) may be executed multiple times when
1417		// i) receiving multiple GoAway frames from the server; or
1418		// ii) there are concurrent name resolver/Balancer triggered
1419		// address removal and GoAway.
1420		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1421		ac.mu.Unlock()
1422		curTr.GracefulClose()
1423		ac.mu.Lock()
1424	}
1425	if channelz.IsOn() {
1426		channelz.AddTraceEvent(ac.channelzID, 0, &channelz.TraceEventDesc{
1427			Desc:     "Subchannel Deleted",
1428			Severity: channelz.CtINFO,
1429			Parent: &channelz.TraceEventDesc{
1430				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1431				Severity: channelz.CtINFO,
1432			},
1433		})
1434		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1435		// the entity being deleted, and thus prevent it from being deleted right away.
1436		channelz.RemoveEntry(ac.channelzID)
1437	}
1438	ac.mu.Unlock()
1439}
1440
1441func (ac *addrConn) getState() connectivity.State {
1442	ac.mu.Lock()
1443	defer ac.mu.Unlock()
1444	return ac.state
1445}
1446
1447func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1448	ac.mu.Lock()
1449	addr := ac.curAddr.Addr
1450	ac.mu.Unlock()
1451	return &channelz.ChannelInternalMetric{
1452		State:                    ac.getState(),
1453		Target:                   addr,
1454		CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
1455		CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
1456		CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
1457		LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1458	}
1459}
1460
1461func (ac *addrConn) incrCallsStarted() {
1462	atomic.AddInt64(&ac.czData.callsStarted, 1)
1463	atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1464}
1465
// incrCallsSucceeded atomically bumps the channelz succeeded-call counter.
func (ac *addrConn) incrCallsSucceeded() {
	atomic.AddInt64(&ac.czData.callsSucceeded, 1)
}
1469
// incrCallsFailed atomically bumps the channelz failed-call counter.
func (ac *addrConn) incrCallsFailed() {
	atomic.AddInt64(&ac.czData.callsFailed, 1)
}
1473
// retryThrottler implements the service config's retry throttling policy as a
// token bucket: throttle spends a token per retry attempt, successfulRPC
// refills tokens at the configured ratio, and retries are disallowed once the
// token count drops to the threshold or below.  Fields are populated from the
// service config in applyServiceConfigAndBalancer.
type retryThrottler struct {
	max    float64 // bucket capacity (MaxTokens from the service config).
	thresh float64 // throttle when tokens <= thresh (set to max / 2).
	ratio  float64 // tokens restored per successful RPC (TokenRatio).

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}
1482
1483// throttle subtracts a retry token from the pool and returns whether a retry
1484// should be throttled (disallowed) based upon the retry throttling policy in
1485// the service config.
1486func (rt *retryThrottler) throttle() bool {
1487	if rt == nil {
1488		return false
1489	}
1490	rt.mu.Lock()
1491	defer rt.mu.Unlock()
1492	rt.tokens--
1493	if rt.tokens < 0 {
1494		rt.tokens = 0
1495	}
1496	return rt.tokens <= rt.thresh
1497}
1498
1499func (rt *retryThrottler) successfulRPC() {
1500	if rt == nil {
1501		return
1502	}
1503	rt.mu.Lock()
1504	defer rt.mu.Unlock()
1505	rt.tokens += rt.ratio
1506	if rt.tokens > rt.max {
1507		rt.tokens = rt.max
1508	}
1509}
1510
// channelzChannel adapts a ClientConn to the channelz metric interface.
type channelzChannel struct {
	cc *ClientConn
}
1514
// ChannelzMetric delegates to the wrapped ClientConn's channelz metrics.
func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
	return c.cc.channelzMetric()
}
1518
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.  It is retained only so existing references compile.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1525
1526func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1527	for _, rb := range cc.dopts.resolvers {
1528		if cc.parsedTarget.Scheme == rb.Scheme() {
1529			return rb
1530		}
1531	}
1532	return resolver.Get(cc.parsedTarget.Scheme)
1533}
1534