1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clusterresolver
20
21import (
22	"encoding/json"
23	"fmt"
24	"sort"
25
26	"google.golang.org/grpc/balancer/roundrobin"
27	"google.golang.org/grpc/balancer/weightedroundrobin"
28	"google.golang.org/grpc/balancer/weightedtarget"
29	"google.golang.org/grpc/internal/hierarchy"
30	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
31	"google.golang.org/grpc/resolver"
32	"google.golang.org/grpc/xds/internal"
33	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
34	"google.golang.org/grpc/xds/internal/balancer/priority"
35	"google.golang.org/grpc/xds/internal/balancer/ringhash"
36	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
37)
38
// million is the scale used for "requests per million" drop rates.
const million = 1_000_000
40
// priorityConfig is config for one priority. For example, if there is an EDS
// and a DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}].
//
// Each priorityConfig corresponds to one discovery mechanism from the LBConfig
// generated by the CDS balancer. The CDS balancer resolves the cluster name to
// an ordered list of discovery mechanisms (if the top cluster is an aggregated
// cluster), one for each underlying cluster.
type priorityConfig struct {
	// mechanism is the discovery mechanism this priority was built from. Its
	// Type decides whether edsResp or addresses is consulted when building
	// the priority's children.
	mechanism DiscoveryMechanism
	// edsResp is set only if type is EDS.
	edsResp xdsresource.EndpointsUpdate
	// addresses is set only if type is DNS.
	addresses []string
}
55
56// buildPriorityConfigJSON builds balancer config for the passed in
57// priorities.
58//
59// The built tree of balancers (see test for the output struct).
60//
61// If xds lb policy is ROUND_ROBIN, the children will be weighted_target for
62// locality picking, and round_robin for endpoint picking.
63//
64//                                   ┌────────┐
65//                                   │priority│
66//                                   └┬──────┬┘
67//                                    │      │
68//                        ┌───────────▼┐    ┌▼───────────┐
69//                        │cluster_impl│    │cluster_impl│
70//                        └─┬──────────┘    └──────────┬─┘
71//                          │                          │
72//           ┌──────────────▼─┐                      ┌─▼──────────────┐
73//           │locality_picking│                      │locality_picking│
74//           └┬──────────────┬┘                      └┬──────────────┬┘
75//            │              │                        │              │
76//          ┌─▼─┐          ┌─▼─┐                    ┌─▼─┐          ┌─▼─┐
77//          │LRS│          │LRS│                    │LRS│          │LRS│
78//          └─┬─┘          └─┬─┘                    └─┬─┘          └─┬─┘
79//            │              │                        │              │
80// ┌──────────▼─────┐  ┌─────▼──────────┐  ┌──────────▼─────┐  ┌─────▼──────────┐
81// │endpoint_picking│  │endpoint_picking│  │endpoint_picking│  │endpoint_picking│
82// └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘
83//
84// If xds lb policy is RING_HASH, the children will be just a ring_hash policy.
85// The endpoints from all localities will be flattened to one addresses list,
86// and the ring_hash policy will pick endpoints from it.
87//
88//           ┌────────┐
89//           │priority│
90//           └┬──────┬┘
91//            │      │
92// ┌──────────▼─┐  ┌─▼──────────┐
93// │cluster_impl│  │cluster_impl│
94// └──────┬─────┘  └─────┬──────┘
95//        │              │
96// ┌──────▼─────┐  ┌─────▼──────┐
97// │ ring_hash  │  │ ring_hash  │
98// └────────────┘  └────────────┘
99//
100// If endpointPickingPolicy is nil, roundrobin will be used.
101//
102// Custom locality picking policy isn't support, and weighted_target is always
103// used.
104func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
105	pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
106	if err != nil {
107		return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
108	}
109	ret, err := json.Marshal(pc)
110	if err != nil {
111		return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
112	}
113	return ret, addrs, nil
114}
115
116func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
117	var (
118		retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
119		retAddrs  []resolver.Address
120	)
121	for i, p := range priorities {
122		switch p.mechanism.Type {
123		case DiscoveryMechanismTypeEDS:
124			names, configs, addrs, err := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, xdsLBPolicy)
125			if err != nil {
126				return nil, nil, err
127			}
128			retConfig.Priorities = append(retConfig.Priorities, names...)
129			for n, c := range configs {
130				retConfig.Children[n] = &priority.Child{
131					Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c},
132					// Ignore all re-resolution from EDS children.
133					IgnoreReresolutionRequests: true,
134				}
135			}
136			retAddrs = append(retAddrs, addrs...)
137		case DiscoveryMechanismTypeLogicalDNS:
138			name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses)
139			retConfig.Priorities = append(retConfig.Priorities, name)
140			retConfig.Children[name] = &priority.Child{
141				Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
142				// Not ignore re-resolution from DNS children, they will trigger
143				// DNS to re-resolve.
144				IgnoreReresolutionRequests: false,
145			}
146			retAddrs = append(retAddrs, addrs...)
147		}
148	}
149	return retConfig, retAddrs, nil
150}
151
152func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) {
153	// Endpoint picking policy for DNS is hardcoded to pick_first.
154	const childPolicy = "pick_first"
155	retAddrs := make([]resolver.Address, 0, len(addrStrs))
156	pName := fmt.Sprintf("priority-%v", parentPriority)
157	for _, addrStr := range addrStrs {
158		retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
159	}
160	return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs
161}
162
163// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
164// each priority, sorted by priority, and the addresses for each priority (with
165// hierarchy attributes set).
166//
167// For example, if there are two priorities, the returned values will be
168// - ["p0", "p1"]
169// - map{"p0":p0_config, "p1":p1_config}
170// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
171//   - p0 addresses' hierarchy attributes are set to p0
172func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
173	drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
174	for _, d := range edsResp.Drops {
175		drops = append(drops, clusterimpl.DropConfig{
176			Category:           d.Category,
177			RequestsPerMillion: d.Numerator * million / d.Denominator,
178		})
179	}
180
181	priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities)
182	retNames := make([]string, 0, len(priorityChildNames))
183	retAddrs := make([]resolver.Address, 0, len(priorityChildNames))
184	retConfigs := make(map[string]*clusterimpl.LBConfig, len(priorityChildNames))
185	for _, priorityName := range priorityChildNames {
186		priorityLocalities := priorities[priorityName]
187		// Prepend parent priority to the priority names, to avoid duplicates.
188		pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName)
189		retNames = append(retNames, pName)
190		cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
191		if err != nil {
192			return nil, nil, nil, err
193		}
194		retConfigs[pName] = cfg
195		retAddrs = append(retAddrs, addrs...)
196	}
197	return retNames, retConfigs, retAddrs, nil
198}
199
200// groupLocalitiesByPriority returns the localities grouped by priority.
201//
202// It also returns a list of strings where each string represents a priority,
203// and the list is sorted from higher priority to lower priority.
204//
205// For example, for L0-p0, L1-p0, L2-p1, results will be
206// - ["p0", "p1"]
207// - map{"p0":[L0, L1], "p1":[L2]}
208func groupLocalitiesByPriority(localities []xdsresource.Locality) ([]string, map[string][]xdsresource.Locality) {
209	var priorityIntSlice []int
210	priorities := make(map[string][]xdsresource.Locality)
211	for _, locality := range localities {
212		if locality.Weight == 0 {
213			continue
214		}
215		priorityName := fmt.Sprintf("%v", locality.Priority)
216		priorities[priorityName] = append(priorities[priorityName], locality)
217		priorityIntSlice = append(priorityIntSlice, int(locality.Priority))
218	}
219	// Sort the priorities based on the int value, deduplicate, and then turn
220	// the sorted list into a string list. This will be child names, in priority
221	// order.
222	sort.Ints(priorityIntSlice)
223	priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
224	priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped))
225	for _, p := range priorityIntSliceDeduped {
226		priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p))
227	}
228	return priorityNameSlice, priorities
229}
230
// dedupSortedIntSlice removes consecutive duplicate values from a, which is
// expected to be sorted, and returns the deduplicated prefix.
//
// The compaction happens in place: the returned slice shares a's backing
// array, and leading elements of a may be overwritten. An empty or nil input
// is returned as-is.
func dedupSortedIntSlice(a []int) []int {
	if len(a) == 0 {
		return a
	}
	// w is the number of unique values kept so far; a[w-1] is the last one.
	w := 1
	for r := 1; r < len(a); r++ {
		if a[r] != a[w-1] {
			a[w] = a[r]
			w++
		}
	}
	return a[:w]
}
247
248// rrBalancerConfig is a const roundrobin config, used as child of
249// weighted-roundrobin. To avoid allocating memory everytime.
250var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}
251
252// priorityLocalitiesToClusterImpl takes a list of localities (with the same
253// priority), and generates a cluster impl policy config, and a list of
254// addresses.
255func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
256	clusterImplCfg := &clusterimpl.LBConfig{
257		Cluster:                 mechanism.Cluster,
258		EDSServiceName:          mechanism.EDSServiceName,
259		LoadReportingServerName: mechanism.LoadReportingServerName,
260		MaxConcurrentRequests:   mechanism.MaxConcurrentRequests,
261		DropCategories:          drops,
262		// ChildPolicy is not set. Will be set based on xdsLBPolicy
263	}
264
265	if xdsLBPolicy == nil || xdsLBPolicy.Name == rrName {
266		// If lb policy is ROUND_ROBIN:
267		// - locality-picking policy is weighted_target
268		// - endpoint-picking policy is round_robin
269		logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", rrName)
270		// Child of weighted_target is hardcoded to round_robin.
271		wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig)
272		clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig}
273		return clusterImplCfg, addrs, nil
274	}
275
276	if xdsLBPolicy.Name == rhName {
277		// If lb policy is RIHG_HASH, will build one ring_hash policy as child.
278		// The endpoints from all localities will be flattened to one addresses
279		// list, and the ring_hash policy will pick endpoints from it.
280		logger.Infof("xds lb policy is %q, building config with ring_hash", rhName)
281		addrs := localitiesToRingHash(localities, priorityName)
282		// Set child to ring_hash, note that the ring_hash config is from
283		// xdsLBPolicy.
284		clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: ringhash.Name, Config: xdsLBPolicy.Config}
285		return clusterImplCfg, addrs, nil
286	}
287
288	return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, rrName, rhName)
289}
290
291// localitiesToRingHash takes a list of localities (with the same priority), and
292// generates a list of addresses.
293//
294// The addresses have path hierarchy set to [priority-name], so priority knows
295// which child policy they are for.
296func localitiesToRingHash(localities []xdsresource.Locality, priorityName string) []resolver.Address {
297	var addrs []resolver.Address
298	for _, locality := range localities {
299		var lw uint32 = 1
300		if locality.Weight != 0 {
301			lw = locality.Weight
302		}
303		localityStr, err := locality.ID.ToString()
304		if err != nil {
305			localityStr = fmt.Sprintf("%+v", locality.ID)
306		}
307		for _, endpoint := range locality.Endpoints {
308			// Filter out all "unhealthy" endpoints (unknown and healthy are
309			// both considered to be healthy:
310			// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
311			if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
312				continue
313			}
314
315			var ew uint32 = 1
316			if endpoint.Weight != 0 {
317				ew = endpoint.Weight
318			}
319
320			// The weight of each endpoint is locality_weight * endpoint_weight.
321			ai := weightedroundrobin.AddrInfo{Weight: lw * ew}
322			addr := weightedroundrobin.SetAddrInfo(resolver.Address{Addr: endpoint.Address}, ai)
323			addr = hierarchy.Set(addr, []string{priorityName, localityStr})
324			addr = internal.SetLocalityID(addr, locality.ID)
325			addrs = append(addrs, addr)
326		}
327	}
328	return addrs
329}
330
331// localitiesToWeightedTarget takes a list of localities (with the same
332// priority), and generates a weighted target config, and list of addresses.
333//
334// The addresses have path hierarchy set to [priority-name, locality-name], so
335// priority and weighted target know which child policy they are for.
336func localitiesToWeightedTarget(localities []xdsresource.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig) (*weightedtarget.LBConfig, []resolver.Address) {
337	weightedTargets := make(map[string]weightedtarget.Target)
338	var addrs []resolver.Address
339	for _, locality := range localities {
340		localityStr, err := locality.ID.ToString()
341		if err != nil {
342			localityStr = fmt.Sprintf("%+v", locality.ID)
343		}
344		weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy}
345		for _, endpoint := range locality.Endpoints {
346			// Filter out all "unhealthy" endpoints (unknown and healthy are
347			// both considered to be healthy:
348			// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
349			if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
350				continue
351			}
352
353			addr := resolver.Address{Addr: endpoint.Address}
354			if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 {
355				ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight}
356				addr = weightedroundrobin.SetAddrInfo(addr, ai)
357			}
358			addr = hierarchy.Set(addr, []string{priorityName, localityStr})
359			addr = internal.SetLocalityID(addr, locality.ID)
360			addrs = append(addrs, addr)
361		}
362	}
363	return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs
364}
365