1// Copyright 2018 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package v2 16 17import ( 18 "reflect" 19 "strconv" 20 "sync/atomic" 21 "time" 22 23 xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2" 24 endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" 25 "github.com/golang/protobuf/ptypes/wrappers" 26 27 networkingapi "istio.io/api/networking/v1alpha3" 28 29 "istio.io/istio/pilot/pkg/model" 30 networking "istio.io/istio/pilot/pkg/networking/core/v1alpha3" 31 "istio.io/istio/pilot/pkg/networking/core/v1alpha3/loadbalancer" 32 "istio.io/istio/pilot/pkg/networking/util" 33 "istio.io/istio/pilot/pkg/serviceregistry" 34 "istio.io/istio/pilot/pkg/serviceregistry/aggregate" 35 "istio.io/istio/pkg/config/host" 36 "istio.io/istio/pkg/config/labels" 37 "istio.io/istio/pkg/config/protocol" 38) 39 40// EDS returns the list of endpoints (IP:port and in future labels) associated with a real 41// service or a subset of a service, selected using labels. 42// 43// The source of info is a list of service registries. 44// 45// Primary event is an endpoint creation/deletion. Once the event is fired, EDS needs to 46// find the list of services associated with the endpoint. 47// 48// In case of k8s, Endpoints event is fired when the endpoints are added to service - typically 49// after readiness check. At that point we have the 'real' Service. The Endpoint includes a list 50// of port numbers and names. 51// 52// For the subset case, the Pod referenced in the Endpoint must be looked up, and pod checked 53// for labels. 54// 55// In addition, ExternalEndpoint includes IPs and labels directly and can be directly processed. 56// 57// TODO: for selector-less services (mesh expansion), skip pod processing 58// TODO: optimize the code path for ExternalEndpoint, no additional processing needed 59// TODO: if a service doesn't have split traffic - we can also skip pod and label processing 60// TODO: efficient label processing. In alpha3, the destination policies are set per service, so 61// we may only need to search in a small list. 62 63var ( 64 // Tracks connections, increment on each new connection. 65 connectionNumber = int64(0) 66) 67 68// TODO: add prom metrics ! 69 70// buildEnvoyLbEndpoint packs the endpoint based on istio info. 71func buildEnvoyLbEndpoint(e *model.IstioEndpoint, push *model.PushContext) *endpoint.LbEndpoint { 72 addr := util.BuildAddress(e.Address, e.EndpointPort) 73 74 epWeight := e.LbWeight 75 if epWeight == 0 { 76 epWeight = 1 77 } 78 ep := &endpoint.LbEndpoint{ 79 LoadBalancingWeight: &wrappers.UInt32Value{ 80 Value: epWeight, 81 }, 82 HostIdentifier: &endpoint.LbEndpoint_Endpoint{ 83 Endpoint: &endpoint.Endpoint{ 84 Address: addr, 85 }, 86 }, 87 } 88 89 // Istio telemetry depends on the metadata value being set for endpoints in the mesh. 90 // Istio endpoint level tls transport socket configuration depends on this logic 91 // Do not remove 92 ep.Metadata = util.BuildLbEndpointMetadata(e.UID, e.Network, e.TLSMode, push) 93 94 return ep 95} 96 97// UpdateServiceShards will list the endpoints and create the shards. 98// This is used to reconcile and to support non-k8s registries (until they migrate). 99// Note that aggregated list is expensive (for large numbers) - we want to replace 100// it with a model where DiscoveryServer keeps track of all endpoint registries 101// directly, and calls them one by one. 102func (s *DiscoveryServer) UpdateServiceShards(push *model.PushContext) error { 103 var registries []serviceregistry.Instance 104 var nonK8sRegistries []serviceregistry.Instance 105 if agg, ok := s.Env.ServiceDiscovery.(*aggregate.Controller); ok { 106 registries = agg.GetRegistries() 107 } else { 108 registries = []serviceregistry.Instance{ 109 serviceregistry.Simple{ 110 ServiceDiscovery: s.Env.ServiceDiscovery, 111 }, 112 } 113 } 114 115 for _, registry := range registries { 116 if registry.Provider() != serviceregistry.Kubernetes { 117 nonK8sRegistries = append(nonK8sRegistries, registry) 118 } 119 } 120 121 // Each registry acts as a shard - we don't want to combine them because some 122 // may individually update their endpoints incrementally 123 for _, svc := range push.Services(nil) { 124 for _, registry := range nonK8sRegistries { 125 // skip the service in case this svc does not belong to the registry. 126 if svc.Attributes.ServiceRegistry != string(registry.Provider()) { 127 continue 128 } 129 endpoints := make([]*model.IstioEndpoint, 0) 130 for _, port := range svc.Ports { 131 if port.Protocol == protocol.UDP { 132 continue 133 } 134 135 // This loses track of grouping (shards) 136 instances, err := registry.InstancesByPort(svc, port.Port, labels.Collection{}) 137 if err != nil { 138 return err 139 } 140 141 for _, inst := range instances { 142 endpoints = append(endpoints, inst.Endpoint) 143 } 144 } 145 146 // TODO(nmittler): Should we get the cluster from the endpoints instead? May require organizing endpoints by cluster first. 147 s.edsUpdate(registry.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, endpoints, true) 148 } 149 } 150 151 return nil 152} 153 154// SvcUpdate is a callback from service discovery when service info changes. 155func (s *DiscoveryServer) SvcUpdate(cluster, hostname string, namespace string, event model.Event) { 156 // When a service deleted, we should cleanup the endpoint shards and also remove keys from EndpointShardsByService to 157 // prevent memory leaks. 158 if event == model.EventDelete { 159 inboundServiceDeletes.Increment() 160 s.mutex.Lock() 161 defer s.mutex.Unlock() 162 s.deleteService(cluster, hostname, namespace) 163 } else { 164 inboundServiceUpdates.Increment() 165 } 166} 167 168// Update clusters for an incremental EDS push, and initiate the push. 169// Only clusters that changed are updated/pushed. 170func (s *DiscoveryServer) edsIncremental(version string, req *model.PushRequest) { 171 adsLog.Infof("XDS:EDSInc Pushing:%s Services:%v ConnectedEndpoints:%d", 172 version, model.ConfigNamesOfKind(req.ConfigsUpdated, model.ServiceEntryKind), s.adsClientCount()) 173 s.startPush(req) 174} 175 176// EDSUpdate computes destination address membership across all clusters and networks. 177// This is the main method implementing EDS. 178// It replaces InstancesByPort in model - instead of iterating over all endpoints it uses 179// the hostname-keyed map. And it avoids the conversion from Endpoint to ServiceEntry to envoy 180// on each step: instead the conversion happens once, when an endpoint is first discovered. 181func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string, 182 istioEndpoints []*model.IstioEndpoint) error { 183 inboundEDSUpdates.Increment() 184 s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false) 185 return nil 186} 187 188// edsUpdate updates EDS by clusterID, serviceName, IstioEndpoints, 189// and requests a Full/EDS push. 190func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string, 191 istioEndpoints []*model.IstioEndpoint, internal bool) { 192 // edsShardUpdate replaces a subset (shard) of endpoints, as result of an incremental 193 // update. The endpoint updates may be grouped by K8S clusters, other service registries 194 // or by deployment. Multiple updates are debounced, to avoid too frequent pushes. 195 // After debounce, the services are merged and pushed. 196 s.mutex.Lock() 197 defer s.mutex.Unlock() 198 requireFull := false 199 200 // Should delete the service EndpointShards when endpoints become zero to prevent memory leak, 201 // but we should not do not delete the keys from EndpointShardsByService map - that will trigger 202 // unnecessary full push which can become a real problem if a pod is in crashloop and thus endpoints 203 // flip flopping between 1 and 0. 204 if len(istioEndpoints) == 0 { 205 if s.EndpointShardsByService[serviceName][namespace] != nil { 206 s.deleteEndpointShards(clusterID, serviceName, namespace) 207 adsLog.Infof("Incremental push, service %s has no endpoints", serviceName) 208 s.ConfigUpdate(&model.PushRequest{ 209 Full: false, 210 ConfigsUpdated: map[model.ConfigKey]struct{}{{ 211 Kind: model.ServiceEntryKind, 212 Name: serviceName, 213 Namespace: namespace, 214 }: {}}, 215 Reason: []model.TriggerReason{model.EndpointUpdate}, 216 }) 217 } 218 return 219 } 220 221 // Update the data structures for the service. 222 // 1. Find the 'per service' data 223 if _, f := s.EndpointShardsByService[serviceName]; !f { 224 s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{} 225 } 226 ep, f := s.EndpointShardsByService[serviceName][namespace] 227 if !f { 228 // This endpoint is for a service that was not previously loaded. 229 // Return an error to force a full sync, which will also cause the 230 // EndpointsShardsByService to be initialized with all services. 231 ep = &EndpointShards{ 232 Shards: map[string][]*model.IstioEndpoint{}, 233 ServiceAccounts: map[string]bool{}, 234 } 235 s.EndpointShardsByService[serviceName][namespace] = ep 236 if !internal { 237 adsLog.Infof("Full push, new service %s", serviceName) 238 requireFull = true 239 } 240 } 241 242 // 2. Update data for the specific cluster. Each cluster gets independent 243 // updates containing the full list of endpoints for the service in that cluster. 244 serviceAccounts := map[string]bool{} 245 for _, e := range istioEndpoints { 246 if e.ServiceAccount != "" { 247 serviceAccounts[e.ServiceAccount] = true 248 } 249 } 250 251 if !reflect.DeepEqual(serviceAccounts, ep.ServiceAccounts) { 252 adsLog.Debugf("Updating service accounts now, svc %v, before service account %v, after %v", 253 serviceName, ep.ServiceAccounts, serviceAccounts) 254 if !internal { 255 requireFull = true 256 adsLog.Infof("Full push, service accounts changed, %v", serviceName) 257 } 258 } 259 260 ep.mutex.Lock() 261 ep.Shards[clusterID] = istioEndpoints 262 ep.ServiceAccounts = serviceAccounts 263 ep.mutex.Unlock() 264 265 // for internal update: this called by DiscoveryServer.Push --> UpdateServiceShards, 266 // no need to trigger push here. 267 // It is done in DiscoveryServer.Push --> AdsPushAll 268 if !internal { 269 s.ConfigUpdate(&model.PushRequest{ 270 Full: requireFull, 271 ConfigsUpdated: map[model.ConfigKey]struct{}{{ 272 Kind: model.ServiceEntryKind, 273 Name: serviceName, 274 Namespace: namespace, 275 }: {}}, 276 Reason: []model.TriggerReason{model.EndpointUpdate}, 277 }) 278 } 279} 280 281// deleteEndpointShards deletes matching endpoint shards from EndpointShardsByService map. This is called when 282// endpoints are deleted. 283func (s *DiscoveryServer) deleteEndpointShards(cluster, serviceName, namespace string) { 284 if s.EndpointShardsByService[serviceName][namespace] != nil { 285 s.EndpointShardsByService[serviceName][namespace].mutex.Lock() 286 delete(s.EndpointShardsByService[serviceName][namespace].Shards, cluster) 287 s.EndpointShardsByService[serviceName][namespace].mutex.Unlock() 288 } 289} 290 291// deleteService deletes all service related references from EndpointShardsByService. This is called 292// when a service is deleted. 293func (s *DiscoveryServer) deleteService(cluster, serviceName, namespace string) { 294 if s.EndpointShardsByService[serviceName][namespace] != nil { 295 s.EndpointShardsByService[serviceName][namespace].mutex.Lock() 296 delete(s.EndpointShardsByService[serviceName][namespace].Shards, cluster) 297 svcShards := len(s.EndpointShardsByService[serviceName][namespace].Shards) 298 s.EndpointShardsByService[serviceName][namespace].mutex.Unlock() 299 if svcShards == 0 { 300 delete(s.EndpointShardsByService[serviceName], namespace) 301 } 302 if len(s.EndpointShardsByService[serviceName]) == 0 { 303 delete(s.EndpointShardsByService, serviceName) 304 } 305 } 306} 307 308func connectionID(node string) string { 309 id := atomic.AddInt64(&connectionNumber, 1) 310 return node + "-" + strconv.FormatInt(id, 10) 311} 312 313// loadAssignmentsForClusterIsolated return the endpoints for a proxy in an isolated namespace 314// Initial implementation is computing the endpoints on the flight - caching will be added as needed, based on 315// perf tests. The logic to compute is based on the current UpdateClusterInc 316func (s *DiscoveryServer) loadAssignmentsForClusterIsolated(proxy *model.Proxy, push *model.PushContext, 317 clusterName string) *xdsapi.ClusterLoadAssignment { 318 _, subsetName, hostname, port := model.ParseSubsetKey(clusterName) 319 320 // TODO: BUG. this code is incorrect if 1.1 isolation is used. With destination rule scoping 321 // (public/private) as well as sidecar scopes allowing import of 322 // specific destination rules, the destination rule for a given 323 // namespace should be determined based on the sidecar scope or the 324 // proxy's config namespace. As such, this code searches through all 325 // destination rules, public and private and returns a completely 326 // arbitrary destination rule's subset labels! 327 subsetLabels := push.SubsetToLabels(proxy, subsetName, hostname) 328 329 push.Mutex.Lock() 330 svc := proxy.SidecarScope.ServiceForHostname(hostname, push.ServiceByHostnameAndNamespace) 331 push.Mutex.Unlock() 332 if svc == nil { 333 // Shouldn't happen here 334 adsLog.Debugf("can not find the service for cluster %s", clusterName) 335 return buildEmptyClusterLoadAssignment(clusterName) 336 } 337 338 // Service resolution type might have changed and Cluster may be still in the EDS cluster list of "XdsConnection.Clusters". 339 // This can happen if a ServiceEntry's resolution is changed from STATIC to DNS which changes the Envoy cluster type from 340 // EDS to STRICT_DNS. When pushEds is called before Envoy sends the updated cluster list via Endpoint request which in turn 341 // will update "XdsConnection.Clusters", we might accidentally send EDS updates for STRICT_DNS cluster. This check gaurds 342 // against such behavior and returns nil. When the updated cluster warms up in Envoy, it would update with new endpoints 343 // automatically. 344 // Gateways use EDS for Passthrough cluster. So we should allow Passthrough here. 345 if svc.Resolution == model.DNSLB { 346 adsLog.Infof("XdsConnection has %s in its eds clusters but its resolution now is updated to %v, skipping it.", clusterName, svc.Resolution) 347 return nil 348 } 349 350 svcPort, f := svc.Ports.GetByPort(port) 351 if !f { 352 // Shouldn't happen here 353 adsLog.Debugf("can not find the service port %d for cluster %s", port, clusterName) 354 return buildEmptyClusterLoadAssignment(clusterName) 355 } 356 357 // The service was never updated - do the full update 358 s.mutex.RLock() 359 se, f := s.EndpointShardsByService[string(hostname)][svc.Attributes.Namespace] 360 s.mutex.RUnlock() 361 if !f { 362 // Shouldn't happen here 363 adsLog.Debugf("can not find the endpointShards for cluster %s", clusterName) 364 return buildEmptyClusterLoadAssignment(clusterName) 365 } 366 367 locEps := buildLocalityLbEndpointsFromShards(proxy, se, svc, svcPort, subsetLabels, clusterName, push) 368 369 return &xdsapi.ClusterLoadAssignment{ 370 ClusterName: clusterName, 371 Endpoints: locEps, 372 } 373} 374 375func (s *DiscoveryServer) generateEndpoints( 376 clusterName string, proxy *model.Proxy, push *model.PushContext, edsUpdatedServices map[string]struct{}, 377) *xdsapi.ClusterLoadAssignment { 378 _, _, hostname, _ := model.ParseSubsetKey(clusterName) 379 if edsUpdatedServices != nil { 380 if _, ok := edsUpdatedServices[string(hostname)]; !ok { 381 // Cluster was not updated, skip recomputing. This happens when we get an incremental update for a 382 // specific Hostname. On connect or for full push edsUpdatedServices will be empty. 383 return nil 384 } 385 } 386 387 l := s.loadAssignmentsForClusterIsolated(proxy, push, clusterName) 388 if l == nil { 389 return nil 390 } 391 392 // If networks are set (by default they aren't) apply the Split Horizon 393 // EDS filter on the endpoints 394 if push.Networks != nil && len(push.Networks.Networks) > 0 { 395 endpoints := EndpointsByNetworkFilter(push, proxy.Metadata.Network, l.Endpoints) 396 filteredCLA := &xdsapi.ClusterLoadAssignment{ 397 ClusterName: l.ClusterName, 398 Endpoints: endpoints, 399 Policy: l.Policy, 400 } 401 l = filteredCLA 402 } 403 404 // If locality aware routing is enabled, prioritize endpoints or set their lb weight. 405 // Failover should only be enabled when there is an outlier detection, otherwise Envoy 406 // will never detect the hosts are unhealthy and redirect traffic. 407 enableFailover, lb := getOutlierDetectionAndLoadBalancerSettings(push, proxy, clusterName) 408 lbSetting := loadbalancer.GetLocalityLbSetting(push.Mesh.GetLocalityLbSetting(), lb.GetLocalityLbSetting()) 409 if lbSetting != nil { 410 // Make a shallow copy of the cla as we are mutating the endpoints with priorities/weights relative to the calling proxy 411 clonedCLA := util.CloneClusterLoadAssignment(l) 412 l = &clonedCLA 413 loadbalancer.ApplyLocalityLBSetting(proxy.Locality, l, lbSetting, enableFailover) 414 } 415 return l 416} 417 418// pushEds is pushing EDS updates for a single connection. Called the first time 419// a client connects, for incremental updates and for full periodic updates. 420func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection, version string, edsUpdatedServices map[string]struct{}) error { 421 pushStart := time.Now() 422 loadAssignments := make([]*xdsapi.ClusterLoadAssignment, 0) 423 endpoints := 0 424 empty := 0 425 426 // All clusters that this endpoint is watching. For 1.0 - it's typically all clusters in the mesh. 427 // For 1.1+Sidecar - it's the small set of explicitly imported clusters, using the isolated DestinationRules 428 for _, clusterName := range con.Clusters { 429 430 l := s.generateEndpoints(clusterName, con.node, push, edsUpdatedServices) 431 if l == nil { 432 continue 433 } 434 435 for _, e := range l.Endpoints { 436 endpoints += len(e.LbEndpoints) 437 } 438 439 if len(l.Endpoints) == 0 { 440 empty++ 441 } 442 loadAssignments = append(loadAssignments, l) 443 } 444 445 response := endpointDiscoveryResponse(loadAssignments, version, push.Version, con.RequestedTypes.EDS) 446 err := con.send(response) 447 edsPushTime.Record(time.Since(pushStart).Seconds()) 448 if err != nil { 449 adsLog.Warnf("EDS: Send failure %s: %v", con.ConID, err) 450 recordSendError(edsSendErrPushes, err) 451 return err 452 } 453 edsPushes.Increment() 454 455 if edsUpdatedServices == nil { 456 adsLog.Infof("EDS: PUSH for node:%s clusters:%d endpoints:%d empty:%v", 457 con.node.ID, len(con.Clusters), endpoints, empty) 458 } else { 459 adsLog.Debugf("EDS: PUSH INC for node:%s clusters:%d endpoints:%d empty:%v", 460 con.node.ID, len(con.Clusters), endpoints, empty) 461 } 462 return nil 463} 464 465// getDestinationRule gets the DestinationRule for a given hostname. As an optimization, this also gets the service port, 466// which is needed to access the traffic policy from the destination rule. 467func getDestinationRule(push *model.PushContext, proxy *model.Proxy, hostname host.Name, clusterPort int) (*networkingapi.DestinationRule, *model.Port) { 468 for _, service := range push.Services(proxy) { 469 if service.Hostname == hostname { 470 cfg := push.DestinationRule(proxy, service) 471 if cfg == nil { 472 continue 473 } 474 for _, p := range service.Ports { 475 if p.Port == clusterPort { 476 return cfg.Spec.(*networkingapi.DestinationRule), p 477 } 478 } 479 } 480 } 481 return nil, nil 482} 483 484func getOutlierDetectionAndLoadBalancerSettings(push *model.PushContext, proxy *model.Proxy, clusterName string) (bool, *networkingapi.LoadBalancerSettings) { 485 _, subsetName, hostname, portNumber := model.ParseSubsetKey(clusterName) 486 var outlierDetectionEnabled = false 487 var lbSettings *networkingapi.LoadBalancerSettings 488 489 destinationRule, port := getDestinationRule(push, proxy, hostname, portNumber) 490 if destinationRule == nil || port == nil { 491 return false, nil 492 } 493 494 _, outlierDetection, loadBalancerSettings, _ := networking.SelectTrafficPolicyComponents(destinationRule.TrafficPolicy, port) 495 lbSettings = loadBalancerSettings 496 if outlierDetection != nil { 497 outlierDetectionEnabled = true 498 } 499 500 for _, subset := range destinationRule.Subsets { 501 if subset.Name == subsetName { 502 _, outlierDetection, loadBalancerSettings, _ := networking.SelectTrafficPolicyComponents(subset.TrafficPolicy, port) 503 lbSettings = loadBalancerSettings 504 if outlierDetection != nil { 505 outlierDetectionEnabled = true 506 } 507 break 508 } 509 } 510 return outlierDetectionEnabled, lbSettings 511} 512 513func endpointDiscoveryResponse(loadAssignments []*xdsapi.ClusterLoadAssignment, version, noncePrefix, typeURL string) *xdsapi.DiscoveryResponse { 514 out := &xdsapi.DiscoveryResponse{ 515 TypeUrl: typeURL, 516 // Pilot does not really care for versioning. It always supplies what's currently 517 // available to it, irrespective of whether Envoy chooses to accept or reject EDS 518 // responses. Pilot believes in eventual consistency and that at some point, Envoy 519 // will begin seeing results it deems to be good. 520 VersionInfo: version, 521 Nonce: nonce(noncePrefix), 522 } 523 for _, loadAssignment := range loadAssignments { 524 resource := util.MessageToAny(loadAssignment) 525 resource.TypeUrl = typeURL 526 out.Resources = append(out.Resources, resource) 527 } 528 529 return out 530} 531 532// build LocalityLbEndpoints for a cluster from existing EndpointShards. 533func buildLocalityLbEndpointsFromShards( 534 proxy *model.Proxy, 535 shards *EndpointShards, 536 svc *model.Service, 537 svcPort *model.Port, 538 epLabels labels.Collection, 539 clusterName string, 540 push *model.PushContext) []*endpoint.LocalityLbEndpoints { 541 localityEpMap := make(map[string]*endpoint.LocalityLbEndpoints) 542 543 // Determine whether or not the target service is considered local to the cluster 544 // and should, therefore, not be accessed from outside the cluster. 545 isClusterLocal := push.IsClusterLocal(svc) 546 547 shards.mutex.Lock() 548 // The shards are updated independently, now need to filter and merge 549 // for this cluster 550 for clusterID, endpoints := range shards.Shards { 551 // If the downstream service is configured as cluster-local, only include endpoints that 552 // reside in the same cluster. 553 if isClusterLocal && (clusterID != proxy.ClusterID) { 554 continue 555 } 556 557 for _, ep := range endpoints { 558 if svcPort.Name != ep.ServicePortName { 559 continue 560 } 561 // Port labels 562 if !epLabels.HasSubsetOf(ep.Labels) { 563 continue 564 } 565 566 locLbEps, found := localityEpMap[ep.Locality.Label] 567 if !found { 568 locLbEps = &endpoint.LocalityLbEndpoints{ 569 Locality: util.ConvertLocality(ep.Locality.Label), 570 LbEndpoints: make([]*endpoint.LbEndpoint, 0, len(endpoints)), 571 } 572 localityEpMap[ep.Locality.Label] = locLbEps 573 } 574 if ep.EnvoyEndpoint == nil { 575 ep.EnvoyEndpoint = buildEnvoyLbEndpoint(ep, push) 576 } 577 locLbEps.LbEndpoints = append(locLbEps.LbEndpoints, ep.EnvoyEndpoint) 578 579 } 580 } 581 shards.mutex.Unlock() 582 583 locEps := make([]*endpoint.LocalityLbEndpoints, 0, len(localityEpMap)) 584 for _, locLbEps := range localityEpMap { 585 var weight uint32 586 for _, ep := range locLbEps.LbEndpoints { 587 weight += ep.LoadBalancingWeight.GetValue() 588 } 589 locLbEps.LoadBalancingWeight = &wrappers.UInt32Value{ 590 Value: weight, 591 } 592 locEps = append(locEps, locLbEps) 593 } 594 595 if len(locEps) == 0 { 596 push.AddMetric(model.ProxyStatusClusterNoInstances, clusterName, nil, "") 597 } 598 599 updateEdsStats(locEps, clusterName) 600 601 return locEps 602} 603 604// cluster with no endpoints 605func buildEmptyClusterLoadAssignment(clusterName string) *xdsapi.ClusterLoadAssignment { 606 return &xdsapi.ClusterLoadAssignment{ 607 ClusterName: clusterName, 608 } 609} 610 611func updateEdsStats(locEps []*endpoint.LocalityLbEndpoints, cluster string) { 612 edsInstances.With(clusterTag.Value(cluster)).Record(float64(len(locEps))) 613 epc := 0 614 for _, locLbEps := range locEps { 615 epc += len(locLbEps.GetLbEndpoints()) 616 } 617 edsAllLocalityEndpoints.With(clusterTag.Value(cluster)).Record(float64(epc)) 618} 619