/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package clusterresolver

import (
	"encoding/json"
	"fmt"
	"sort"

	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/balancer/weightedroundrobin"
	"google.golang.org/grpc/internal/hierarchy"
	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
	"google.golang.org/grpc/xds/internal/balancer/priority"
	"google.golang.org/grpc/xds/internal/balancer/weightedtarget"
	"google.golang.org/grpc/xds/internal/xdsclient"
)

// million is the scale factor used to turn an EDS drop fraction
// (numerator/denominator) into a requests-per-million rate.
const million = 1000000

// priorityConfig is config for one priority. For example, if there is an EDS
// and a DNS, the priority list will be [priorityConfig{EDS},
// priorityConfig{DNS}].
//
// Each priorityConfig corresponds to one discovery mechanism from the LBConfig
// generated by the CDS balancer. The CDS balancer resolves the cluster name to
// an ordered list of discovery mechanisms (if the top cluster is an aggregated
// cluster), one for each underlying cluster.
type priorityConfig struct {
	// mechanism is the discovery mechanism this priority was built from; its
	// Type selects which of the two fields below is meaningful.
	mechanism DiscoveryMechanism
	// edsResp is set only if type is EDS.
	edsResp xdsclient.EndpointsUpdate
	// addresses is set only if type is DNS.
	addresses []string
}

