1package xds 2 3import ( 4 "errors" 5 "fmt" 6 7 envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" 8 envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" 9 envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" 10 11 "github.com/golang/protobuf/proto" 12 bexpr "github.com/hashicorp/go-bexpr" 13 14 "github.com/hashicorp/consul/agent/connect" 15 "github.com/hashicorp/consul/agent/proxycfg" 16 "github.com/hashicorp/consul/agent/structs" 17 "github.com/hashicorp/consul/api" 18) 19 20const ( 21 UnnamedSubset = "" 22) 23 24// endpointsFromSnapshot returns the xDS API representation of the "endpoints" 25func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { 26 if cfgSnap == nil { 27 return nil, errors.New("nil config given") 28 } 29 30 switch cfgSnap.Kind { 31 case structs.ServiceKindConnectProxy: 32 return s.endpointsFromSnapshotConnectProxy(cfgSnap) 33 case structs.ServiceKindTerminatingGateway: 34 return s.endpointsFromSnapshotTerminatingGateway(cfgSnap) 35 case structs.ServiceKindMeshGateway: 36 return s.endpointsFromSnapshotMeshGateway(cfgSnap) 37 case structs.ServiceKindIngressGateway: 38 return s.endpointsFromSnapshotIngressGateway(cfgSnap) 39 default: 40 return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) 41 } 42} 43 44// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints" 45// (upstream instances) in the snapshot. 46func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { 47 resources := make([]proto.Message, 0, 48 len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) 49 50 for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { 51 es := s.endpointsFromDiscoveryChain( 52 id, 53 chain, 54 cfgSnap.Datacenter, 55 cfgSnap.ConnectProxy.UpstreamConfig[id], 56 cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id], 57 cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id], 58 ) 59 resources = append(resources, es...) 60 } 61 62 // Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains 63 for _, u := range cfgSnap.Proxy.Upstreams { 64 if u.DestinationType != structs.UpstreamDestTypePreparedQuery { 65 continue 66 } 67 id := u.Identifier() 68 69 dc := u.Datacenter 70 if dc == "" { 71 dc = cfgSnap.Datacenter 72 } 73 clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain) 74 75 endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id] 76 if ok { 77 la := makeLoadAssignment( 78 clusterName, 79 []loadAssignmentEndpointGroup{ 80 {Endpoints: endpoints}, 81 }, 82 cfgSnap.Datacenter, 83 ) 84 resources = append(resources, la) 85 } 86 } 87 88 return resources, nil 89} 90 91func (s *ResourceGenerator) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) { 92 // locally execute the subsets filter 93 if subset.Filter != "" { 94 filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints) 95 if err != nil { 96 return nil, err 97 } 98 99 raw, err := filter.Execute(endpoints) 100 if err != nil { 101 return nil, err 102 } 103 return raw.(structs.CheckServiceNodes), nil 104 } 105 return endpoints, nil 106} 107 108func (s *ResourceGenerator) endpointsFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { 109 return s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers) 110} 111 112func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { 113 datacenters := cfgSnap.MeshGateway.Datacenters() 114 resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups)) 115 116 // generate the endpoints for the gateways in the remote datacenters 117 for _, dc := range datacenters { 118 // Skip creating endpoints for mesh gateways in local DC and gateways in remote DCs with a hostname as their address 119 // EDS cannot resolve hostnames so we provide them through CDS instead 120 if dc == cfgSnap.Datacenter || len(cfgSnap.MeshGateway.HostnameDatacenters[dc]) > 0 { 121 continue 122 } 123 124 endpoints, ok := cfgSnap.MeshGateway.GatewayGroups[dc] 125 if !ok { 126 endpoints, ok = cfgSnap.MeshGateway.FedStateGateways[dc] 127 if !ok { // not possible 128 s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", dc) 129 continue 130 } 131 } 132 133 { // standard connect 134 clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain) 135 136 la := makeLoadAssignment( 137 clusterName, 138 []loadAssignmentEndpointGroup{ 139 {Endpoints: endpoints}, 140 }, 141 cfgSnap.Datacenter, 142 ) 143 resources = append(resources, la) 144 } 145 146 if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil { 147 clusterName := cfgSnap.ServerSNIFn(dc, "") 148 149 la := makeLoadAssignment( 150 clusterName, 151 []loadAssignmentEndpointGroup{ 152 {Endpoints: endpoints}, 153 }, 154 cfgSnap.Datacenter, 155 ) 156 resources = append(resources, la) 157 } 158 } 159 160 // generate endpoints for our servers if WAN federation is enabled 161 if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil { 162 var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint 163 164 for _, srv := range cfgSnap.MeshGateway.ConsulServers { 165 clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node) 166 167 addr, port := srv.BestAddress(false /*wan*/) 168 169 lbEndpoint := &envoy_endpoint_v3.LbEndpoint{ 170 HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ 171 Endpoint: &envoy_endpoint_v3.Endpoint{ 172 Address: makeAddress(addr, port), 173 }, 174 }, 175 HealthStatus: envoy_core_v3.HealthStatus_UNKNOWN, 176 } 177 178 cla := &envoy_endpoint_v3.ClusterLoadAssignment{ 179 ClusterName: clusterName, 180 Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{ 181 LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{lbEndpoint}, 182 }}, 183 } 184 allServersLbEndpoints = append(allServersLbEndpoints, lbEndpoint) 185 186 resources = append(resources, cla) 187 } 188 189 // And add one catch all so that remote datacenters can dial ANY server 190 // in this datacenter without knowing its name. 191 resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{ 192 ClusterName: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, ""), 193 Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{ 194 LbEndpoints: allServersLbEndpoints, 195 }}, 196 }) 197 } 198 199 // Generate the endpoints for each service and its subsets 200 e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers) 201 if err != nil { 202 return nil, err 203 } 204 resources = append(resources, e...) 205 206 return resources, nil 207} 208 209func (s *ResourceGenerator) endpointsFromServicesAndResolvers( 210 cfgSnap *proxycfg.ConfigSnapshot, 211 services map[structs.ServiceName]structs.CheckServiceNodes, 212 resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry, 213) ([]proto.Message, error) { 214 resources := make([]proto.Message, 0, len(services)) 215 216 // generate the endpoints for the linked service groups 217 for svc, endpoints := range services { 218 // Skip creating endpoints for services that have hostnames as addresses 219 // EDS cannot resolve hostnames so we provide them through CDS instead 220 if cfgSnap.Kind == structs.ServiceKindTerminatingGateway && len(cfgSnap.TerminatingGateway.HostnameServices[svc]) > 0 { 221 continue 222 } 223 224 clusterEndpoints := make(map[string][]loadAssignmentEndpointGroup) 225 clusterEndpoints[UnnamedSubset] = []loadAssignmentEndpointGroup{{Endpoints: endpoints, OnlyPassing: false}} 226 227 // Collect all of the loadAssignmentEndpointGroups for the various subsets. We do this before generating 228 // the endpoints for the default/unnamed subset so that we can take into account the DefaultSubset on the 229 // service-resolver which may prevent the default/unnamed cluster from creating endpoints for all service 230 // instances. 231 if resolver, hasResolver := resolvers[svc]; hasResolver { 232 for subsetName, subset := range resolver.Subsets { 233 subsetEndpoints, err := s.filterSubsetEndpoints(&subset, endpoints) 234 if err != nil { 235 return nil, err 236 } 237 groups := []loadAssignmentEndpointGroup{{Endpoints: subsetEndpoints, OnlyPassing: subset.OnlyPassing}} 238 clusterEndpoints[subsetName] = groups 239 240 // if this subset is the default then override the unnamed subset with this configuration 241 if subsetName == resolver.DefaultSubset { 242 clusterEndpoints[UnnamedSubset] = groups 243 } 244 } 245 } 246 247 // now generate the load assignment for all subsets 248 for subsetName, groups := range clusterEndpoints { 249 clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain) 250 la := makeLoadAssignment( 251 clusterName, 252 groups, 253 cfgSnap.Datacenter, 254 ) 255 resources = append(resources, la) 256 } 257 } 258 259 return resources, nil 260} 261 262func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { 263 var resources []proto.Message 264 createdClusters := make(map[string]bool) 265 for _, upstreams := range cfgSnap.IngressGateway.Upstreams { 266 for _, u := range upstreams { 267 id := u.Identifier() 268 269 // If we've already created endpoints for this upstream, skip it. Multiple listeners may 270 // reference the same upstream, so we don't need to create duplicate endpoints in that case. 271 if createdClusters[id] { 272 continue 273 } 274 275 es := s.endpointsFromDiscoveryChain( 276 id, 277 cfgSnap.IngressGateway.DiscoveryChain[id], 278 cfgSnap.Datacenter, 279 &u, 280 cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id], 281 cfgSnap.IngressGateway.WatchedGatewayEndpoints[id], 282 ) 283 resources = append(resources, es...) 284 createdClusters[id] = true 285 } 286 } 287 return resources, nil 288} 289 290// used in clusters.go 291func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint { 292 return &envoy_endpoint_v3.LbEndpoint{ 293 HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ 294 Endpoint: &envoy_endpoint_v3.Endpoint{ 295 Address: makeAddress(host, port), 296 }, 297 }, 298 } 299} 300 301func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint { 302 return &envoy_endpoint_v3.LbEndpoint{ 303 HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ 304 Endpoint: &envoy_endpoint_v3.Endpoint{ 305 Address: makePipeAddress(path, 0), 306 }, 307 }, 308 } 309} 310 311func (s *ResourceGenerator) endpointsFromDiscoveryChain( 312 id string, 313 chain *structs.CompiledDiscoveryChain, 314 datacenter string, 315 upstream *structs.Upstream, 316 upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes, 317) []proto.Message { 318 var resources []proto.Message 319 320 if chain == nil { 321 return resources 322 } 323 324 configMap := make(map[string]interface{}) 325 if upstream != nil { 326 configMap = upstream.Config 327 } 328 cfg, err := structs.ParseUpstreamConfigNoDefaults(configMap) 329 if err != nil { 330 // Don't hard fail on a config typo, just warn. The parse func returns 331 // default config if there is an error so it's safe to continue. 332 s.Logger.Warn("failed to parse", "upstream", id, 333 "error", err) 334 } 335 336 var escapeHatchCluster *envoy_cluster_v3.Cluster 337 if cfg.EnvoyClusterJSON != "" { 338 if chain.IsDefault() { 339 // If you haven't done anything to setup the discovery chain, then 340 // you can use the envoy_cluster_json escape hatch. 341 escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON) 342 if err != nil { 343 return resources 344 } 345 } else { 346 s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for", 347 "discovery chain", chain.ServiceName, "upstream", id, 348 "envoy_cluster_json", chain.ServiceName) 349 } 350 } 351 352 // Find all resolver nodes. 353 for _, node := range chain.Nodes { 354 if node.Type != structs.DiscoveryGraphNodeTypeResolver { 355 continue 356 } 357 failover := node.Resolver.Failover 358 targetID := node.Resolver.Target 359 360 target := chain.Targets[targetID] 361 362 clusterName := CustomizeClusterName(target.Name, chain) 363 if escapeHatchCluster != nil { 364 clusterName = escapeHatchCluster.Name 365 } 366 s.Logger.Debug("generating endpoints for", "cluster", clusterName) 367 368 // Determine if we have to generate the entire cluster differently. 369 failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node) 370 371 if failoverThroughMeshGateway { 372 actualTargetID := firstHealthyTarget( 373 chain.Targets, 374 upstreamEndpoints, 375 targetID, 376 failover.Targets, 377 ) 378 if actualTargetID != targetID { 379 targetID = actualTargetID 380 } 381 382 failover = nil 383 } 384 385 primaryGroup, valid := makeLoadAssignmentEndpointGroup( 386 chain.Targets, 387 upstreamEndpoints, 388 gatewayEndpoints, 389 targetID, 390 datacenter, 391 ) 392 if !valid { 393 continue // skip the cluster if we're still populating the snapshot 394 } 395 396 var endpointGroups []loadAssignmentEndpointGroup 397 398 if failover != nil && len(failover.Targets) > 0 { 399 endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1) 400 401 endpointGroups = append(endpointGroups, primaryGroup) 402 403 for _, failTargetID := range failover.Targets { 404 failoverGroup, valid := makeLoadAssignmentEndpointGroup( 405 chain.Targets, 406 upstreamEndpoints, 407 gatewayEndpoints, 408 failTargetID, 409 datacenter, 410 ) 411 if !valid { 412 continue // skip the failover target if we're still populating the snapshot 413 } 414 endpointGroups = append(endpointGroups, failoverGroup) 415 } 416 } else { 417 endpointGroups = append(endpointGroups, primaryGroup) 418 } 419 420 la := makeLoadAssignment( 421 clusterName, 422 endpointGroups, 423 datacenter, 424 ) 425 resources = append(resources, la) 426 } 427 428 return resources 429} 430 431type loadAssignmentEndpointGroup struct { 432 Endpoints structs.CheckServiceNodes 433 OnlyPassing bool 434 OverrideHealth envoy_core_v3.HealthStatus 435} 436 437func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localDatacenter string) *envoy_endpoint_v3.ClusterLoadAssignment { 438 cla := &envoy_endpoint_v3.ClusterLoadAssignment{ 439 ClusterName: clusterName, 440 Endpoints: make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)), 441 } 442 443 if len(endpointGroups) > 1 { 444 cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{ 445 // We choose such a large value here that the failover math should 446 // in effect not happen until zero instances are healthy. 447 OverprovisioningFactor: makeUint32Value(100000), 448 } 449 } 450 451 for priority, endpointGroup := range endpointGroups { 452 endpoints := endpointGroup.Endpoints 453 es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints)) 454 455 for _, ep := range endpoints { 456 // TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc? 457 addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter) 458 healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing) 459 460 if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN { 461 healthStatus = endpointGroup.OverrideHealth 462 } 463 464 es = append(es, &envoy_endpoint_v3.LbEndpoint{ 465 HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{ 466 Endpoint: &envoy_endpoint_v3.Endpoint{ 467 Address: makeAddress(addr, port), 468 }, 469 }, 470 HealthStatus: healthStatus, 471 LoadBalancingWeight: makeUint32Value(weight), 472 }) 473 } 474 475 cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{ 476 Priority: uint32(priority), 477 LbEndpoints: es, 478 }) 479 } 480 481 return cla 482} 483 484func makeLoadAssignmentEndpointGroup( 485 targets map[string]*structs.DiscoveryTarget, 486 targetHealth map[string]structs.CheckServiceNodes, 487 gatewayHealth map[string]structs.CheckServiceNodes, 488 targetID string, 489 currentDatacenter string, 490) (loadAssignmentEndpointGroup, bool) { 491 realEndpoints, ok := targetHealth[targetID] 492 if !ok { 493 // skip the cluster if we're still populating the snapshot 494 return loadAssignmentEndpointGroup{}, false 495 } 496 target := targets[targetID] 497 498 var gatewayDatacenter string 499 switch target.MeshGateway.Mode { 500 case structs.MeshGatewayModeRemote: 501 gatewayDatacenter = target.Datacenter 502 case structs.MeshGatewayModeLocal: 503 gatewayDatacenter = currentDatacenter 504 } 505 506 if gatewayDatacenter == "" { 507 return loadAssignmentEndpointGroup{ 508 Endpoints: realEndpoints, 509 OnlyPassing: target.Subset.OnlyPassing, 510 }, true 511 } 512 513 // If using a mesh gateway we need to pull those endpoints instead. 514 gatewayEndpoints, ok := gatewayHealth[gatewayDatacenter] 515 if !ok { 516 // skip the cluster if we're still populating the snapshot 517 return loadAssignmentEndpointGroup{}, false 518 } 519 520 // But we will use the health from the actual backend service. 521 overallHealth := envoy_core_v3.HealthStatus_UNHEALTHY 522 for _, ep := range realEndpoints { 523 health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing) 524 if health == envoy_core_v3.HealthStatus_HEALTHY { 525 overallHealth = envoy_core_v3.HealthStatus_HEALTHY 526 break 527 } 528 } 529 530 return loadAssignmentEndpointGroup{ 531 Endpoints: gatewayEndpoints, 532 OverrideHealth: overallHealth, 533 }, true 534} 535 536func calculateEndpointHealthAndWeight( 537 ep structs.CheckServiceNode, 538 onlyPassing bool, 539) (envoy_core_v3.HealthStatus, int) { 540 healthStatus := envoy_core_v3.HealthStatus_HEALTHY 541 weight := 1 542 if ep.Service.Weights != nil { 543 weight = ep.Service.Weights.Passing 544 } 545 546 for _, chk := range ep.Checks { 547 if chk.Status == api.HealthCritical { 548 healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY 549 } 550 if onlyPassing && chk.Status != api.HealthPassing { 551 healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY 552 } 553 if chk.Status == api.HealthWarning && ep.Service.Weights != nil { 554 weight = ep.Service.Weights.Warning 555 } 556 } 557 // Make weights fit Envoy's limits. A zero weight means that either Warning 558 // (likely) or Passing (weirdly) weight has been set to 0 effectively making 559 // this instance unhealthy and should not be sent traffic. 560 if weight < 1 { 561 healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY 562 weight = 1 563 } 564 if weight > 128 { 565 weight = 128 566 } 567 return healthStatus, weight 568} 569