// Copyright 2018 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
	"reflect"
	"strconv"
	"sync/atomic"
	"time"

	xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
	"github.com/golang/protobuf/ptypes/wrappers"

	networkingapi "istio.io/api/networking/v1alpha3"

	"istio.io/istio/pilot/pkg/model"
	networking "istio.io/istio/pilot/pkg/networking/core/v1alpha3"
	"istio.io/istio/pilot/pkg/networking/core/v1alpha3/loadbalancer"
	"istio.io/istio/pilot/pkg/networking/util"
	"istio.io/istio/pilot/pkg/serviceregistry"
	"istio.io/istio/pilot/pkg/serviceregistry/aggregate"
	"istio.io/istio/pkg/config/host"
	"istio.io/istio/pkg/config/labels"
	"istio.io/istio/pkg/config/protocol"
)

// EDS returns the list of endpoints (IP:port and, in the future, labels) associated with a real
// service or a subset of a service, selected using labels.
//
// The source of info is a list of service registries.
//
// The primary event is endpoint creation/deletion. Once the event is fired, EDS needs to
// find the list of services associated with the endpoint.
//
// In the case of k8s, the Endpoints event is fired when endpoints are added to a service -
// typically after the readiness check. At that point we have the 'real' Service. The Endpoint
// includes a list of port numbers and names.
//
// For the subset case, the Pod referenced in the Endpoint must be looked up, and the pod
// checked for labels.
//
// In addition, ExternalEndpoint includes IPs and labels directly and can be processed directly.
//
// TODO: for selector-less services (mesh expansion), skip pod processing
// TODO: optimize the code path for ExternalEndpoint, no additional processing needed
// TODO: if a service doesn't have split traffic - we can also skip pod and label processing
// TODO: efficient label processing. In alpha3, the destination policies are set per service, so
// we may only need to search in a small list.
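//
// The per-service state lives in DiscoveryServer.EndpointShardsByService, keyed by hostname and
// then namespace; each registry/cluster contributes one shard (see edsUpdate below).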

var (
	// Tracks connections; incremented on each new connection.
	connectionNumber = int64(0)
)

// TODO: add prom metrics !

// buildEnvoyLbEndpoint builds an Envoy LbEndpoint from the Istio endpoint info.
func buildEnvoyLbEndpoint(e *model.IstioEndpoint, push *model.PushContext) *endpoint.LbEndpoint {
	addr := util.BuildAddress(e.Address, e.EndpointPort)

	epWeight := e.LbWeight
	if epWeight == 0 {
		epWeight = 1
	}
	ep := &endpoint.LbEndpoint{
		LoadBalancingWeight: &wrappers.UInt32Value{
			Value: epWeight,
		},
		HostIdentifier: &endpoint.LbEndpoint_Endpoint{
			Endpoint: &endpoint.Endpoint{
				Address: addr,
			},
		},
	}

	// Istio telemetry depends on the metadata value being set for endpoints in the mesh.
	// Istio endpoint-level TLS transport socket configuration also depends on this logic.
	// Do not remove.
	ep.Metadata = util.BuildLbEndpointMetadata(e.UID, e.Network, e.TLSMode, push)

	return ep
}

// UpdateServiceShards will list the endpoints and create the shards.
// This is used to reconcile and to support non-k8s registries (until they migrate).
// Note that the aggregated list is expensive (for large numbers of endpoints) - we want to replace
// it with a model where DiscoveryServer keeps track of all endpoint registries
// directly, and calls them one by one.
func (s *DiscoveryServer) UpdateServiceShards(push *model.PushContext) error {
	var registries []serviceregistry.Instance
	var nonK8sRegistries []serviceregistry.Instance
	if agg, ok := s.Env.ServiceDiscovery.(*aggregate.Controller); ok {
		registries = agg.GetRegistries()
	} else {
		registries = []serviceregistry.Instance{
			serviceregistry.Simple{
				ServiceDiscovery: s.Env.ServiceDiscovery,
			},
		}
	}

	for _, registry := range registries {
		if registry.Provider() != serviceregistry.Kubernetes {
			nonK8sRegistries = append(nonK8sRegistries, registry)
		}
	}

	// Each registry acts as a shard - we don't want to combine them because some
	// may individually update their endpoints incrementally
	for _, svc := range push.Services(nil) {
		for _, registry := range nonK8sRegistries {
			// Skip the service if it does not belong to this registry.
			if svc.Attributes.ServiceRegistry != string(registry.Provider()) {
				continue
			}
			endpoints := make([]*model.IstioEndpoint, 0)
			for _, port := range svc.Ports {
				if port.Protocol == protocol.UDP {
					continue
				}

				// This loses track of grouping (shards)
				instances, err := registry.InstancesByPort(svc, port.Port, labels.Collection{})
				if err != nil {
					return err
				}

				for _, inst := range instances {
					endpoints = append(endpoints, inst.Endpoint)
				}
			}

			// TODO(nmittler): Should we get the cluster from the endpoints instead? May require organizing endpoints by cluster first.
			s.edsUpdate(registry.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, endpoints, true)
		}
	}

	return nil
}

// SvcUpdate is a callback from service discovery when service info changes.
func (s *DiscoveryServer) SvcUpdate(cluster, hostname string, namespace string, event model.Event) {
	// When a service is deleted, we should clean up the endpoint shards and also remove keys from
	// EndpointShardsByService to prevent memory leaks.
	if event == model.EventDelete {
		inboundServiceDeletes.Increment()
		s.mutex.Lock()
		defer s.mutex.Unlock()
		s.deleteService(cluster, hostname, namespace)
	} else {
		inboundServiceUpdates.Increment()
	}
}

// Update clusters for an incremental EDS push, and initiate the push.
// Only clusters that changed are updated/pushed.
func (s *DiscoveryServer) edsIncremental(version string, req *model.PushRequest) {
	adsLog.Infof("XDS:EDSInc Pushing:%s Services:%v ConnectedEndpoints:%d",
		version, model.ConfigNamesOfKind(req.ConfigsUpdated, model.ServiceEntryKind), s.adsClientCount())
	s.startPush(req)
}

// EDSUpdate computes destination address membership across all clusters and networks.
// This is the main method implementing EDS.
// It replaces InstancesByPort in model - instead of iterating over all endpoints it uses
// the hostname-keyed map. It also avoids the conversion from Endpoint to ServiceEntry to Envoy
// on each step: instead the conversion happens once, when an endpoint is first discovered.
func (s *DiscoveryServer) EDSUpdate(clusterID, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint) error {
	inboundEDSUpdates.Increment()
	s.edsUpdate(clusterID, serviceName, namespace, istioEndpoints, false)
	return nil
}

// edsUpdate updates the EDS shards for the given clusterID, serviceName and IstioEndpoints,
// and requests a full/EDS push as needed.
func (s *DiscoveryServer) edsUpdate(clusterID, serviceName string, namespace string,
	istioEndpoints []*model.IstioEndpoint, internal bool) {
	// edsUpdate replaces a subset (shard) of endpoints, as the result of an incremental
	// update. The endpoint updates may be grouped by K8S clusters, other service registries
	// or by deployment. Multiple updates are debounced, to avoid too frequent pushes.
	// After the debounce, the services are merged and pushed.
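	//
	// A sketch of the resulting per-service state (the service and cluster IDs shown are hypothetical):
	//
	//   EndpointShardsByService["reviews.default.svc.cluster.local"]["default"].Shards["cluster-1"] = []*model.IstioEndpoint{...}
	//   EndpointShardsByService["reviews.default.svc.cluster.local"]["default"].Shards["cluster-2"] = []*model.IstioEndpoint{...}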
	s.mutex.Lock()
	defer s.mutex.Unlock()
	requireFull := false

	// We should delete the service's EndpointShards when its endpoints drop to zero to prevent a
	// memory leak, but we should not delete the keys from the EndpointShardsByService map - that
	// would trigger an unnecessary full push, which can become a real problem if a pod is
	// crash-looping and its endpoints flip-flop between 1 and 0.
	if len(istioEndpoints) == 0 {
		if s.EndpointShardsByService[serviceName][namespace] != nil {
			s.deleteEndpointShards(clusterID, serviceName, namespace)
			adsLog.Infof("Incremental push, service %s has no endpoints", serviceName)
			s.ConfigUpdate(&model.PushRequest{
				Full: false,
				ConfigsUpdated: map[model.ConfigKey]struct{}{{
					Kind:      model.ServiceEntryKind,
					Name:      serviceName,
					Namespace: namespace,
				}: {}},
				Reason: []model.TriggerReason{model.EndpointUpdate},
			})
		}
		return
	}

	// Update the data structures for the service.
	// 1. Find the 'per service' data
	if _, f := s.EndpointShardsByService[serviceName]; !f {
		s.EndpointShardsByService[serviceName] = map[string]*EndpointShards{}
	}
	ep, f := s.EndpointShardsByService[serviceName][namespace]
	if !f {
		// This endpoint is for a service that was not previously loaded.
		// Create its shards and, unless this is an internal update, request a full push,
		// which also causes EndpointShardsByService to be initialized with all services.
		ep = &EndpointShards{
			Shards:          map[string][]*model.IstioEndpoint{},
			ServiceAccounts: map[string]bool{},
		}
		s.EndpointShardsByService[serviceName][namespace] = ep
		if !internal {
			adsLog.Infof("Full push, new service %s", serviceName)
			requireFull = true
		}
	}

	// 2. Update data for the specific cluster. Each cluster gets independent
	// updates containing the full list of endpoints for the service in that cluster.
	serviceAccounts := map[string]bool{}
	for _, e := range istioEndpoints {
		if e.ServiceAccount != "" {
			serviceAccounts[e.ServiceAccount] = true
		}
	}

	if !reflect.DeepEqual(serviceAccounts, ep.ServiceAccounts) {
		adsLog.Debugf("Updating service accounts now, svc %v, before service account %v, after %v",
			serviceName, ep.ServiceAccounts, serviceAccounts)
		if !internal {
			requireFull = true
			adsLog.Infof("Full push, service accounts changed, %v", serviceName)
		}
	}

	ep.mutex.Lock()
	ep.Shards[clusterID] = istioEndpoints
	ep.ServiceAccounts = serviceAccounts
	ep.mutex.Unlock()

	// For an internal update this is called by DiscoveryServer.Push --> UpdateServiceShards,
	// so there is no need to trigger a push here; it is done in DiscoveryServer.Push --> AdsPushAll.
	if !internal {
		s.ConfigUpdate(&model.PushRequest{
			Full: requireFull,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      model.ServiceEntryKind,
				Name:      serviceName,
				Namespace: namespace,
			}: {}},
			Reason: []model.TriggerReason{model.EndpointUpdate},
		})
	}
}

// deleteEndpointShards deletes matching endpoint shards from EndpointShardsByService map. This is called when
// endpoints are deleted.
func (s *DiscoveryServer) deleteEndpointShards(cluster, serviceName, namespace string) {
	if s.EndpointShardsByService[serviceName][namespace] != nil {
		s.EndpointShardsByService[serviceName][namespace].mutex.Lock()
		delete(s.EndpointShardsByService[serviceName][namespace].Shards, cluster)
		s.EndpointShardsByService[serviceName][namespace].mutex.Unlock()
	}
}

// deleteService deletes all service related references from EndpointShardsByService. This is called
// when a service is deleted.
func (s *DiscoveryServer) deleteService(cluster, serviceName, namespace string) {
	if s.EndpointShardsByService[serviceName][namespace] != nil {
		s.EndpointShardsByService[serviceName][namespace].mutex.Lock()
		delete(s.EndpointShardsByService[serviceName][namespace].Shards, cluster)
		svcShards := len(s.EndpointShardsByService[serviceName][namespace].Shards)
		s.EndpointShardsByService[serviceName][namespace].mutex.Unlock()
		if svcShards == 0 {
			delete(s.EndpointShardsByService[serviceName], namespace)
		}
		if len(s.EndpointShardsByService[serviceName]) == 0 {
			delete(s.EndpointShardsByService, serviceName)
		}
	}
}

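// connectionID returns a unique ID for an ADS connection from the given node, by appending a
// monotonically increasing counter to the node ID. For example (the node ID shown is illustrative),
// the third connection overall from node "sidecar~10.4.0.1~foo-abc.bar~bar.svc.cluster.local" gets
// the ID "sidecar~10.4.0.1~foo-abc.bar~bar.svc.cluster.local-3".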
func connectionID(node string) string {
	id := atomic.AddInt64(&connectionNumber, 1)
	return node + "-" + strconv.FormatInt(id, 10)
}

// loadAssignmentsForClusterIsolated returns the endpoints for a proxy in an isolated namespace.
// The initial implementation computes the endpoints on the fly - caching will be added as needed, based on
// perf tests. The logic to compute them is based on the current UpdateClusterInc.
func (s *DiscoveryServer) loadAssignmentsForClusterIsolated(proxy *model.Proxy, push *model.PushContext,
	clusterName string) *xdsapi.ClusterLoadAssignment {
	_, subsetName, hostname, port := model.ParseSubsetKey(clusterName)
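	// For example, the (illustrative) cluster name "outbound|9080|v1|reviews.default.svc.cluster.local"
	// parses into subset "v1", hostname "reviews.default.svc.cluster.local" and port 9080.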

	// TODO: BUG. this code is incorrect if 1.1 isolation is used. With destination rule scoping
	// (public/private) as well as sidecar scopes allowing import of
	// specific destination rules, the destination rule for a given
	// namespace should be determined based on the sidecar scope or the
	// proxy's config namespace. As such, this code searches through all
	// destination rules, public and private and returns a completely
	// arbitrary destination rule's subset labels!
	subsetLabels := push.SubsetToLabels(proxy, subsetName, hostname)

	push.Mutex.Lock()
	svc := proxy.SidecarScope.ServiceForHostname(hostname, push.ServiceByHostnameAndNamespace)
	push.Mutex.Unlock()
	if svc == nil {
		// Shouldn't happen here
		adsLog.Debugf("can not find the service for cluster %s", clusterName)
		return buildEmptyClusterLoadAssignment(clusterName)
	}

	// The service resolution type might have changed, and the cluster may still be in the EDS cluster list of "XdsConnection.Clusters".
	// This can happen if a ServiceEntry's resolution is changed from STATIC to DNS, which changes the Envoy cluster type from
	// EDS to STRICT_DNS. If pushEds is called before Envoy sends the updated cluster list via an Endpoint request (which in turn
	// updates "XdsConnection.Clusters"), we might accidentally send EDS updates for a STRICT_DNS cluster. This check guards
	// against such behavior and returns nil. When the updated cluster warms up in Envoy, it will update with the new endpoints
	// automatically.
	// Gateways use EDS for the Passthrough cluster, so we should allow Passthrough here.
	if svc.Resolution == model.DNSLB {
		adsLog.Infof("XdsConnection has %s in its eds clusters but its resolution is now %v, skipping it.", clusterName, svc.Resolution)
		return nil
	}

	svcPort, f := svc.Ports.GetByPort(port)
	if !f {
		// Shouldn't happen here
		adsLog.Debugf("can not find the service port %d for cluster %s", port, clusterName)
		return buildEmptyClusterLoadAssignment(clusterName)
	}

	// The service was never updated - do the full update
	s.mutex.RLock()
	se, f := s.EndpointShardsByService[string(hostname)][svc.Attributes.Namespace]
	s.mutex.RUnlock()
	if !f {
		// Shouldn't happen here
		adsLog.Debugf("can not find the endpointShards for cluster %s", clusterName)
		return buildEmptyClusterLoadAssignment(clusterName)
	}

	locEps := buildLocalityLbEndpointsFromShards(proxy, se, svc, svcPort, subsetLabels, clusterName, push)

	return &xdsapi.ClusterLoadAssignment{
		ClusterName: clusterName,
		Endpoints:   locEps,
	}
}

func (s *DiscoveryServer) generateEndpoints(
	clusterName string, proxy *model.Proxy, push *model.PushContext, edsUpdatedServices map[string]struct{},
) *xdsapi.ClusterLoadAssignment {
	_, _, hostname, _ := model.ParseSubsetKey(clusterName)
	if edsUpdatedServices != nil {
		if _, ok := edsUpdatedServices[string(hostname)]; !ok {
			// The cluster was not updated, so skip recomputing it. This happens when we get an incremental update for a
			// specific Hostname. On connect or for a full push, edsUpdatedServices will be nil.
			return nil
		}
	}

	l := s.loadAssignmentsForClusterIsolated(proxy, push, clusterName)
	if l == nil {
		return nil
	}

	// If networks are set (by default they aren't), apply the Split Horizon
	// EDS filter on the endpoints
	if push.Networks != nil && len(push.Networks.Networks) > 0 {
		endpoints := EndpointsByNetworkFilter(push, proxy.Metadata.Network, l.Endpoints)
		filteredCLA := &xdsapi.ClusterLoadAssignment{
			ClusterName: l.ClusterName,
			Endpoints:   endpoints,
			Policy:      l.Policy,
		}
		l = filteredCLA
	}

	// If locality-aware routing is enabled, prioritize endpoints or set their LB weight.
	// Failover should only be enabled when outlier detection is configured; otherwise Envoy
	// will never detect that hosts are unhealthy and will never redirect traffic.
	enableFailover, lb := getOutlierDetectionAndLoadBalancerSettings(push, proxy, clusterName)
	lbSetting := loadbalancer.GetLocalityLbSetting(push.Mesh.GetLocalityLbSetting(), lb.GetLocalityLbSetting())
	if lbSetting != nil {
		// Make a shallow copy of the CLA, as we are mutating the endpoints with priorities/weights relative to the calling proxy
		clonedCLA := util.CloneClusterLoadAssignment(l)
		l = &clonedCLA
		loadbalancer.ApplyLocalityLBSetting(proxy.Locality, l, lbSetting, enableFailover)
	}
	return l
}

// pushEds pushes EDS updates for a single connection. It is called the first time
// a client connects, for incremental updates and for full periodic updates.
func (s *DiscoveryServer) pushEds(push *model.PushContext, con *XdsConnection, version string, edsUpdatedServices map[string]struct{}) error {
	pushStart := time.Now()
	loadAssignments := make([]*xdsapi.ClusterLoadAssignment, 0)
	endpoints := 0
	empty := 0

	// All clusters that this connection is watching. For 1.0 it's typically all clusters in the mesh.
	// For 1.1 + Sidecar it's the small set of explicitly imported clusters, using the isolated DestinationRules
	for _, clusterName := range con.Clusters {

		l := s.generateEndpoints(clusterName, con.node, push, edsUpdatedServices)
		if l == nil {
			continue
		}

		for _, e := range l.Endpoints {
			endpoints += len(e.LbEndpoints)
		}

		if len(l.Endpoints) == 0 {
			empty++
		}
		loadAssignments = append(loadAssignments, l)
	}

	response := endpointDiscoveryResponse(loadAssignments, version, push.Version, con.RequestedTypes.EDS)
	err := con.send(response)
	edsPushTime.Record(time.Since(pushStart).Seconds())
	if err != nil {
		adsLog.Warnf("EDS: Send failure %s: %v", con.ConID, err)
		recordSendError(edsSendErrPushes, err)
		return err
	}
	edsPushes.Increment()

	if edsUpdatedServices == nil {
		adsLog.Infof("EDS: PUSH for node:%s clusters:%d endpoints:%d empty:%v",
			con.node.ID, len(con.Clusters), endpoints, empty)
	} else {
		adsLog.Debugf("EDS: PUSH INC for node:%s clusters:%d endpoints:%d empty:%v",
			con.node.ID, len(con.Clusters), endpoints, empty)
	}
	return nil
}

// getDestinationRule gets the DestinationRule for a given hostname. As an optimization, this also gets the service port,
// which is needed to access the traffic policy from the destination rule.
func getDestinationRule(push *model.PushContext, proxy *model.Proxy, hostname host.Name, clusterPort int) (*networkingapi.DestinationRule, *model.Port) {
	for _, service := range push.Services(proxy) {
		if service.Hostname == hostname {
			cfg := push.DestinationRule(proxy, service)
			if cfg == nil {
				continue
			}
			for _, p := range service.Ports {
				if p.Port == clusterPort {
					return cfg.Spec.(*networkingapi.DestinationRule), p
				}
			}
		}
	}
	return nil, nil
}

func getOutlierDetectionAndLoadBalancerSettings(push *model.PushContext, proxy *model.Proxy, clusterName string) (bool, *networkingapi.LoadBalancerSettings) {
	_, subsetName, hostname, portNumber := model.ParseSubsetKey(clusterName)
	var outlierDetectionEnabled = false
	var lbSettings *networkingapi.LoadBalancerSettings

	destinationRule, port := getDestinationRule(push, proxy, hostname, portNumber)
	if destinationRule == nil || port == nil {
		return false, nil
	}

	_, outlierDetection, loadBalancerSettings, _ := networking.SelectTrafficPolicyComponents(destinationRule.TrafficPolicy, port)
	lbSettings = loadBalancerSettings
	if outlierDetection != nil {
		outlierDetectionEnabled = true
	}

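	// If a matching subset defines its own traffic policy, it overrides the service-level settings selected above.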
	for _, subset := range destinationRule.Subsets {
		if subset.Name == subsetName {
			_, outlierDetection, loadBalancerSettings, _ := networking.SelectTrafficPolicyComponents(subset.TrafficPolicy, port)
			lbSettings = loadBalancerSettings
			if outlierDetection != nil {
				outlierDetectionEnabled = true
			}
			break
		}
	}
	return outlierDetectionEnabled, lbSettings
}

func endpointDiscoveryResponse(loadAssignments []*xdsapi.ClusterLoadAssignment, version, noncePrefix, typeURL string) *xdsapi.DiscoveryResponse {
	out := &xdsapi.DiscoveryResponse{
		TypeUrl: typeURL,
		// Pilot does not really care for versioning. It always supplies what's currently
		// available to it, irrespective of whether Envoy chooses to accept or reject EDS
		// responses. Pilot believes in eventual consistency and that at some point, Envoy
		// will begin seeing results it deems to be good.
		VersionInfo: version,
		Nonce:       nonce(noncePrefix),
	}
	for _, loadAssignment := range loadAssignments {
		resource := util.MessageToAny(loadAssignment)
		resource.TypeUrl = typeURL
		out.Resources = append(out.Resources, resource)
	}

	return out
}

// buildLocalityLbEndpointsFromShards builds LocalityLbEndpoints for a cluster from the existing EndpointShards.
func buildLocalityLbEndpointsFromShards(
	proxy *model.Proxy,
	shards *EndpointShards,
	svc *model.Service,
	svcPort *model.Port,
	epLabels labels.Collection,
	clusterName string,
	push *model.PushContext) []*endpoint.LocalityLbEndpoints {
	localityEpMap := make(map[string]*endpoint.LocalityLbEndpoints)

	// Determine whether or not the target service is considered local to the cluster
	// and should, therefore, not be accessed from outside the cluster.
	isClusterLocal := push.IsClusterLocal(svc)

	shards.mutex.Lock()
	// The shards are updated independently; now we need to filter and merge them
	// for this cluster.
	for clusterID, endpoints := range shards.Shards {
		// If the downstream service is configured as cluster-local, only include endpoints that
		// reside in the same cluster.
		if isClusterLocal && (clusterID != proxy.ClusterID) {
			continue
		}

		for _, ep := range endpoints {
			if svcPort.Name != ep.ServicePortName {
				continue
			}
			// Filter out endpoints that don't match the subset labels.
			if !epLabels.HasSubsetOf(ep.Labels) {
				continue
			}

			locLbEps, found := localityEpMap[ep.Locality.Label]
			if !found {
				locLbEps = &endpoint.LocalityLbEndpoints{
					Locality:    util.ConvertLocality(ep.Locality.Label),
					LbEndpoints: make([]*endpoint.LbEndpoint, 0, len(endpoints)),
				}
				localityEpMap[ep.Locality.Label] = locLbEps
			}
			if ep.EnvoyEndpoint == nil {
				ep.EnvoyEndpoint = buildEnvoyLbEndpoint(ep, push)
			}
			locLbEps.LbEndpoints = append(locLbEps.LbEndpoints, ep.EnvoyEndpoint)
		}
	}
	shards.mutex.Unlock()

	locEps := make([]*endpoint.LocalityLbEndpoints, 0, len(localityEpMap))
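	// Weight each locality by the sum of the weights of its endpoints; for example, a locality with
	// endpoints weighted 1, 1 and 2 gets a locality weight of 4.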
	for _, locLbEps := range localityEpMap {
		var weight uint32
		for _, ep := range locLbEps.LbEndpoints {
			weight += ep.LoadBalancingWeight.GetValue()
		}
		locLbEps.LoadBalancingWeight = &wrappers.UInt32Value{
			Value: weight,
		}
		locEps = append(locEps, locLbEps)
	}

	if len(locEps) == 0 {
		push.AddMetric(model.ProxyStatusClusterNoInstances, clusterName, nil, "")
	}

	updateEdsStats(locEps, clusterName)

	return locEps
}

// buildEmptyClusterLoadAssignment returns a ClusterLoadAssignment with no endpoints.
func buildEmptyClusterLoadAssignment(clusterName string) *xdsapi.ClusterLoadAssignment {
	return &xdsapi.ClusterLoadAssignment{
		ClusterName: clusterName,
	}
}

func updateEdsStats(locEps []*endpoint.LocalityLbEndpoints, cluster string) {
	edsInstances.With(clusterTag.Value(cluster)).Record(float64(len(locEps)))
	epc := 0
	for _, locLbEps := range locEps {
		epc += len(locLbEps.GetLbEndpoints())
	}
	edsAllLocalityEndpoints.With(clusterTag.Value(cluster)).Record(float64(epc))
}
