/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package grpclb defines a grpclb balancer.
//
// To install the grpclb balancer, import this package as:
//    import _ "google.golang.org/grpc/balancer/grpclb"
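//
// Importing this package registers the balancer in an init function; the
// balancer is then selected when the resolver returns grpclb balancer
// addresses (for the DNS resolver, discovered via SRV records).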
package grpclb

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/resolver/dns"
	"google.golang.org/grpc/resolver"

	durationpb "github.com/golang/protobuf/ptypes/duration"
	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
)

const (
	lbTokenKey             = "lb-token"
	defaultFallbackTimeout = 10 * time.Second
	grpclbName             = "grpclb"
)

var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
var logger = grpclog.Component("grpclb")

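// convertDuration converts a protobuf Duration into a time.Duration. For
// example, a Duration with Seconds: 1 and Nanos: 500000000 converts to
// 1500 * time.Millisecond.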
func convertDuration(d *durationpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}

// Client API for the LoadBalancer service. Mostly copied from the generated
// pb.go file to avoid a circular dependency.
type loadBalancerClient struct {
	cc *grpc.ClientConn
}

func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
	desc := &grpc.StreamDesc{
		StreamName:    "BalanceLoad",
		ServerStreams: true,
		ClientStreams: true,
	}
	stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
	if err != nil {
		return nil, err
	}
	x := &balanceLoadClientStream{stream}
	return x, nil
}

type balanceLoadClientStream struct {
	grpc.ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
	m := new(lbpb.LoadBalanceResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}
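
// A rough usage sketch for this stream, as in the remote balancer wrapper
// (lbClient here is a hypothetical *loadBalancerClient):
//
//	stream, err := lbClient.BalanceLoad(ctx)
//	if err != nil {
//		return err
//	}
//	// Send the initial request identifying this client, then receive
//	// server lists from the balancer.
//	initReq := &lbpb.LoadBalanceRequest{
//		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
//			InitialRequest: &lbpb.InitialLoadBalanceRequest{Name: "target-name"},
//		},
//	}
//	if err := stream.Send(initReq); err != nil {
//		return err
//	}
//	reply, err := stream.Recv()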

func init() {
	balancer.Register(newLBBuilder())
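	// grpclb balancer addresses are typically delivered by the DNS resolver
	// via SRV records, so enable SRV lookups when this package is imported.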
	dns.EnableSRVLookups = true
}

// newLBBuilder creates a builder for grpclb.
func newLBBuilder() balancer.Builder {
	return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
}

// newLBBuilderWithFallbackTimeout creates a grpclb builder with the given
// fallbackTimeout. If no response is received from the remote balancer within
// fallbackTimeout, the backend addresses from the resolved address list will be
// used.
//
// Only call this function when a non-default fallback timeout is needed.
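//
// For example, a caller (such as a test) needing a shorter fallback window
// could register a builder built with this function; a minimal sketch:
//
//	balancer.Register(newLBBuilderWithFallbackTimeout(5 * time.Second))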
func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
	return &lbBuilder{
		fallbackTimeout: fallbackTimeout,
	}
}

type lbBuilder struct {
	fallbackTimeout time.Duration
}

func (b *lbBuilder) Name() string {
	return grpclbName
}

func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
	// This generates a manual resolver builder with a fixed scheme. This
	// scheme will be used to dial the remote LB server, so we can send
	// filtered address updates to the remote LB ClientConn using this manual
	// resolver.
	r := &lbManualResolver{scheme: "grpclb-internal", ccb: cc}

	lb := &lbBalancer{
		cc:              newLBCacheClientConn(cc),
		target:          opt.Target.Endpoint,
		opt:             opt,
		fallbackTimeout: b.fallbackTimeout,
		doneCh:          make(chan struct{}),

		manualResolver: r,
		subConns:       make(map[resolver.Address]balancer.SubConn),
		scStates:       make(map[balancer.SubConn]connectivity.State),
		picker:         &errPicker{err: balancer.ErrNoSubConnAvailable},
		clientStats:    newRPCStats(),
		backoff:        backoff.DefaultExponential, // TODO: make backoff configurable.
	}

	var err error
	if opt.CredsBundle != nil {
		lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
		}
		lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
		if err != nil {
			logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
		}
	}

	return lb
}

type lbBalancer struct {
	cc     *lbCacheClientConn
	target string
	opt    balancer.BuildOptions

	usePickFirst bool

	// grpclbClientConnCreds is the creds bundle to be used to connect to grpclb
	// servers. If it's nil, use the TransportCredentials from BuildOptions
	// instead.
	grpclbClientConnCreds credentials.Bundle
	// grpclbBackendCreds is the creds bundle to be used for addresses that are
	// returned by the grpclb server. If it's nil, don't set anything when
	// creating SubConns.
	grpclbBackendCreds credentials.Bundle

	fallbackTimeout time.Duration
	doneCh          chan struct{}

	// manualResolver is used in the remote LB ClientConn inside grpclb. When
	// resolved address updates are received by grpclb, filtered updates will be
	// sent to the remote LB ClientConn through this resolver.
	manualResolver *lbManualResolver
	// The ClientConn to talk to the remote balancer.
	ccRemoteLB *remoteBalancerCCWrapper
	// backoff for calling the remote balancer.
	backoff backoff.Strategy

	// Support client side load reporting. Each picker gets a reference to this,
	// and will update its content.
	clientStats *rpcStats

	mu sync.Mutex // guards everything following.
	// The full server list including drops, used to check if the newly received
	// serverList contains anything new. Each generated picker will also have
	// a reference to this list to do the first layer pick.
	fullServerList []*lbpb.Server
	// Backend addresses. It's kept so the addresses are available when
	// switching between round_robin and pickfirst.
	backendAddrs []resolver.Address
	// All backend addresses, with metadata set to nil. This list contains all
	// backend addresses in the same order and with the same duplicates as in
	// serverlist. When generating a picker, a SubConn slice with the same order
	// but with only READY SCs will be generated.
	backendAddrsWithoutMetadata []resolver.Address
	// Roundrobin functionalities.
	state    connectivity.State
	subConns map[resolver.Address]balancer.SubConn   // Used to create/remove SubConns.
	scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
	picker   balancer.Picker
	// Support fallback to resolved backend addresses if there's no response
	// from the remote balancer within fallbackTimeout.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool
	// resolvedBackendAddrs is resolvedAddrs minus remote balancers. It's set
	// when resolved address updates are received, and read in the goroutine
	// handling fallback.
	resolvedBackendAddrs []resolver.Address
	connErr              error // the last connection error
}

// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker
//  - always returns ErrTransientFailure if the balancer is in TransientFailure,
//  - does a two-layer round-robin pick otherwise.
// Caller must hold lb.mu.
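//
// In the two-layer pick, the first layer round-robins over the full server
// list, including drop entries (an RPC that lands on a drop entry is dropped
// immediately), and the second layer round-robins over the READY SubConns.
// For example, with a server list [S1, drop, S2], roughly one in three picks
// is dropped and the rest are spread over the READY SubConns for S1 and S2.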
func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
	if lb.state == connectivity.TransientFailure {
		lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)}
		return
	}

	if lb.state == connectivity.Connecting {
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}

	var readySCs []balancer.SubConn
	if lb.usePickFirst {
		for _, sc := range lb.subConns {
			readySCs = append(readySCs, sc)
			break
		}
	} else {
		for _, a := range lb.backendAddrsWithoutMetadata {
			if sc, ok := lb.subConns[a]; ok {
				if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
					readySCs = append(readySCs, sc)
				}
			}
		}
	}

	if len(readySCs) == 0 {
		// If there are no ready SubConns, always re-pick. This is to avoid
		// drops unless at least one SubConn is ready. Otherwise we may drop
		// more often than we want because of drops + re-picks (which become
		// re-drops).
		//
		// This doesn't seem to be necessary after the connecting check above.
		// Kept for safety.
		lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable}
		return
	}
	if lb.inFallback {
		lb.picker = newRRPicker(readySCs)
		return
	}
	if resetDrop {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker, ok := lb.picker.(*lbPicker)
	if !ok {
		lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
		return
	}
	prevLBPicker.updateReadySCs(readySCs)
}

// aggregateSubConnStates calculates the aggregated state of the SubConns in
// lb.subConns. These are the SubConns currently in use (when switching between
// fallback and grpclb). lb.scStates contains states for all SubConns,
// including those in the cache (SubConns are cached for 10 seconds after
// removal).
//
// The aggregated state is:
//  - If at least one SubConn is Ready, the aggregated state is Ready;
//  - Else if at least one SubConn is Connecting or IDLE, the aggregated state
//    is Connecting;
//    - It's OK to consider IDLE as Connecting. SubConns never stay in IDLE,
//    they start to connect immediately. But there's a race between when the
//    overall state is reported and when the new SubConn state arrives. And
//    SubConns never go back to IDLE.
//  - Else the aggregated state is TransientFailure.
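//
// For example: {Ready, Connecting} aggregates to Ready; {IDLE,
// TransientFailure} aggregates to Connecting; {TransientFailure,
// TransientFailure} aggregates to TransientFailure.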
func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
	var numConnecting uint64

	for _, sc := range lb.subConns {
		if state, ok := lb.scStates[sc]; ok {
			switch state {
			case connectivity.Ready:
				return connectivity.Ready
			case connectivity.Connecting, connectivity.Idle:
				numConnecting++
			}
		}
	}
	if numConnecting > 0 {
		return connectivity.Connecting
	}
	return connectivity.TransientFailure
}

func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
	s := scs.ConnectivityState
	if logger.V(2) {
		logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
	}
	lb.mu.Lock()
	defer lb.mu.Unlock()

	oldS, ok := lb.scStates[sc]
	if !ok {
		if logger.V(2) {
			logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
		}
		return
	}
	lb.scStates[sc] = s
	switch s {
	case connectivity.Idle:
		sc.Connect()
	case connectivity.Shutdown:
		// When an address was removed by the resolver, the balancer called
		// RemoveSubConn but kept the sc's state in scStates. Remove the state
		// for this sc here.
		delete(lb.scStates, sc)
	case connectivity.TransientFailure:
		lb.connErr = scs.ConnectionError
	}
	// Force regenerate picker if
	//  - this sc became ready from not-ready
	//  - this sc became not-ready from ready
	lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)

	// Enter fallback when the aggregated state is not Ready and the connection
	// to the remote balancer is lost.
	if lb.state != connectivity.Ready {
		if !lb.inFallback && !lb.remoteBalancerConnected {
			// Enter fallback.
			lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
		}
	}
}

// updateStateAndPicker re-calculates the aggregated state, and regenerates the
// picker if the overall state has changed.
//
// If forceRegeneratePicker is true, the picker will be regenerated.
func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
	oldAggrState := lb.state
	lb.state = lb.aggregateSubConnStates()
	// Regenerate picker when one of the following happens:
	//  - the caller wants to regenerate
	//  - the aggregated state changed
	if forceRegeneratePicker || (lb.state != oldAggrState) {
		lb.regeneratePicker(resetDrop)
	}

	lb.cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
}

// fallbackToBackendsAfter blocks for fallbackTimeout and falls back to using
// the resolved backends (backends received from the resolver, not from the
// remote balancer) if no connection to a remote balancer was successful.
func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
	timer := time.NewTimer(fallbackTimeout)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-lb.doneCh:
		return
	}
	lb.mu.Lock()
	if lb.inFallback || lb.serverListReceived {
		lb.mu.Unlock()
		return
	}
	// Enter fallback.
	lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	lb.mu.Unlock()
}

func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	newUsePickFirst := childIsPickFirst(gc)
	if lb.usePickFirst == newUsePickFirst {
		return
	}
	if logger.V(2) {
		logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
	}
	lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}

func (lb *lbBalancer) ResolverError(error) {
	// Ignore resolver errors.  GRPCLB is not selected unless the resolver
	// works at least once.
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	if logger.V(2) {
		logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
	}
	gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
	lb.handleServiceConfig(gc)

	addrs := ccs.ResolverState.Addresses

	var remoteBalancerAddrs, backendAddrs []resolver.Address
	for _, a := range addrs {
		if a.Type == resolver.GRPCLB {
			a.Type = resolver.Backend
			remoteBalancerAddrs = append(remoteBalancerAddrs, a)
		} else {
			backendAddrs = append(backendAddrs, a)
		}
	}
	if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
		// Override any balancer addresses provided via
		// ccs.ResolverState.Addresses.
		remoteBalancerAddrs = sd.BalancerAddresses
	}

	if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
		// There should be at least one address, either a grpclb server or a
		// fallback backend. An empty address list is not valid.
		return balancer.ErrBadResolverState
	}

	if len(remoteBalancerAddrs) == 0 {
		if lb.ccRemoteLB != nil {
			lb.ccRemoteLB.close()
			lb.ccRemoteLB = nil
		}
	} else if lb.ccRemoteLB == nil {
		// First time receiving resolved addresses, create a cc to the remote
		// balancers.
		lb.newRemoteBalancerCCWrapper()
		// Start the fallback goroutine.
		go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
	}

	if lb.ccRemoteLB != nil {
		// cc to the remote balancers uses lb.manualResolver. Send the updated
		// remote balancer addresses to it through manualResolver.
		lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
	}

	lb.mu.Lock()
	lb.resolvedBackendAddrs = backendAddrs
	if len(remoteBalancerAddrs) == 0 || lb.inFallback {
		// If there's no remote balancer address in the ClientConn update,
		// grpclb enters fallback mode immediately.
		//
		// If a new update is received while grpclb is in fallback, update the
		// list of backends being used to the new fallback backends.
		lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
	}
	lb.mu.Unlock()
	return nil
}
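
// For illustration (hypothetical addresses), an update carrying one balancer
// address and one fallback backend is split by UpdateClientConnState: the
// balancer address goes to the remote-LB ClientConn through manualResolver,
// and the backend address is saved in resolvedBackendAddrs for fallback:
//
//	lb.UpdateClientConnState(balancer.ClientConnState{
//		ResolverState: resolver.State{Addresses: []resolver.Address{
//			{Addr: "10.0.0.1:443", Type: resolver.GRPCLB},
//			{Addr: "10.0.0.2:443"},
//		}},
//	})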

func (lb *lbBalancer) Close() {
	select {
	case <-lb.doneCh:
		return
	default:
	}
	close(lb.doneCh)
	if lb.ccRemoteLB != nil {
		lb.ccRemoteLB.close()
	}
	lb.cc.close()
}