1/*
2 * Copyright 2019 gRPC authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package edsbalancer
18
19import (
20	"encoding/json"
21	"reflect"
22	"sync"
23	"time"
24
25	"google.golang.org/grpc/balancer"
26	"google.golang.org/grpc/balancer/roundrobin"
27	"google.golang.org/grpc/balancer/weightedroundrobin"
28	"google.golang.org/grpc/codes"
29	"google.golang.org/grpc/connectivity"
30	"google.golang.org/grpc/grpclog"
31	"google.golang.org/grpc/resolver"
32	"google.golang.org/grpc/status"
33	"google.golang.org/grpc/xds/internal"
34	"google.golang.org/grpc/xds/internal/balancer/lrs"
35	xdsclient "google.golang.org/grpc/xds/internal/client"
36)
37
// defaultPriorityInitTimeout is how long a priority is given to connect before
// the next (lower) priority is started.
//
// TODO: consider making this configurable through an environment variable.
var defaultPriorityInitTimeout = 10 * time.Second
40
41type localityConfig struct {
42	weight uint32
43	addrs  []resolver.Address
44}
45
46// balancerGroupWithConfig contains the localities with the same priority. It
47// manages all localities using a balancerGroup.
48type balancerGroupWithConfig struct {
49	bg      *balancerGroup
50	configs map[internal.Locality]*localityConfig
51}
52
53// edsBalancerImpl does load balancing based on the EDS responses. Note that it
54// doesn't implement the balancer interface. It's intended to be used by a high
55// level balancer implementation.
56//
57// The localities are picked as weighted round robin. A configurable child
58// policy is used to manage endpoints in each locality.
59type edsBalancerImpl struct {
60	cc balancer.ClientConn
61
62	subBalancerBuilder   balancer.Builder
63	loadStore            lrs.Store
64	priorityToLocalities map[priorityType]*balancerGroupWithConfig
65
66	// There's no need to hold any mutexes at the same time. The order to take
67	// mutex should be: priorityMu > subConnMu, but this is implicit via
68	// balancers (starting balancer with next priority while holding priorityMu,
69	// and the balancer may create new SubConn).
70
71	priorityMu sync.Mutex
72	// priorities are pointers, and will be nil when EDS returns empty result.
73	priorityInUse   priorityType
74	priorityLowest  priorityType
75	priorityToState map[priorityType]*balancer.State
76	// The timer to give a priority 10 seconds to connect. And if the priority
77	// doesn't go into Ready/Failure, start the next priority.
78	//
79	// One timer is enough because there can be at most one priority in init
80	// state.
81	priorityInitTimer *time.Timer
82
83	subConnMu         sync.Mutex
84	subConnToPriority map[balancer.SubConn]priorityType
85
86	pickerMu   sync.Mutex
87	drops      []*dropper
88	innerState balancer.State // The state of the picker without drop support.
89}
90
91// newEDSBalancerImpl create a new edsBalancerImpl.
92func newEDSBalancerImpl(cc balancer.ClientConn, loadStore lrs.Store) *edsBalancerImpl {
93	edsImpl := &edsBalancerImpl{
94		cc:                 cc,
95		subBalancerBuilder: balancer.Get(roundrobin.Name),
96
97		priorityToLocalities: make(map[priorityType]*balancerGroupWithConfig),
98		priorityToState:      make(map[priorityType]*balancer.State),
99		subConnToPriority:    make(map[balancer.SubConn]priorityType),
100		loadStore:            loadStore,
101	}
102	// Don't start balancer group here. Start it when handling the first EDS
103	// response. Otherwise the balancer group will be started with round-robin,
104	// and if users specify a different sub-balancer, all balancers in balancer
105	// group will be closed and recreated when sub-balancer update happens.
106	return edsImpl
107}
108
109// HandleChildPolicy updates the child balancers handling endpoints. Child
110// policy is roundrobin by default. If the specified balancer is not installed,
111// the old child balancer will be used.
112//
113// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
114func (edsImpl *edsBalancerImpl) HandleChildPolicy(name string, config json.RawMessage) {
115	if edsImpl.subBalancerBuilder.Name() == name {
116		return
117	}
118	newSubBalancerBuilder := balancer.Get(name)
119	if newSubBalancerBuilder == nil {
120		grpclog.Infof("edsBalancerImpl: failed to find balancer with name %q, keep using %q", name, edsImpl.subBalancerBuilder.Name())
121		return
122	}
123	edsImpl.subBalancerBuilder = newSubBalancerBuilder
124	for _, bgwc := range edsImpl.priorityToLocalities {
125		if bgwc == nil {
126			continue
127		}
128		for id, config := range bgwc.configs {
129			// TODO: (eds) add support to balancer group to support smoothly
130			//  switching sub-balancers (keep old balancer around until new
131			//  balancer becomes ready).
132			bgwc.bg.remove(id)
133			bgwc.bg.add(id, config.weight, edsImpl.subBalancerBuilder)
134			bgwc.bg.handleResolvedAddrs(id, config.addrs)
135		}
136	}
137}
138
139// updateDrops compares new drop policies with the old. If they are different,
140// it updates the drop policies and send ClientConn an updated picker.
141func (edsImpl *edsBalancerImpl) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) {
142	var (
143		newDrops     []*dropper
144		dropsChanged bool
145	)
146	for i, dropPolicy := range dropPolicies {
147		var (
148			numerator   = dropPolicy.Numerator
149			denominator = dropPolicy.Denominator
150		)
151		newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category))
152
153		// The following reading edsImpl.drops doesn't need mutex because it can only
154		// be updated by the code following.
155		if dropsChanged {
156			continue
157		}
158		if i >= len(edsImpl.drops) {
159			dropsChanged = true
160			continue
161		}
162		if oldDrop := edsImpl.drops[i]; numerator != oldDrop.numerator || denominator != oldDrop.denominator {
163			dropsChanged = true
164		}
165	}
166	if dropsChanged {
167		edsImpl.pickerMu.Lock()
168		edsImpl.drops = newDrops
169		if edsImpl.innerState.Picker != nil {
170			// Update picker with old inner picker, new drops.
171			edsImpl.cc.UpdateState(balancer.State{
172				ConnectivityState: edsImpl.innerState.ConnectivityState,
173				Picker:            newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.loadStore)},
174			)
175		}
176		edsImpl.pickerMu.Unlock()
177	}
178}
179
180// HandleEDSResponse handles the EDS response and creates/deletes localities and
181// SubConns. It also handles drops.
182//
183// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
184func (edsImpl *edsBalancerImpl) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
185	// TODO: Unhandled fields from EDS response:
186	//  - edsResp.GetPolicy().GetOverprovisioningFactor()
187	//  - locality.GetPriority()
188	//  - lbEndpoint.GetMetadata(): contains BNS name, send to sub-balancers
189	//    - as service config or as resolved address
190	//  - if socketAddress is not ip:port
191	//     - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
192	//     - resolve endpoint's name with another resolver
193
194	edsImpl.updateDrops(edsResp.Drops)
195
196	// Filter out all localities with weight 0.
197	//
198	// Locality weighted load balancer can be enabled by setting an option in
199	// CDS, and the weight of each locality. Currently, without the guarantee
200	// that CDS is always sent, we assume locality weighted load balance is
201	// always enabled, and ignore all weight 0 localities.
202	//
203	// In the future, we should look at the config in CDS response and decide
204	// whether locality weight matters.
205	newLocalitiesWithPriority := make(map[priorityType][]xdsclient.Locality)
206	for _, locality := range edsResp.Localities {
207		if locality.Weight == 0 {
208			continue
209		}
210		priority := newPriorityType(locality.Priority)
211		newLocalitiesWithPriority[priority] = append(newLocalitiesWithPriority[priority], locality)
212	}
213
214	var (
215		priorityLowest  priorityType
216		priorityChanged bool
217	)
218
219	for priority, newLocalities := range newLocalitiesWithPriority {
220		if !priorityLowest.isSet() || priorityLowest.higherThan(priority) {
221			priorityLowest = priority
222		}
223
224		bgwc, ok := edsImpl.priorityToLocalities[priority]
225		if !ok {
226			// Create balancer group if it's never created (this is the first
227			// time this priority is received). We don't start it here. It may
228			// be started when necessary (e.g. when higher is down, or if it's a
229			// new lowest priority).
230			bgwc = &balancerGroupWithConfig{
231				bg: newBalancerGroup(
232					edsImpl.ccWrapperWithPriority(priority), edsImpl.loadStore,
233				),
234				configs: make(map[internal.Locality]*localityConfig),
235			}
236			edsImpl.priorityToLocalities[priority] = bgwc
237			priorityChanged = true
238		}
239		edsImpl.handleEDSResponsePerPriority(bgwc, newLocalities)
240	}
241	edsImpl.priorityLowest = priorityLowest
242
243	// Delete priorities that are removed in the latest response, and also close
244	// the balancer group.
245	for p, bgwc := range edsImpl.priorityToLocalities {
246		if _, ok := newLocalitiesWithPriority[p]; !ok {
247			delete(edsImpl.priorityToLocalities, p)
248			bgwc.bg.close()
249			delete(edsImpl.priorityToState, p)
250			priorityChanged = true
251		}
252	}
253
254	// If priority was added/removed, it may affect the balancer group to use.
255	// E.g. priorityInUse was removed, or all priorities are down, and a new
256	// lower priority was added.
257	if priorityChanged {
258		edsImpl.handlePriorityChange()
259	}
260}
261
262func (edsImpl *edsBalancerImpl) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) {
263	// newLocalitiesSet contains all names of localities in the new EDS response
264	// for the same priority. It's used to delete localities that are removed in
265	// the new EDS response.
266	newLocalitiesSet := make(map[internal.Locality]struct{})
267	for _, locality := range newLocalities {
268		// One balancer for each locality.
269
270		lid := locality.ID
271		newLocalitiesSet[lid] = struct{}{}
272
273		newWeight := locality.Weight
274		var newAddrs []resolver.Address
275		for _, lbEndpoint := range locality.Endpoints {
276			// Filter out all "unhealthy" endpoints (unknown and
277			// healthy are both considered to be healthy:
278			// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
279			if lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy &&
280				lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown {
281				continue
282			}
283
284			address := resolver.Address{
285				Addr: lbEndpoint.Address,
286			}
287			if edsImpl.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 {
288				address.Metadata = &weightedroundrobin.AddrInfo{
289					Weight: lbEndpoint.Weight,
290				}
291			}
292			newAddrs = append(newAddrs, address)
293		}
294		var weightChanged, addrsChanged bool
295		config, ok := bgwc.configs[lid]
296		if !ok {
297			// A new balancer, add it to balancer group and balancer map.
298			bgwc.bg.add(lid, newWeight, edsImpl.subBalancerBuilder)
299			config = &localityConfig{
300				weight: newWeight,
301			}
302			bgwc.configs[lid] = config
303
304			// weightChanged is false for new locality, because there's no need
305			// to update weight in bg.
306			addrsChanged = true
307		} else {
308			// Compare weight and addrs.
309			if config.weight != newWeight {
310				weightChanged = true
311			}
312			if !reflect.DeepEqual(config.addrs, newAddrs) {
313				addrsChanged = true
314			}
315		}
316
317		if weightChanged {
318			config.weight = newWeight
319			bgwc.bg.changeWeight(lid, newWeight)
320		}
321
322		if addrsChanged {
323			config.addrs = newAddrs
324			bgwc.bg.handleResolvedAddrs(lid, newAddrs)
325		}
326	}
327
328	// Delete localities that are removed in the latest response.
329	for lid := range bgwc.configs {
330		if _, ok := newLocalitiesSet[lid]; !ok {
331			bgwc.bg.remove(lid)
332			delete(bgwc.configs, lid)
333		}
334	}
335}
336
337// HandleSubConnStateChange handles the state change and update pickers accordingly.
338func (edsImpl *edsBalancerImpl) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
339	edsImpl.subConnMu.Lock()
340	var bgwc *balancerGroupWithConfig
341	if p, ok := edsImpl.subConnToPriority[sc]; ok {
342		if s == connectivity.Shutdown {
343			// Only delete sc from the map when state changed to Shutdown.
344			delete(edsImpl.subConnToPriority, sc)
345		}
346		bgwc = edsImpl.priorityToLocalities[p]
347	}
348	edsImpl.subConnMu.Unlock()
349	if bgwc == nil {
350		grpclog.Infof("edsBalancerImpl: priority not found for sc state change")
351		return
352	}
353	if bg := bgwc.bg; bg != nil {
354		bg.handleSubConnStateChange(sc, s)
355	}
356}
357
358// updateState first handles priority, and then wraps picker in a drop picker
359// before forwarding the update.
360func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) {
361	_, ok := edsImpl.priorityToLocalities[priority]
362	if !ok {
363		grpclog.Infof("eds: received picker update from unknown priority")
364		return
365	}
366
367	if edsImpl.handlePriorityWithNewState(priority, s) {
368		edsImpl.pickerMu.Lock()
369		defer edsImpl.pickerMu.Unlock()
370		edsImpl.innerState = s
371		// Don't reset drops when it's a state change.
372		edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.loadStore)})
373	}
374}
375
376func (edsImpl *edsBalancerImpl) ccWrapperWithPriority(priority priorityType) *edsBalancerWrapperCC {
377	return &edsBalancerWrapperCC{
378		ClientConn: edsImpl.cc,
379		priority:   priority,
380		parent:     edsImpl,
381	}
382}
383
384// edsBalancerWrapperCC implements the balancer.ClientConn API and get passed to
385// each balancer group. It contains the locality priority.
386type edsBalancerWrapperCC struct {
387	balancer.ClientConn
388	priority priorityType
389	parent   *edsBalancerImpl
390}
391
392func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
393	return ebwcc.parent.newSubConn(ebwcc.priority, addrs, opts)
394}
395func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) {
396	grpclog.Fatalln("not implemented")
397}
398func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) {
399	ebwcc.parent.updateState(ebwcc.priority, state)
400}
401
402func (edsImpl *edsBalancerImpl) newSubConn(priority priorityType, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
403	sc, err := edsImpl.cc.NewSubConn(addrs, opts)
404	if err != nil {
405		return nil, err
406	}
407	edsImpl.subConnMu.Lock()
408	edsImpl.subConnToPriority[sc] = priority
409	edsImpl.subConnMu.Unlock()
410	return sc, nil
411}
412
413// Close closes the balancer.
414func (edsImpl *edsBalancerImpl) Close() {
415	for _, bgwc := range edsImpl.priorityToLocalities {
416		if bg := bgwc.bg; bg != nil {
417			bg.close()
418		}
419	}
420}
421
422type dropPicker struct {
423	drops     []*dropper
424	p         balancer.V2Picker
425	loadStore lrs.Store
426}
427
428func newDropPicker(p balancer.V2Picker, drops []*dropper, loadStore lrs.Store) *dropPicker {
429	return &dropPicker{
430		drops:     drops,
431		p:         p,
432		loadStore: loadStore,
433	}
434}
435
436func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
437	var (
438		drop     bool
439		category string
440	)
441	for _, dp := range d.drops {
442		if dp.drop() {
443			drop = true
444			category = dp.category
445			break
446		}
447	}
448	if drop {
449		if d.loadStore != nil {
450			d.loadStore.CallDropped(category)
451		}
452		return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
453	}
454	// TODO: (eds) don't drop unless the inner picker is READY. Similar to
455	// https://github.com/grpc/grpc-go/issues/2622.
456	return d.p.Pick(info)
457}
458