1/* 2 * 3 * Copyright 2021 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package clusterresolver 20 21import ( 22 "encoding/json" 23 "fmt" 24 "sort" 25 26 "google.golang.org/grpc/balancer/roundrobin" 27 "google.golang.org/grpc/balancer/weightedroundrobin" 28 "google.golang.org/grpc/balancer/weightedtarget" 29 "google.golang.org/grpc/internal/hierarchy" 30 internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" 31 "google.golang.org/grpc/resolver" 32 "google.golang.org/grpc/xds/internal" 33 "google.golang.org/grpc/xds/internal/balancer/clusterimpl" 34 "google.golang.org/grpc/xds/internal/balancer/priority" 35 "google.golang.org/grpc/xds/internal/balancer/ringhash" 36 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" 37) 38 39const million = 1000000 40 41// priorityConfig is config for one priority. For example, if there an EDS and a 42// DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}]. 43// 44// Each priorityConfig corresponds to one discovery mechanism from the LBConfig 45// generated by the CDS balancer. The CDS balancer resolves the cluster name to 46// an ordered list of discovery mechanisms (if the top cluster is an aggregated 47// cluster), one for each underlying cluster. 48type priorityConfig struct { 49 mechanism DiscoveryMechanism 50 // edsResp is set only if type is EDS. 51 edsResp xdsresource.EndpointsUpdate 52 // addresses is set only if type is DNS. 53 addresses []string 54} 55 56// buildPriorityConfigJSON builds balancer config for the passed in 57// priorities. 58// 59// The built tree of balancers (see test for the output struct). 60// 61// If xds lb policy is ROUND_ROBIN, the children will be weighted_target for 62// locality picking, and round_robin for endpoint picking. 63// 64// ┌────────┐ 65// │priority│ 66// └┬──────┬┘ 67// │ │ 68// ┌───────────▼┐ ┌▼───────────┐ 69// │cluster_impl│ │cluster_impl│ 70// └─┬──────────┘ └──────────┬─┘ 71// │ │ 72// ┌──────────────▼─┐ ┌─▼──────────────┐ 73// │locality_picking│ │locality_picking│ 74// └┬──────────────┬┘ └┬──────────────┬┘ 75// │ │ │ │ 76// ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ ┌─▼─┐ 77// │LRS│ │LRS│ │LRS│ │LRS│ 78// └─┬─┘ └─┬─┘ └─┬─┘ └─┬─┘ 79// │ │ │ │ 80// ┌──────────▼─────┐ ┌─────▼──────────┐ ┌──────────▼─────┐ ┌─────▼──────────┐ 81// │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ │endpoint_picking│ 82// └────────────────┘ └────────────────┘ └────────────────┘ └────────────────┘ 83// 84// If xds lb policy is RING_HASH, the children will be just a ring_hash policy. 85// The endpoints from all localities will be flattened to one addresses list, 86// and the ring_hash policy will pick endpoints from it. 87// 88// ┌────────┐ 89// │priority│ 90// └┬──────┬┘ 91// │ │ 92// ┌──────────▼─┐ ┌─▼──────────┐ 93// │cluster_impl│ │cluster_impl│ 94// └──────┬─────┘ └─────┬──────┘ 95// │ │ 96// ┌──────▼─────┐ ┌─────▼──────┐ 97// │ ring_hash │ │ ring_hash │ 98// └────────────┘ └────────────┘ 99// 100// If endpointPickingPolicy is nil, roundrobin will be used. 101// 102// Custom locality picking policy isn't support, and weighted_target is always 103// used. 104func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { 105 pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy) 106 if err != nil { 107 return nil, nil, fmt.Errorf("failed to build priority config: %v", err) 108 } 109 ret, err := json.Marshal(pc) 110 if err != nil { 111 return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err) 112 } 113 return ret, addrs, nil 114} 115 116func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) { 117 var ( 118 retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} 119 retAddrs []resolver.Address 120 ) 121 for i, p := range priorities { 122 switch p.mechanism.Type { 123 case DiscoveryMechanismTypeEDS: 124 names, configs, addrs, err := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, xdsLBPolicy) 125 if err != nil { 126 return nil, nil, err 127 } 128 retConfig.Priorities = append(retConfig.Priorities, names...) 129 for n, c := range configs { 130 retConfig.Children[n] = &priority.Child{ 131 Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c}, 132 // Ignore all re-resolution from EDS children. 133 IgnoreReresolutionRequests: true, 134 } 135 } 136 retAddrs = append(retAddrs, addrs...) 137 case DiscoveryMechanismTypeLogicalDNS: 138 name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses) 139 retConfig.Priorities = append(retConfig.Priorities, name) 140 retConfig.Children[name] = &priority.Child{ 141 Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, 142 // Not ignore re-resolution from DNS children, they will trigger 143 // DNS to re-resolve. 144 IgnoreReresolutionRequests: false, 145 } 146 retAddrs = append(retAddrs, addrs...) 147 } 148 } 149 return retConfig, retAddrs, nil 150} 151 152func buildClusterImplConfigForDNS(parentPriority int, addrStrs []string) (string, *clusterimpl.LBConfig, []resolver.Address) { 153 // Endpoint picking policy for DNS is hardcoded to pick_first. 154 const childPolicy = "pick_first" 155 retAddrs := make([]resolver.Address, 0, len(addrStrs)) 156 pName := fmt.Sprintf("priority-%v", parentPriority) 157 for _, addrStr := range addrStrs { 158 retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName})) 159 } 160 return pName, &clusterimpl.LBConfig{ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy}}, retAddrs 161} 162 163// buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for 164// each priority, sorted by priority, and the addresses for each priority (with 165// hierarchy attributes set). 166// 167// For example, if there are two priorities, the returned values will be 168// - ["p0", "p1"] 169// - map{"p0":p0_config, "p1":p1_config} 170// - [p0_address_0, p0_address_1, p1_address_0, p1_address_1] 171// - p0 addresses' hierarchy attributes are set to p0 172func buildClusterImplConfigForEDS(parentPriority int, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) { 173 drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops)) 174 for _, d := range edsResp.Drops { 175 drops = append(drops, clusterimpl.DropConfig{ 176 Category: d.Category, 177 RequestsPerMillion: d.Numerator * million / d.Denominator, 178 }) 179 } 180 181 priorityChildNames, priorities := groupLocalitiesByPriority(edsResp.Localities) 182 retNames := make([]string, 0, len(priorityChildNames)) 183 retAddrs := make([]resolver.Address, 0, len(priorityChildNames)) 184 retConfigs := make(map[string]*clusterimpl.LBConfig, len(priorityChildNames)) 185 for _, priorityName := range priorityChildNames { 186 priorityLocalities := priorities[priorityName] 187 // Prepend parent priority to the priority names, to avoid duplicates. 188 pName := fmt.Sprintf("priority-%v-%v", parentPriority, priorityName) 189 retNames = append(retNames, pName) 190 cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy) 191 if err != nil { 192 return nil, nil, nil, err 193 } 194 retConfigs[pName] = cfg 195 retAddrs = append(retAddrs, addrs...) 196 } 197 return retNames, retConfigs, retAddrs, nil 198} 199 200// groupLocalitiesByPriority returns the localities grouped by priority. 201// 202// It also returns a list of strings where each string represents a priority, 203// and the list is sorted from higher priority to lower priority. 204// 205// For example, for L0-p0, L1-p0, L2-p1, results will be 206// - ["p0", "p1"] 207// - map{"p0":[L0, L1], "p1":[L2]} 208func groupLocalitiesByPriority(localities []xdsresource.Locality) ([]string, map[string][]xdsresource.Locality) { 209 var priorityIntSlice []int 210 priorities := make(map[string][]xdsresource.Locality) 211 for _, locality := range localities { 212 if locality.Weight == 0 { 213 continue 214 } 215 priorityName := fmt.Sprintf("%v", locality.Priority) 216 priorities[priorityName] = append(priorities[priorityName], locality) 217 priorityIntSlice = append(priorityIntSlice, int(locality.Priority)) 218 } 219 // Sort the priorities based on the int value, deduplicate, and then turn 220 // the sorted list into a string list. This will be child names, in priority 221 // order. 222 sort.Ints(priorityIntSlice) 223 priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice) 224 priorityNameSlice := make([]string, 0, len(priorityIntSliceDeduped)) 225 for _, p := range priorityIntSliceDeduped { 226 priorityNameSlice = append(priorityNameSlice, fmt.Sprintf("%v", p)) 227 } 228 return priorityNameSlice, priorities 229} 230 231func dedupSortedIntSlice(a []int) []int { 232 if len(a) == 0 { 233 return a 234 } 235 i, j := 0, 1 236 for ; j < len(a); j++ { 237 if a[i] == a[j] { 238 continue 239 } 240 i++ 241 if i != j { 242 a[i] = a[j] 243 } 244 } 245 return a[:i+1] 246} 247 248// rrBalancerConfig is a const roundrobin config, used as child of 249// weighted-roundrobin. To avoid allocating memory everytime. 250var rrBalancerConfig = &internalserviceconfig.BalancerConfig{Name: roundrobin.Name} 251 252// priorityLocalitiesToClusterImpl takes a list of localities (with the same 253// priority), and generates a cluster impl policy config, and a list of 254// addresses. 255func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) { 256 clusterImplCfg := &clusterimpl.LBConfig{ 257 Cluster: mechanism.Cluster, 258 EDSServiceName: mechanism.EDSServiceName, 259 LoadReportingServerName: mechanism.LoadReportingServerName, 260 MaxConcurrentRequests: mechanism.MaxConcurrentRequests, 261 DropCategories: drops, 262 // ChildPolicy is not set. Will be set based on xdsLBPolicy 263 } 264 265 if xdsLBPolicy == nil || xdsLBPolicy.Name == rrName { 266 // If lb policy is ROUND_ROBIN: 267 // - locality-picking policy is weighted_target 268 // - endpoint-picking policy is round_robin 269 logger.Infof("xds lb policy is %q, building config with weighted_target + round_robin", rrName) 270 // Child of weighted_target is hardcoded to round_robin. 271 wtConfig, addrs := localitiesToWeightedTarget(localities, priorityName, rrBalancerConfig) 272 clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: weightedtarget.Name, Config: wtConfig} 273 return clusterImplCfg, addrs, nil 274 } 275 276 if xdsLBPolicy.Name == rhName { 277 // If lb policy is RIHG_HASH, will build one ring_hash policy as child. 278 // The endpoints from all localities will be flattened to one addresses 279 // list, and the ring_hash policy will pick endpoints from it. 280 logger.Infof("xds lb policy is %q, building config with ring_hash", rhName) 281 addrs := localitiesToRingHash(localities, priorityName) 282 // Set child to ring_hash, note that the ring_hash config is from 283 // xdsLBPolicy. 284 clusterImplCfg.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: ringhash.Name, Config: xdsLBPolicy.Config} 285 return clusterImplCfg, addrs, nil 286 } 287 288 return nil, nil, fmt.Errorf("unsupported xds LB policy %q, not one of {%q,%q}", xdsLBPolicy.Name, rrName, rhName) 289} 290 291// localitiesToRingHash takes a list of localities (with the same priority), and 292// generates a list of addresses. 293// 294// The addresses have path hierarchy set to [priority-name], so priority knows 295// which child policy they are for. 296func localitiesToRingHash(localities []xdsresource.Locality, priorityName string) []resolver.Address { 297 var addrs []resolver.Address 298 for _, locality := range localities { 299 var lw uint32 = 1 300 if locality.Weight != 0 { 301 lw = locality.Weight 302 } 303 localityStr, err := locality.ID.ToString() 304 if err != nil { 305 localityStr = fmt.Sprintf("%+v", locality.ID) 306 } 307 for _, endpoint := range locality.Endpoints { 308 // Filter out all "unhealthy" endpoints (unknown and healthy are 309 // both considered to be healthy: 310 // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). 311 if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { 312 continue 313 } 314 315 var ew uint32 = 1 316 if endpoint.Weight != 0 { 317 ew = endpoint.Weight 318 } 319 320 // The weight of each endpoint is locality_weight * endpoint_weight. 321 ai := weightedroundrobin.AddrInfo{Weight: lw * ew} 322 addr := weightedroundrobin.SetAddrInfo(resolver.Address{Addr: endpoint.Address}, ai) 323 addr = hierarchy.Set(addr, []string{priorityName, localityStr}) 324 addr = internal.SetLocalityID(addr, locality.ID) 325 addrs = append(addrs, addr) 326 } 327 } 328 return addrs 329} 330 331// localitiesToWeightedTarget takes a list of localities (with the same 332// priority), and generates a weighted target config, and list of addresses. 333// 334// The addresses have path hierarchy set to [priority-name, locality-name], so 335// priority and weighted target know which child policy they are for. 336func localitiesToWeightedTarget(localities []xdsresource.Locality, priorityName string, childPolicy *internalserviceconfig.BalancerConfig) (*weightedtarget.LBConfig, []resolver.Address) { 337 weightedTargets := make(map[string]weightedtarget.Target) 338 var addrs []resolver.Address 339 for _, locality := range localities { 340 localityStr, err := locality.ID.ToString() 341 if err != nil { 342 localityStr = fmt.Sprintf("%+v", locality.ID) 343 } 344 weightedTargets[localityStr] = weightedtarget.Target{Weight: locality.Weight, ChildPolicy: childPolicy} 345 for _, endpoint := range locality.Endpoints { 346 // Filter out all "unhealthy" endpoints (unknown and healthy are 347 // both considered to be healthy: 348 // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). 349 if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown { 350 continue 351 } 352 353 addr := resolver.Address{Addr: endpoint.Address} 354 if childPolicy.Name == weightedroundrobin.Name && endpoint.Weight != 0 { 355 ai := weightedroundrobin.AddrInfo{Weight: endpoint.Weight} 356 addr = weightedroundrobin.SetAddrInfo(addr, ai) 357 } 358 addr = hierarchy.Set(addr, []string{priorityName, localityStr}) 359 addr = internal.SetLocalityID(addr, locality.ID) 360 addrs = append(addrs, addr) 361 } 362 } 363 return &weightedtarget.LBConfig{Targets: weightedTargets}, addrs 364} 365