/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Package balancergroup implements a utility struct to bind multiple balancers
// into one balancer.
package balancergroup

import (
	"fmt"
	"sync"
	"time"

	orcapb "github.com/cncf/udpa/go/udpa/data/orca/v1"
	"google.golang.org/grpc/xds/internal/client/load"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/cache"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/resolver"
)

// subBalancerWrapper is used to keep the configurations that will be used to
// start the underlying balancer. It can be called to start/stop the underlying
// balancer.
//
// When the config changes, it will pass the update to the underlying balancer
// if it exists.
//
// TODO: move to a separate file?
type subBalancerWrapper struct {
	// subBalancerWrapper is passed to the sub-balancer as a ClientConn
	// wrapper, only to keep the state and picker. When the sub-balancer is
	// restarted while in cache, the picker needs to be resent.
	//
	// It also contains the sub-balancer ID, so the parent balancer group can
	// keep track of SubConn/pickers and the sub-balancers they belong to. Some
	// of the actions are forwarded to the parent ClientConn with no change.
	// Some are forwarded to the balancer group with the sub-balancer ID.
	balancer.ClientConn
	id    string
	group *BalancerGroup

	mu    sync.Mutex
	state balancer.State

	// The static part of the sub-balancer. Keeps the balancer builder, to be
	// used when restarting the sub-balancer.
	builder balancer.Builder
	// Options to be passed to the sub-balancer at the time of creation.
	buildOpts balancer.BuildOptions
	// ccState is a cache of the addresses/balancer config, so when the balancer
	// is restarted after close, it will get the previous update. It's a pointer
	// and is set to nil at init, so when the balancer is built for the first
	// time (not a restart), it won't receive an empty update. Note that this
	// isn't reset to nil when the underlying balancer is closed.
	ccState *balancer.ClientConnState
	// The dynamic part of the sub-balancer. Only used when the balancer group
	// is started. Gets cleared when the sub-balancer is closed.
	balancer balancer.Balancer
}

// UpdateState overrides balancer.ClientConn, to keep state and picker.
func (sbc *subBalancerWrapper) UpdateState(state balancer.State) {
	sbc.mu.Lock()
	sbc.state = state
	sbc.group.updateBalancerState(sbc.id, state)
	sbc.mu.Unlock()
}

// NewSubConn overrides balancer.ClientConn, so the balancer group can keep
// track of the relation between subconns and sub-balancers.
func (sbc *subBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	return sbc.group.newSubConn(sbc, addrs, opts)
}

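// updateBalancerStateWithCachedPicker resends the cached state/picker (if any)
// to the parent balancer group. It is used when a sub-balancer is brought back
// from the cache, so the parent gets a picker without waiting for a new update
// from the sub-balancer.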
func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
	sbc.mu.Lock()
	if sbc.state.Picker != nil {
		sbc.group.updateBalancerState(sbc.id, sbc.state)
	}
	sbc.mu.Unlock()
}

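// startBalancer builds the underlying balancer with the stored builder and
// build options, and replays the most recent ClientConnState (if any) to it.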
func (sbc *subBalancerWrapper) startBalancer() {
	b := sbc.builder.Build(sbc, sbc.buildOpts)
	sbc.group.logger.Infof("Created child policy %p of type %v", b, sbc.builder.Name())
	sbc.balancer = b
	if sbc.ccState != nil {
		b.UpdateClientConnState(*sbc.ccState)
	}
}

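// updateSubConnState forwards a SubConn state change to the underlying
// balancer. It is a no-op if the sub-balancer has been closed.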
func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This can happen when EDS removes a
		// locality. The balancer for this locality was already closed, and the
		// SubConns are being deleted. But SubConn state changes can still
		// happen.
		return
	}
	b.UpdateSubConnState(sc, state)
}

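// updateClientConnState caches the ClientConnState (for a possible restart)
// and forwards it to the underlying balancer, if the sub-balancer is running.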
func (sbc *subBalancerWrapper) updateClientConnState(s balancer.ClientConnState) error {
	sbc.ccState = &s
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This should never happen because
		// sub-balancers are closed when the locality is removed from EDS, or
		// the balancer group is closed. There should be no further address
		// updates when either of these happens.
		//
		// This will be a common case with priority support, because a
		// sub-balancer (and the whole balancer group) could be closed because
		// it's a lower priority, but it can still get address updates.
		return nil
	}
	return b.UpdateClientConnState(s)
}

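// resolverError forwards a resolver error to the underlying balancer. It is a
// no-op if the sub-balancer has been closed.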
func (sbc *subBalancerWrapper) resolverError(err error) {
	b := sbc.balancer
	if b == nil {
		// This sub-balancer was closed. This should never happen because
		// sub-balancers are closed when the locality is removed from EDS, or
		// the balancer group is closed. There should be no further address
		// updates when either of these happens.
		//
		// This will be a common case with priority support, because a
		// sub-balancer (and the whole balancer group) could be closed because
		// it's a lower priority, but it can still get address updates.
		return
	}
	b.ResolverError(err)
}

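// stopBalancer closes the underlying balancer and clears the reference to it,
// so that later updates are dropped until startBalancer is called again.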
func (sbc *subBalancerWrapper) stopBalancer() {
	sbc.balancer.Close()
	sbc.balancer = nil
}

// BalancerGroup takes a list of balancers and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
// sub-balancer manager by a high level balancer.
//
// Updates from ClientConn are forwarded to sub-balancers
//  - service config update
//     - Not implemented
//  - address update
//  - subConn state change
//     - find the corresponding balancer and forward
//
// Actions from sub-balancers are forwarded to the parent ClientConn
//  - new/remove SubConn
//  - picker updates and health state changes
//     - sub-pickers are sent to an aggregator provided by the parent, which
//     will group them into a group-picker. The aggregated connectivity state
//     is also handled by the aggregator.
//  - resolveNow
//
// Sub-balancers are only built when the balancer group is started. If the
// balancer group is closed, the sub-balancers are also closed. And it's
// guaranteed that no updates will be sent to the parent ClientConn from a
// closed balancer group.
type BalancerGroup struct {
	cc        balancer.ClientConn
	buildOpts balancer.BuildOptions
	logger    *grpclog.PrefixLogger
	loadStore load.PerClusterReporter

	// stateAggregator is where the state/picker updates will be sent to. It's
	// provided by the parent balancer, to build a picker with all the
	// sub-pickers.
	stateAggregator BalancerStateAggregator

	// outgoingMu guards all operations in the direction:
	// ClientConn-->Sub-balancer. Including start, stop, resolver updates and
	// SubConn state changes.
	//
	// The corresponding boolean outgoingStarted is used to stop further updates
	// to sub-balancers after they are closed.
	outgoingMu         sync.Mutex
	outgoingStarted    bool
	idToBalancerConfig map[string]*subBalancerWrapper
	// Cache for sub-balancers when they are removed.
	balancerCache *cache.TimeoutCache

	// incomingMu is to make sure this balancer group doesn't send updates to cc
	// after it's closed.
	//
	// We don't share the mutex to avoid deadlocks (e.g. a call to a
	// sub-balancer may call back into the balancer group inline, which would
	// deadlock if they required the same mutex).
	//
	// We should never need to hold multiple locks at the same time in this
	// struct. The case where two locks are held can only happen when the
	// underlying balancer calls back into the balancer group inline. So there's
	// an implicit lock acquisition order that outgoingMu is locked before
	// incomingMu.

	// incomingMu guards all operations in the direction:
	// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn. It also
	// guards the map from SubConn to balancer ID, so updateSubConnState needs
	// to hold it shortly to find the sub-balancer to forward the update.
	//
	// UpdateState is called by the balancer state aggregator, which decides
	// when and whether to call it.
	//
	// The corresponding boolean incomingStarted is used to stop further updates
	// from sub-balancers after they are closed.
	incomingMu      sync.Mutex
	incomingStarted bool // This boolean only guards calls back to ClientConn.
	scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
//
// TODO(easwars): Pass an options struct instead of N args.
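//
// A rough usage sketch for a parent balancer (illustrative only; the child ID,
// builder, and resolver state below are placeholders, not values defined by
// this package):
//
//	bg := New(cc, opts, aggregator, loadStore, logger)
//	bg.Add("child-1", childBuilder) // register a sub-balancer under an ID
//	bg.Start()                      // builds the sub-balancers
//	bg.UpdateClientConnState("child-1", balancer.ClientConnState{ResolverState: rs})
//	// ...
//	bg.Remove("child-1") // sub-balancer is cached and closed after a timeout
//	bg.Close()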
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, loadStore load.PerClusterReporter, logger *grpclog.PrefixLogger) *BalancerGroup {
	return &BalancerGroup{
		cc:        cc,
		buildOpts: bOpts,
		logger:    logger,
		loadStore: loadStore,

		stateAggregator: stateAggregator,

		idToBalancerConfig: make(map[string]*subBalancerWrapper),
		balancerCache:      cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
		scToSubBalancer:    make(map[balancer.SubConn]*subBalancerWrapper),
	}
}

// Start starts the balancer group, which includes building all the
// sub-balancers and sending the existing addresses to them.
//
// A BalancerGroup can be closed and started later. When a BalancerGroup is
// closed, it can still receive address updates, which will be applied when
// restarted.
func (bg *BalancerGroup) Start() {
	bg.incomingMu.Lock()
	bg.incomingStarted = true
	bg.incomingMu.Unlock()

	bg.outgoingMu.Lock()
	if bg.outgoingStarted {
		bg.outgoingMu.Unlock()
		return
	}

	for _, config := range bg.idToBalancerConfig {
		config.startBalancer()
	}
	bg.outgoingStarted = true
	bg.outgoingMu.Unlock()
}

// Add adds a balancer built by builder to the group, with the given id.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
	// Store data in the static map, and then check to see if bg is started.
	bg.outgoingMu.Lock()
	var sbc *subBalancerWrapper
	// If outgoingStarted is true, search in the cache. Otherwise, the cache is
	// guaranteed to be empty, so searching is unnecessary.
	if bg.outgoingStarted {
		if old, ok := bg.balancerCache.Remove(id); ok {
			sbc, _ = old.(*subBalancerWrapper)
			if sbc != nil && sbc.builder != builder {
				// If the sub-balancer in the cache was built with a different
				// balancer builder, don't use it. Clean up the old balancer,
				// and behave as if the sub-balancer was not found in the cache.
				//
				// NOTE that this will also drop the cached addresses for this
				// sub-balancer, which seems to be reasonable.
				sbc.stopBalancer()
				// cleanupSubConns must be done before the new balancer starts,
				// otherwise new SubConns created by the new balancer might be
				// removed by mistake.
				bg.cleanupSubConns(sbc)
				sbc = nil
			}
		}
	}
	if sbc == nil {
		sbc = &subBalancerWrapper{
			ClientConn: bg.cc,
			id:         id,
			group:      bg,
			builder:    builder,
			buildOpts:  bg.buildOpts,
		}
		if bg.outgoingStarted {
			// Only start the balancer if bg is started. Otherwise, we only keep the
			// static data.
			sbc.startBalancer()
		}
	} else {
		// When bringing back a sub-balancer from the cache, re-send the cached
		// picker and state.
		sbc.updateBalancerStateWithCachedPicker()
	}
	bg.idToBalancerConfig[id] = sbc
	bg.outgoingMu.Unlock()
}

// Remove removes the balancer with the given id from the group.
//
// It doesn't close the balancer immediately. The balancer is kept in a cache,
// and the cleanup work (closing the sub-balancer and removing its subconns)
// will be done after a timeout.
func (bg *BalancerGroup) Remove(id string) {
	bg.outgoingMu.Lock()
	if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
		if bg.outgoingStarted {
			bg.balancerCache.Add(id, sbToRemove, func() {
				// After the timeout, when the sub-balancer is removed from the
				// cache, close the underlying sub-balancer and remove all its
				// subconns.
				bg.outgoingMu.Lock()
				if bg.outgoingStarted {
					sbToRemove.stopBalancer()
				}
				bg.outgoingMu.Unlock()
				bg.cleanupSubConns(sbToRemove)
			})
		}
		delete(bg.idToBalancerConfig, id)
	} else {
		bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
	}
	bg.outgoingMu.Unlock()
}

// bg.Remove(id) doesn't do cleanup for the sub-balancer. This function does
// the cleanup after the timeout.
func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
	bg.incomingMu.Lock()
	// Remove SubConns. This is only done after the balancer is
	// actually closed.
	//
	// NOTE: if NewSubConn is called by this (closed) balancer later, the
	// SubConn will be leaked. This shouldn't happen if the balancer
	// implementation is correct. To make sure this never happens, we need to
	// add another layer (balancer manager) between the balancer group and the
	// sub-balancers.
	for sc, b := range bg.scToSubBalancer {
		if b == config {
			bg.cc.RemoveSubConn(sc)
			delete(bg.scToSubBalancer, sc)
		}
	}
	bg.incomingMu.Unlock()
}

// Following are actions from the parent grpc.ClientConn, forwarded to
// sub-balancers.

// UpdateSubConnState handles the state for the subconn. It finds the
// corresponding balancer and forwards the update.
func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	bg.incomingMu.Lock()
	config, ok := bg.scToSubBalancer[sc]
	if !ok {
		bg.incomingMu.Unlock()
		return
	}
	if state.ConnectivityState == connectivity.Shutdown {
		// Only delete sc from the map when its state changes to Shutdown.
		delete(bg.scToSubBalancer, sc)
	}
	bg.incomingMu.Unlock()

	bg.outgoingMu.Lock()
	config.updateSubConnState(sc, state)
	bg.outgoingMu.Unlock()
}

// UpdateClientConnState handles the ClientConnState (including balancer config
// and addresses) from the resolver. It finds the balancer and forwards the
// update.
func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
	bg.outgoingMu.Lock()
	defer bg.outgoingMu.Unlock()
	if config, ok := bg.idToBalancerConfig[id]; ok {
		return config.updateClientConnState(s)
	}
	return nil
}

// ResolverError forwards resolver errors to all sub-balancers.
func (bg *BalancerGroup) ResolverError(err error) {
	bg.outgoingMu.Lock()
	for _, config := range bg.idToBalancerConfig {
		config.resolverError(err)
	}
	bg.outgoingMu.Unlock()
}

// Following are actions from sub-balancers, forwarded to ClientConn.

// newSubConn forwards the request to ClientConn, and also records the mapping
// from the SubConn to its sub-balancer, so state updates can find the right
// balancer.
//
// One note about removing SubConns: the removal is only forwarded to
// ClientConn; the SubConn is deleted from the map only when its state changes
// to Shutdown. Since it's just forwarding the action, there's no need for a
// removeSubConn() wrapper function.
func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	// NOTE: if the balancer with this id was already removed, this should also
	// return an error. But since we call stopBalancer when removing the
	// balancer, this shouldn't happen.
	bg.incomingMu.Lock()
	if !bg.incomingStarted {
		bg.incomingMu.Unlock()
		return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
	}
	sc, err := bg.cc.NewSubConn(addrs, opts)
	if err != nil {
		bg.incomingMu.Unlock()
		return nil, err
	}
	bg.scToSubBalancer[sc] = config
	bg.incomingMu.Unlock()
	return sc, nil
}

// updateBalancerState forwards the new state to the balancer state aggregator.
// The aggregator will create an aggregated picker and an aggregated
// connectivity state, then forward them to ClientConn.
func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
	bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
	if bg.loadStore != nil {
		// Only wrap the picker to do load reporting if loadStore was set.
		state.Picker = newLoadReportPicker(state.Picker, id, bg.loadStore)
	}

	// Send the new state to the aggregator, without holding incomingMu.
	// incomingMu protects calls to the parent ClientConn; this update doesn't
	// necessarily trigger a call to ClientConn, and should already be
	// protected by the aggregator's mutex if necessary.
	if bg.stateAggregator != nil {
		bg.stateAggregator.UpdateState(id, state)
	}
}

// Close closes the balancer group. It stops the sub-balancers and removes the
// subconns. The BalancerGroup can be restarted later.
func (bg *BalancerGroup) Close() {
	bg.incomingMu.Lock()
	if bg.incomingStarted {
		bg.incomingStarted = false
		// Also remove all SubConns.
		for sc := range bg.scToSubBalancer {
			bg.cc.RemoveSubConn(sc)
			delete(bg.scToSubBalancer, sc)
		}
	}
	bg.incomingMu.Unlock()

	// Clear(true) runs the clear functions to close the sub-balancers in the
	// cache. It must be called without holding the outgoing mutex.
	bg.balancerCache.Clear(true)

	bg.outgoingMu.Lock()
	if bg.outgoingStarted {
		bg.outgoingStarted = false
		for _, config := range bg.idToBalancerConfig {
			config.stopBalancer()
		}
	}
	bg.outgoingMu.Unlock()
}

const (
	serverLoadCPUName    = "cpu_utilization"
	serverLoadMemoryName = "mem_utilization"
)

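// loadReportPicker wraps a picker and reports per-call load to the load store
// for its locality: call start/finish, and any server load attached via ORCA
// load reports.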
type loadReportPicker struct {
	p balancer.Picker

	locality  string
	loadStore load.PerClusterReporter
}

func newLoadReportPicker(p balancer.Picker, id string, loadStore load.PerClusterReporter) *loadReportPicker {
	return &loadReportPicker{
		p:         p,
		locality:  id,
		loadStore: loadStore,
	}
}

func (lrp *loadReportPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	res, err := lrp.p.Pick(info)
	if err != nil {
		return res, err
	}

	// Record the start of the call, and wrap Done so the finish (and any
	// server-reported load) is recorded as well.
	lrp.loadStore.CallStarted(lrp.locality)
	oldDone := res.Done
	res.Done = func(info balancer.DoneInfo) {
		if oldDone != nil {
			oldDone(info)
		}
		lrp.loadStore.CallFinished(lrp.locality, info.Err)

		load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport)
		if !ok {
			return
		}
		lrp.loadStore.CallServerLoad(lrp.locality, serverLoadCPUName, load.CpuUtilization)
		lrp.loadStore.CallServerLoad(lrp.locality, serverLoadMemoryName, load.MemUtilization)
		for n, d := range load.RequestCost {
			lrp.loadStore.CallServerLoad(lrp.locality, n, d)
		}
		for n, d := range load.Utilization {
			lrp.loadStore.CallServerLoad(lrp.locality, n, d)
		}
	}
	return res, err
}