/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

//go:generate ./regenerate.sh

// Package grpclb defines a grpclb balancer.
//
// To install the grpclb balancer, import this package as:
//    import _ "google.golang.org/grpc/balancer/grpclb"
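//
// A minimal usage sketch (the target and dial option below are placeholders,
// not part of this package's API): once imported, this balancer is selected
// automatically when the resolver returns grpclb balancer addresses.
//
//    import (
//        "google.golang.org/grpc"
//        _ "google.golang.org/grpc/balancer/grpclb"
//    )
//
//    conn, err := grpc.Dial("dns:///my-service.example.com", grpc.WithInsecure())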
package grpclb

import (
	"context"
	"errors"
	"strconv"
	"sync"
	"time"

	durationpb "github.com/golang/protobuf/ptypes/duration"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/resolver"
)

const (
	lbTokeyKey             = "lb-token"
	defaultFallbackTimeout = 10 * time.Second
	grpclbName             = "grpclb"
)

var (
	// defaultBackoffConfig configures the backoff strategy used when the
	// initial handshake on the balancer stream fails. It is not the ClientConn
	// reconnect backoff.
	//
	// It has the same value as the default grpc.DefaultBackoffConfig.
	//
	// TODO: make backoff configurable.
	defaultBackoffConfig = backoff.Exponential{
		MaxDelay: 120 * time.Second,
	}
	errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
)

// convertDuration converts a proto duration to a time.Duration. A nil input
// converts to 0; e.g. {Seconds: 1, Nanos: 500000000} converts to 1.5s.
func convertDuration(d *durationpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}

// loadBalancerClient is the client API for the LoadBalancer service. It is
// mostly copied from the generated pb.go file to avoid a circular dependency.
type loadBalancerClient struct {
	cc *grpc.ClientConn
}

func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
	desc := &grpc.StreamDesc{
		StreamName:    "BalanceLoad",
		ServerStreams: true,
		ClientStreams: true,
	}
	stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
	if err != nil {
		return nil, err
	}
	x := &balanceLoadClientStream{stream}
	return x, nil
}

type balanceLoadClientStream struct {
	grpc.ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
	m := new(lbpb.LoadBalanceResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}
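
// A rough sketch of how this hand-rolled client is typically driven (the real
// stream handling lives in grpclb_remote_balancer.go; the proto field names
// below follow the grpc_lb_v1 generated code and are shown only as an outline):
//
//	lbClient := &loadBalancerClient{cc: ccRemoteLB}
//	stream, err := lbClient.BalanceLoad(ctx, grpc.WaitForReady(true))
//	if err != nil { /* retry with backoff */ }
//	// Send the initial request identifying the target service.
//	stream.Send(&lbpb.LoadBalanceRequest{
//		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
//			InitialRequest: &lbpb.InitialLoadBalanceRequest{Name: target},
//		},
//	})
//	// Then keep receiving server lists until the stream breaks.
//	for {
//		reply, err := stream.Recv()
//		if err != nil { break }
//		_ = reply.GetServerList()
//	}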

func init() {
	balancer.Register(newLBBuilder())
}

// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
	return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}

// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout. If no response is received from the remote balancer within
// fallbackTimeout, the backend addresses from the resolved address list will be
// used.
//
// Only call this function when a non-default fallback timeout is needed.
func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
	return &lbBuilder{
		fallbackTimeout: fallbackTimeout,
	}
}
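
// For illustration only: a caller needing a shorter fallback window could
// register a builder like the following (this package itself registers the
// default-timeout builder in init):
//
//	balancer.Register(newLBBuilderWithFallbackTimeout(5 * time.Second))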

type lbBuilder struct {
	fallbackTimeout time.Duration
}

func (b *lbBuilder) Name() string {
	return grpclbName
}

func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	// This generates a manual resolver builder with a unique scheme. The
	// scheme is used to dial the remote LB, so filtered address updates can be
	// sent to the remote LB ClientConn through this manual resolver.
	scheme := "grpclb_internal_" + strconv.FormatInt(time.Now().UnixNano(), 36)
	r := &lbManualResolver{scheme: scheme, ccb: cc}

	lb := &lbBalancer{
		cc:              newLBCacheClientConn(cc),
		target:          opt.Target.Endpoint,
		opt:             opt,
		fallbackTimeout: b.fallbackTimeout,
		doneCh:          make(chan struct{}),

		manualResolver: r,
		subConns:       make(map[resolver.Address]balancer.SubConn),
		scStates:       make(map[balancer.SubConn]connectivity.State),
		picker:         &errPicker{err: balancer.ErrNoSubConnAvailable},
		clientStats:    newRPCStats(),
		backoff:        defaultBackoffConfig, // TODO: make backoff configurable.
	}

	var err error
	if opt.CredsBundle != nil {
		lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
		if err != nil {
			grpclog.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
		}
		lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
		if err != nil {
			grpclog.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
		}
	}

	return lb
}

type lbBalancer struct {
	cc     *lbCacheClientConn
	target string
	opt    balancer.BuildOptions

	usePickFirst bool

	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by the grpclb server. If it's nil, don't set anything when
	// creating SubConns.
	grpclbBackendCreds credentials.Bundle

	fallbackTimeout time.Duration
	doneCh          chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to the remote LB ClientConn through this resolver.
	manualResolver *lbManualResolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *grpc.ClientConn
	// backoff for calling the remote balancer.
	backoff backoff.Strategy

	// Supports client-side load reporting. Each picker gets a reference to
	// this, and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker also holds a
	// reference to this list to do the first-layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backend addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// the server list. When generating a picker, a SubConn slice with the same
	// order but containing only READY SCs is generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionality.
	state    connectivity.State
	subConns map[resolver.Address]balancer.SubConn   // Used to create/remove SubConns.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker
	// Support fallback to resolved backend addresses if there's no response
	// from the remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker
//  - always returns ErrTransientFailure if the balancer is in TransientFailure,
//  - does a two-layer roundrobin pick otherwise.
// Caller must hold lb.mu.
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
	if lb.state == connectivity.TransientFailure {
		lb.picker = &errPicker{err: balancer.ErrTransientFailure}
		return
	}

	if lb.state == connectivity.Connecting {
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}

	var readySCs []balancer.SubConn
	if lb.usePickFirst {
		for _, sc := range lb.subConns {
			readySCs = append(readySCs, sc)
			break
		}
	} else {
		for _, a := range lb.backendAddrsWithoutMetadata {
			if sc, ok := lb.subConns[a]; ok {
				if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
					readySCs = append(readySCs, sc)
				}
			}
		}
	}

	if len(readySCs) <= 0 {
		// If there are no ready SubConns, always re-pick. This is to avoid
		// drops unless at least one SubConn is ready. Otherwise we may drop
		// more often than we want because of drops + re-picks (which become
		// re-drops).
		//
		// This doesn't seem to be necessary after the connecting check above.
		// Kept for safety.
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}
	if lb.inFallback {
		lb.picker = newRRPicker(readySCs)
		return
	}
	if resetDrop {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker, ok := lb.picker.(*lbPicker)
	if !ok {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker.updateReadySCs(readySCs)
}
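
// A rough outline of the two-layer pick performed by lbPicker (the actual
// implementation lives in grpclb_picker.go; the names below are illustrative
// only):
//
//	// Layer one: roundrobin over the full server list; a drop entry fails
//	// the RPC without using a connection.
//	s := fullServerList[next%len(fullServerList)]
//	if s.Drop {
//		return nil, nil, dropError
//	}
//	// Layer two: roundrobin over only the READY SubConns.
//	return readySCs[scNext%len(readySCs)], doneFunc, nil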

// aggregateSubConnStates calculates the aggregated state of the SubConns in
// lb.subConns. These SubConns are the subconns in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns,
// including those in cache (SubConns are cached for 10 seconds after removal).
//
// The aggregated state is:
//  - If at least one SubConn is Ready, the aggregated state is Ready;
//  - Else if at least one SubConn is Connecting, the aggregated state is Connecting;
//  - Else the aggregated state is TransientFailure.
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
	var numConnecting uint64

	for _, sc := range lb.subConns {
		if state, ok := lb.scStates[sc]; ok {
			switch state {
			case connectivity.Ready:
				return connectivity.Ready
			case connectivity.Connecting:
				numConnecting++
			}
		}
	}
	if numConnecting > 0 {
		return connectivity.Connecting
	}
	return connectivity.TransientFailure
}
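
// For example: one Ready SubConn among several Connecting ones aggregates to
// Ready; only Connecting and TransientFailure SubConns aggregate to
// Connecting; all TransientFailure SubConns aggregate to TransientFailure.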

// HandleSubConnStateChange is not used; UpdateSubConnState is implemented and
// called instead.
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
	panic("not used")
}

func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	s := scs.ConnectivityState
	if grpclog.V(2) {
		grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	oldS, ok := lb.scStates[sc]
	if !ok {
		if grpclog.V(2) {
			grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
		}
		return
	}
	lb.scStates[sc] = s
	switch s {
	case connectivity.Idle:
		sc.Connect()
	case connectivity.Shutdown:
		// When an address was removed by the resolver, the balancer called
		// RemoveSubConn but kept the sc's state in scStates. Remove the state
		// for this sc here.
		delete(lb.scStates, sc)
	}
	// Force regenerate picker if
	//  - this sc became ready from not-ready
	//  - this sc became not-ready from ready
	lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

	// Enter fallback when the aggregated state is not Ready and the connection
	// to the remote balancer is lost.
	if lb.state != connectivity.Ready {
		if !lb.inFallback && !lb.remoteBalancerConnected {
			// Enter fallback.
			lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
		}
	}
}

// updateStateAndPicker re-calculates the aggregated state, and regenerates the
// picker if the overall state has changed.
//
// If forceRegeneratePicker is true, the picker will always be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
	oldAggrState := lb.state
	lb.state = lb.aggregateSubConnStates()
	// Regenerate picker when one of the following happens:
	//  - caller wants to regenerate
	//  - the aggregated state changed
	if forceRegeneratePicker || (lb.state != oldAggrState) {
		lb.regeneratePicker(resetDrop)
	}

	lb.cc.UpdateBalancerState(lb.state, lb.picker)
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to using
// the resolved backends (backends received from the resolver, not from the
// remote balancer) if no connection to a remote balancer was successful.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
	timer := time.NewTimer(fallbackTimeout)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-lb.doneCh:
		return
	}
	lb.mu.Lock()
	if lb.inFallback || lb.serverListReceived {
		lb.mu.Unlock()
		return
	}
	// Enter fallback.
	lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	lb.mu.Unlock()
}

// HandleResolvedAddrs is not used; UpdateClientConnState is implemented and
// called instead. Updated remote LB addresses are sent to the remote LB
// ClientConn there, which handles creating/removing remote LB connections.
func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
	panic("not used")
}

func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	newUsePickFirst := childIsPickFirst(gc)
	if lb.usePickFirst == newUsePickFirst {
		return
	}
	if grpclog.V(2) {
		grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
	}
	lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
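
// For illustration, a service config that switches the grpclb child policy to
// pick_first looks roughly like the following JSON (parsing lives in
// grpclb_config.go; this snippet is a sketch, not a normative example):
//
//	{
//	  "loadBalancingConfig": [
//	    {"grpclb": {"childPolicy": [{"pick_first": {}}]}}
//	  ]
//	}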

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) {
	if grpclog.V(2) {
		grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
	}
	gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
	lb.handleServiceConfig(gc)

	addrs := ccs.ResolverState.Addresses
	if len(addrs) <= 0 {
		return
	}

	var remoteBalancerAddrs, backendAddrs []resolver.Address
	for _, a := range addrs {
		if a.Type == resolver.GRPCLB {
			a.Type = resolver.Backend
			remoteBalancerAddrs = append(remoteBalancerAddrs, a)
		} else {
			backendAddrs = append(backendAddrs, a)
		}
	}

	if lb.ccRemoteLB == nil {
		if len(remoteBalancerAddrs) <= 0 {
			grpclog.Errorf("grpclb: no remote balancer address is available, should never happen")
			return
		}
		// First time receiving resolved addresses, create a cc to remote
		// balancers.
		lb.dialRemoteLB(remoteBalancerAddrs[0].ServerName)
		// Start the fallback goroutine.
		go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
	}

	// cc to remote balancers uses lb.manualResolver. Send the updated remote
	// balancer addresses to it through manualResolver.
	lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})

	lb.mu.Lock()
	lb.resolvedBackendAddrs = backendAddrs
	if lb.inFallback {
		// This means we received a new list of resolved backends, and we are
		// still in fallback mode. Need to update the list of backends we are
		// using to the new list of backends.
		lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	}
	lb.mu.Unlock()
}
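
// For illustration only: a grpclb-aware resolver might deliver a state such as
// the following, where the balancer address is marked with
// Type == resolver.GRPCLB and everything else is treated as a backend
// (addresses and names here are placeholders):
//
//	resolver.State{Addresses: []resolver.Address{
//		{Addr: "10.0.0.1:443", Type: resolver.GRPCLB, ServerName: "lb.example.com"},
//		{Addr: "10.0.0.2:443", Type: resolver.Backend},
//	}}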

func (lb *lbBalancer) Close() {
	select {
	case <-lb.doneCh:
		return
	default:
	}
	close(lb.doneCh)
	if lb.ccRemoteLB != nil {
		lb.ccRemoteLB.Close()
	}
	lb.cc.close()
}