1/*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package clusterresolver
20
21import (
22	"encoding/json"
23	"fmt"
24	"sort"
25
26	"google.golang.org/grpc/balancer/roundrobin"
27	"google.golang.org/grpc/balancer/weightedroundrobin"
28	"google.golang.org/grpc/internal/hierarchy"
29	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
30	"google.golang.org/grpc/resolver"
31	"google.golang.org/grpc/xds/internal"
32	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
33	"google.golang.org/grpc/xds/internal/balancer/priority"
34	"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
35	"google.golang.org/grpc/xds/internal/xdsclient"
36)
37
38const million = 1000000
39
40// priorityConfig is config for one priority. For example, if there an EDS and a
41// DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}].
42//
43// Each priorityConfig corresponds to one discovery mechanism from the LBConfig
44// generated by the CDS balancer. The CDS balancer resolves the cluster name to
45// an ordered list of discovery mechanisms (if the top cluster is an aggregated
46// cluster), one for each underlying cluster.
47type priorityConfig struct {
48	mechanism DiscoveryMechanism
49	// edsResp is set only if type is EDS.
50	edsResp xdsclient.EndpointsUpdate
51	// addresses is set only if type is DNS.
52	addresses []string
53}
54
55// buildPriorityConfigJSON builds balancer config for the passed in
56// priorities.
57//
58// The built tree of balancers (see test for the output struct).
59//
60//                                   ┌────────┐
61//                                   │priority│
62//                                   └┬──────┬┘
63//                                    │      │
64//                        ┌───────────▼┐    ┌▼───────────┐
65//                        │cluster_impl│    │cluster_impl│
66//                        └─┬──────────┘    └──────────┬─┘
67//                          │                          │
68//           ┌──────────────▼─┐                      ┌─▼──────────────┐
69//           │locality_picking│                      │locality_picking│
70//           └┬──────────────┬┘                      └┬──────────────┬┘
71//            │              │                        │              │
72//          ┌─▼─┐          ┌─▼─┐                    ┌─▼─┐          ┌─▼─┐
73//          │LRS│          │LRS│                    │LRS│          │LRS│
74//          └─┬─┘          └─┬─┘                    └─┬─┘          └─┬─┘
75//            │              │                        │              │
76// ┌──────────▼─────┐  ┌─────▼──────────┐  ┌──────────▼─────┐  ┌─────▼──────────┐
77// │endpoint_picking│  │endpoint_picking│  │endpoint_picking│  │endpoint_picking│
78// └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘
79//
80// If endpointPickingPolicy is nil, roundrobin will be used.
81//
82// Custom locality picking policy isn't support, and weighted_target is always
83// used.
84//
85// TODO: support setting locality picking policy, and add a parameter for
86// locality picking policy.
87func buildPriorityConfigJSON(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
88	pc, addrs := buildPriorityConfig(priorities, endpointPickingPolicy)
89	ret, err := json.Marshal(pc)
90	if err != nil {
91		return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
92	}
93	return ret, addrs, nil
94}
95
96func buildPriorityConfig(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) {
97	var (
98		retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
99		retAddrs  []resolver.Address
100	)
101	for i, p := range priorities {
102		switch p.mechanism.Type {
103		case DiscoveryMechanismTypeEDS:
104			names, configs, addrs := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, endpointPickingPolicy)
105			retConfig.Priorities = append(retConfig.Priorities, names...)
106			for n, c := range configs {
107				retConfig.Children[n] = &priority.Child{
108					Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c},
109					// Ignore all re-resolution from EDS children.
110					IgnoreReresolutionRequests: true,
111				}
112			}
113			retAddrs = append(retAddrs, addrs...)
114		case DiscoveryMechanismTypeLogicalDNS:
115			name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses)
116			retConfig.Priorities = append(retConfig.Priorities, name)
117			retConfig.Children[name] = &priority.Child{
118				Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
119				// Not ignore re-resolution from DNS children, they will trigger
120				// DNS to re-resolve.
121				IgnoreReresolutionRequests: false,
122			}
123			retAddrs = append(retAddrs, addrs...)
124		}
125	}
126	return retConfig, retAddrs
127}
128
129func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) {
130	// Endpoint picking policy for DNS is hardcoded to pick_first.
131	const childPolicy = "pick_first"
132	var retAddrs []resolver.Address
133	pName := fmt.Sprintf("priority-%v", parentPriority)
134	for _, addrStr := range addrStrs {
135		retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
136	}
137	return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs
138}
139
140// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
141// each priority, sorted by priority, and the addresses for each priority (with
142// hierarchy attributes set).
143//
144// For example, if there are two priorities, the returned values will be
145// - ["p0", "p1"]
146// - map{"p0":p0_config, "p1":p1_config}
147// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
148//   - p0 addresses' hierarchy attributes are set to p0
149func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.EndpointsUpdate, mechanism DiscoveryMechanism, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address) {
150	var (
151		retNames   []string
152		retAddrs   []resolver.Address
153		retConfigs = make(map[string]*clusterimpl.LBConfig)
154	)
155
156	if endpointPickingPolicy == nil {
157		endpointPickingPolicy = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}
158	}
159
160	drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
161	for _, d := range edsResp.Drops {
162		drops = append(drops, clusterimpl.DropConfig{
163			Category:           d.Category,
164			RequestsPerMillion: d.Numerator * million / d.Denominator,
165		})
166	}
167
168	priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities)
169	for _, priorityName := range priorityChildNames {
170		priorityLocalities := priorities[priorityName]
171		// Prepend parent priority to the priority names, to avoid duplicates.
172		pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName)
173		retNames = append(retNames, pName)
174		wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.Cluster, mechanism.EDSServiceName)
175		retConfigs[pName] = &clusterimpl.LBConfig{
176			Cluster:                 mechanism.Cluster,
177			EDSServiceName:          mechanism.EDSServiceName,
178			ChildPolicy:             &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig},
179			LoadReportingServerName: mechanism.LoadReportingServerName,
180			MaxConcurrentRequests:   mechanism.MaxConcurrentRequests,
181			DropCategories:          drops,
182		}
183		retAddrs = append(retAddrs, addrs...)
184	}
185
186	return retNames, retConfigs, retAddrs
187}
188
189// groupLocalitiesByPriority returns the localities grouped by priority.
190//
191// It also returns a list of strings where each string represents a priority,
192// and the list is sorted from higher priority to lower priority.
193//
194// For example, for L0-p0, L1-p0, L2-p1, results will be
195// - ["p0", "p1"]
196// - map{"p0":[L0, L1], "p1":[L2]}
197func groupLocalitiesByPriority(localities []xdsclient.Locality) ([]string, map[string][]xdsclient.Locality) {
198	var priorityIntSlice []int
199	priorities := make(map[string][]xdsclient.Locality)
200	for _, locality := range localities {
201		if locality.Weight == 0 {
202			continue
203		}
204		priorityName := fmt.Sprintf("%v", locality.Priority)
205		priorities[priorityName] = append(priorities[priorityName], locality)
206		priorityIntSlice = append(priorityIntSlice, int(locality.Priority))
207	}
208	// Sort the priorities based on the int value, deduplicate, and then turn
209	// the sorted list into a string list. This will be child names, in priority
210	// order.
211	sort.Ints(priorityIntSlice)
212	priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
213	priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped))
214	for _, p := range priorityIntSliceDeduped {
215		priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p))
216	}
217	return priorityNameSlice, priorities
218}
219
220func dedupSortedIntSlice(a []int) []int {
221	if len(a) == 0 {
222		return a
223	}
224	i, j := 0, 1
225	for ; j < len(a); j++ {
226		if a[i] == a[j] {
227			continue
228		}
229		i++
230		if i != j {
231			a[i] = a[j]
232		}
233	}
234	return a[:i+1]
235}
236
237// localitiesToWeightedTarget takes a list of localities (with the same
238// priority), and generates a weighted target config, and list of addresses.
239//
240// The addresses have path hierarchy set to [priority-name, locality-name], so
241// priority and weighted target know which child policy they are for.
242func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) {
243	weightedTargets := make(map[string]weightedtarget.Target)
244	var addrs []resolver.Address
245	for _, locality := range localities {
246		localityStr, err := locality.ID.ToString()
247		if err != nil {
248			localityStr = fmt.Sprintf("%+v", locality.ID)
249		}
250		weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy}
251		for _, endpoint := range locality.Endpoints {
252			// Filter out all "unhealthy" endpoints (unknown and healthy are
253			// both considered to be healthy:
254			// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
255			if endpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown {
256				continue
257			}
258
259			addr := resolver.Address{Addr: endpoint.Address}
260			if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 {
261				ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight}
262				addr = weightedroundrobin.SetAddrInfo(addr, ai)
263			}
264			addr = hierarchy.Set(addr, []string{priorityName, localityStr})
265			addr = internal.SetLocalityID(addr, locality.ID)
266			addrs = append(addrs, addr)
267		}
268	}
269	return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs
270}
271