// buildPriorityConfigJSON builds the balancer config for the passed in
// priorities and marshals it to JSON.
//
// It returns the JSON-encoded priority config, the addresses of all priorities
// (with hierarchy attributes set), and a non-nil error only if marshalling the
// built config fails.
//
// The built tree of balancers looks like this (see the tests for the exact
// output struct):
//
//                                   ┌────────┐
//                                   │priority│
//                                   └┬──────┬┘
//                                    │      │
//                        ┌───────────▼┐    ┌▼───────────┐
//                        │cluster_impl│    │cluster_impl│
//                        └─┬──────────┘    └──────────┬─┘
//                          │                          │
//           ┌──────────────▼─┐                      ┌─▼──────────────┐
//           │locality_picking│                      │locality_picking│
//           └┬──────────────┬┘                      └┬──────────────┬┘
//            │              │                        │              │
//          ┌─▼─┐          ┌─▼─┐                    ┌─▼─┐          ┌─▼─┐
//          │LRS│          │LRS│                    │LRS│          │LRS│
//          └─┬─┘          └─┬─┘                    └─┬─┘          └─┬─┘
//            │              │                        │              │
// ┌──────────▼─────┐  ┌─────▼──────────┐  ┌──────────▼─────┐  ┌─────▼──────────┐
// │endpoint_picking│  │endpoint_picking│  │endpoint_picking│  │endpoint_picking│
// └────────────────┘  └────────────────┘  └────────────────┘  └────────────────┘
//
// If endpointPickingPolicy is nil, roundrobin will be used.
//
// A custom locality picking policy isn't supported; weighted_target is always
// used.
//
// TODO: support setting locality picking policy, and add a parameter for
// locality picking policy.
87func buildPriorityConfigJSON(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { 88 pc, addrs := buildPriorityConfig(priorities, endpointPickingPolicy) 89 ret, err := json.Marshal(pc) 90 if err != nil { 91 return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) 92 } 93 return ret, addrs, nil 94} 95 96func buildPriorityConfig(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) { 97 var ( 98 retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} 99 retAddrs []resolver.Address 100 ) 101 for i, p := range priorities { 102 switch p.mechanism.Type { 103 case DiscoveryMechanismTypeEDS: 104 names, configs, addrs := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, endpointPickingPolicy) 105 retConfig.Priorities = append(retConfig.Priorities, names...) 106 for n, c := range configs { 107 retConfig.Children[n] = &priority.Child{ 108 Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c}, 109 // Ignore all re-resolution from EDS children. 110 IgnoreReresolutionRequests: true, 111 } 112 } 113 retAddrs = append(retAddrs, addrs...) 114 case DiscoveryMechanismTypeLogicalDNS: 115 name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses) 116 retConfig.Priorities = append(retConfig.Priorities, name) 117 retConfig.Children[name] = &priority.Child{ 118 Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, 119 // Not ignore re-resolution from DNS children, they will trigger 120 // DNS to re-resolve. 121 IgnoreReresolutionRequests: false, 122 } 123 retAddrs = append(retAddrs, addrs...) 
124 } 125 } 126 return retConfig, retAddrs 127} 128 129func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) { 130 // Endpoint picking policy for DNS is hardcoded to pick_first. 131 const childPolicy = "pick_first" 132 var retAddrs []resolver.Address 133 pName := fmt.Sprintf("priority-%v", parentPriority) 134 for _, addrStr := range addrStrs { 135 retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) 136 } 137 return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs 138} 139 140// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for 141// each priority, sorted by priority, and the addresses for each priority (with 142// hierarchy attributes set). 143// 144// For example, if there are two priorities, the returned values will be 145// - ["p0", "p1"] 146// - map{"p0":p0_config, "p1":p1_config} 147// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] 148// - p0 addresses' hierarchy attributes are set to p0 149func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsclient.EndpointsUpdate, mechanism DiscoveryMechanism, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address) { 150 var ( 151 retNames []string 152 retAddrs []resolver.Address 153 retConfigs = make(map[string]*clusterimpl.LBConfig) 154 ) 155 156 if endpointPickingPolicy == nil { 157 endpointPickingPolicy = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name} 158 } 159 160 drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) 161 for _, d := range edsResp.Drops { 162 drops = append(drops, clusterimpl.DropConfig{ 163 Category: d.Category, 164 RequestsPerMillion: d.Numerator * million / d.Denominator, 165 }) 166 } 167 168 priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities) 169 for _, 
priorityName := range priorityChildNames { 170 priorityLocalities := priorities[priorityName] 171 // Prepend parent priority to the priority names, to avoid duplicates. 172 pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) 173 retNames = append(retNames, pName) 174 wtConfig, addrs := localitiesToWeightedTarget(priorityLocalities, pName, endpointPickingPolicy, mechanism.Cluster, mechanism.EDSServiceName) 175 retConfigs[pName] = &clusterimpl.LBConfig{ 176 Cluster: mechanism.Cluster, 177 EDSServiceName: mechanism.EDSServiceName, 178 ChildPolicy: &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig}, 179 LoadReportingServerName: mechanism.LoadReportingServerName, 180 MaxConcurrentRequests: mechanism.MaxConcurrentRequests, 181 DropCategories: drops, 182 } 183 retAddrs = append(retAddrs, addrs...) 184 } 185 186 return retNames, retConfigs, retAddrs 187} 188 189// groupLocalitiesByPriority returns the localities grouped by priority. 190// 191// It also returns a list of strings where each string represents a priority, 192// and the list is sorted from higher priority to lower priority. 193// 194// For example, for L0-p0, L1-p0, L2-p1, results will be 195// - ["p0", "p1"] 196// - map{"p0":[L0, L1], "p1":[L2]} 197func groupLocalitiesByPriority(localities []xdsclient.Locality) ([]string, map[string][]xdsclient.Locality) { 198 var priorityIntSlice []int 199 priorities := make(map[string][]xdsclient.Locality) 200 for _, locality := range localities { 201 if locality.Weight == 0 { 202 continue 203 } 204 priorityName := fmt.Sprintf("%v", locality.Priority) 205 priorities[priorityName] = append(priorities[priorityName], locality) 206 priorityIntSlice = append(priorityIntSlice, int(locality.Priority)) 207 } 208 // Sort the priorities based on the int value, deduplicate, and then turn 209 // the sorted list into a string list. This will be child names, in priority 210 // order. 
211 sort.Ints(priorityIntSlice) 212 priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) 213 priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped)) 214 for _, p := range priorityIntSliceDeduped { 215 priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p)) 216 } 217 return priorityNameSlice, priorities 218} 219 220func dedupSortedIntSlice(a []int) []int { 221 if len(a) == 0 { 222 return a 223 } 224 i, j := 0, 1 225 for ; j < len(a); j++ { 226 if a[i] == a[j] { 227 continue 228 } 229 i++ 230 if i != j { 231 a[i] = a[j] 232 } 233 } 234 return a[:i+1] 235} 236 237// localitiesToWeightedTarget takes a list of localities (with the same 238// priority), and generates a weighted target config, and list of addresses. 239// 240// The addresses have path hierarchy set to [priority-name, locality-name], so 241// priority and weighted target know which child policy they are for. 242func localitiesToWeightedTarget(localities []xdsclient.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig, cluster, edsService string) (*weightedtarget.LBConfig, []resolver.Address) { 243 weightedTargets := make(map[string]weightedtarget.Target) 244 var addrs []resolver.Address 245 for _, locality := range localities { 246 localityStr, err := locality.ID.ToString() 247 if err != nil { 248 localityStr = fmt.Sprintf("%+v", locality.ID) 249 } 250 weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy} 251 for _, endpoint := range locality.Endpoints { 252 // Filter out all "unhealthy" endpoints (unknown and healthy are 253 // both considered to be healthy: 254 // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). 
255 if endpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown { 256 continue 257 } 258 259 addr := resolver.Address{Addr: endpoint.Address} 260 if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 { 261 ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight} 262 addr = weightedroundrobin.SetAddrInfo(addr, ai) 263 } 264 addr = hierarchy.Set(addr, []string{priorityName, localityStr}) 265 addr = internal.SetLocalityID(addr, locality.ID) 266 addrs = append(addrs, addr) 267 } 268 } 269 return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs 270} 271