1package xds
2
3import (
4	"errors"
5	"fmt"
6
7	envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
8	envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
9	envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
10
11	"github.com/golang/protobuf/proto"
12	bexpr "github.com/hashicorp/go-bexpr"
13
14	"github.com/hashicorp/consul/agent/connect"
15	"github.com/hashicorp/consul/agent/proxycfg"
16	"github.com/hashicorp/consul/agent/structs"
17	"github.com/hashicorp/consul/api"
18)
19
// UnnamedSubset is the subset name (the empty string) that identifies a
// service's default cluster, i.e. the cluster that is not tied to a named
// service-resolver subset.
const UnnamedSubset = ""
23
24// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
25func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
26	if cfgSnap == nil {
27		return nil, errors.New("nil config given")
28	}
29
30	switch cfgSnap.Kind {
31	case structs.ServiceKindConnectProxy:
32		return s.endpointsFromSnapshotConnectProxy(cfgSnap)
33	case structs.ServiceKindTerminatingGateway:
34		return s.endpointsFromSnapshotTerminatingGateway(cfgSnap)
35	case structs.ServiceKindMeshGateway:
36		return s.endpointsFromSnapshotMeshGateway(cfgSnap)
37	case structs.ServiceKindIngressGateway:
38		return s.endpointsFromSnapshotIngressGateway(cfgSnap)
39	default:
40		return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
41	}
42}
43
44// endpointsFromSnapshotConnectProxy returns the xDS API representation of the "endpoints"
45// (upstream instances) in the snapshot.
46func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
47	resources := make([]proto.Message, 0,
48		len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
49
50	for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
51		es := s.endpointsFromDiscoveryChain(
52			id,
53			chain,
54			cfgSnap.Datacenter,
55			cfgSnap.ConnectProxy.UpstreamConfig[id],
56			cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
57			cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
58		)
59		resources = append(resources, es...)
60	}
61
62	// Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains
63	for _, u := range cfgSnap.Proxy.Upstreams {
64		if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
65			continue
66		}
67		id := u.Identifier()
68
69		dc := u.Datacenter
70		if dc == "" {
71			dc = cfgSnap.Datacenter
72		}
73		clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain)
74
75		endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id]
76		if ok {
77			la := makeLoadAssignment(
78				clusterName,
79				[]loadAssignmentEndpointGroup{
80					{Endpoints: endpoints},
81				},
82				cfgSnap.Datacenter,
83			)
84			resources = append(resources, la)
85		}
86	}
87
88	return resources, nil
89}
90
91func (s *ResourceGenerator) filterSubsetEndpoints(subset *structs.ServiceResolverSubset, endpoints structs.CheckServiceNodes) (structs.CheckServiceNodes, error) {
92	// locally execute the subsets filter
93	if subset.Filter != "" {
94		filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
95		if err != nil {
96			return nil, err
97		}
98
99		raw, err := filter.Execute(endpoints)
100		if err != nil {
101			return nil, err
102		}
103		return raw.(structs.CheckServiceNodes), nil
104	}
105	return endpoints, nil
106}
107
108func (s *ResourceGenerator) endpointsFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
109	return s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.TerminatingGateway.ServiceGroups, cfgSnap.TerminatingGateway.ServiceResolvers)
110}
111
112func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
113	datacenters := cfgSnap.MeshGateway.Datacenters()
114	resources := make([]proto.Message, 0, len(datacenters)+len(cfgSnap.MeshGateway.ServiceGroups))
115
116	// generate the endpoints for the gateways in the remote datacenters
117	for _, dc := range datacenters {
118		// Skip creating endpoints for mesh gateways in local DC and gateways in remote DCs with a hostname as their address
119		// EDS cannot resolve hostnames so we provide them through CDS instead
120		if dc == cfgSnap.Datacenter || len(cfgSnap.MeshGateway.HostnameDatacenters[dc]) > 0 {
121			continue
122		}
123
124		endpoints, ok := cfgSnap.MeshGateway.GatewayGroups[dc]
125		if !ok {
126			endpoints, ok = cfgSnap.MeshGateway.FedStateGateways[dc]
127			if !ok { // not possible
128				s.Logger.Error("skipping mesh gateway endpoints because no definition found", "datacenter", dc)
129				continue
130			}
131		}
132
133		{ // standard connect
134			clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain)
135
136			la := makeLoadAssignment(
137				clusterName,
138				[]loadAssignmentEndpointGroup{
139					{Endpoints: endpoints},
140				},
141				cfgSnap.Datacenter,
142			)
143			resources = append(resources, la)
144		}
145
146		if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
147			clusterName := cfgSnap.ServerSNIFn(dc, "")
148
149			la := makeLoadAssignment(
150				clusterName,
151				[]loadAssignmentEndpointGroup{
152					{Endpoints: endpoints},
153				},
154				cfgSnap.Datacenter,
155			)
156			resources = append(resources, la)
157		}
158	}
159
160	// generate endpoints for our servers if WAN federation is enabled
161	if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
162		var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint
163
164		for _, srv := range cfgSnap.MeshGateway.ConsulServers {
165			clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
166
167			addr, port := srv.BestAddress(false /*wan*/)
168
169			lbEndpoint := &envoy_endpoint_v3.LbEndpoint{
170				HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
171					Endpoint: &envoy_endpoint_v3.Endpoint{
172						Address: makeAddress(addr, port),
173					},
174				},
175				HealthStatus: envoy_core_v3.HealthStatus_UNKNOWN,
176			}
177
178			cla := &envoy_endpoint_v3.ClusterLoadAssignment{
179				ClusterName: clusterName,
180				Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
181					LbEndpoints: []*envoy_endpoint_v3.LbEndpoint{lbEndpoint},
182				}},
183			}
184			allServersLbEndpoints = append(allServersLbEndpoints, lbEndpoint)
185
186			resources = append(resources, cla)
187		}
188
189		// And add one catch all so that remote datacenters can dial ANY server
190		// in this datacenter without knowing its name.
191		resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
192			ClusterName: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, ""),
193			Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
194				LbEndpoints: allServersLbEndpoints,
195			}},
196		})
197	}
198
199	// Generate the endpoints for each service and its subsets
200	e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
201	if err != nil {
202		return nil, err
203	}
204	resources = append(resources, e...)
205
206	return resources, nil
207}
208
209func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
210	cfgSnap *proxycfg.ConfigSnapshot,
211	services map[structs.ServiceName]structs.CheckServiceNodes,
212	resolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry,
213) ([]proto.Message, error) {
214	resources := make([]proto.Message, 0, len(services))
215
216	// generate the endpoints for the linked service groups
217	for svc, endpoints := range services {
218		// Skip creating endpoints for services that have hostnames as addresses
219		// EDS cannot resolve hostnames so we provide them through CDS instead
220		if cfgSnap.Kind == structs.ServiceKindTerminatingGateway && len(cfgSnap.TerminatingGateway.HostnameServices[svc]) > 0 {
221			continue
222		}
223
224		clusterEndpoints := make(map[string][]loadAssignmentEndpointGroup)
225		clusterEndpoints[UnnamedSubset] = []loadAssignmentEndpointGroup{{Endpoints: endpoints, OnlyPassing: false}}
226
227		// Collect all of the loadAssignmentEndpointGroups for the various subsets. We do this before generating
228		// the endpoints for the default/unnamed subset so that we can take into account the DefaultSubset on the
229		// service-resolver which may prevent the default/unnamed cluster from creating endpoints for all service
230		// instances.
231		if resolver, hasResolver := resolvers[svc]; hasResolver {
232			for subsetName, subset := range resolver.Subsets {
233				subsetEndpoints, err := s.filterSubsetEndpoints(&subset, endpoints)
234				if err != nil {
235					return nil, err
236				}
237				groups := []loadAssignmentEndpointGroup{{Endpoints: subsetEndpoints, OnlyPassing: subset.OnlyPassing}}
238				clusterEndpoints[subsetName] = groups
239
240				// if this subset is the default then override the unnamed subset with this configuration
241				if subsetName == resolver.DefaultSubset {
242					clusterEndpoints[UnnamedSubset] = groups
243				}
244			}
245		}
246
247		// now generate the load assignment for all subsets
248		for subsetName, groups := range clusterEndpoints {
249			clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
250			la := makeLoadAssignment(
251				clusterName,
252				groups,
253				cfgSnap.Datacenter,
254			)
255			resources = append(resources, la)
256		}
257	}
258
259	return resources, nil
260}
261
262func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
263	var resources []proto.Message
264	createdClusters := make(map[string]bool)
265	for _, upstreams := range cfgSnap.IngressGateway.Upstreams {
266		for _, u := range upstreams {
267			id := u.Identifier()
268
269			// If we've already created endpoints for this upstream, skip it. Multiple listeners may
270			// reference the same upstream, so we don't need to create duplicate endpoints in that case.
271			if createdClusters[id] {
272				continue
273			}
274
275			es := s.endpointsFromDiscoveryChain(
276				id,
277				cfgSnap.IngressGateway.DiscoveryChain[id],
278				cfgSnap.Datacenter,
279				&u,
280				cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
281				cfgSnap.IngressGateway.WatchedGatewayEndpoints[id],
282			)
283			resources = append(resources, es...)
284			createdClusters[id] = true
285		}
286	}
287	return resources, nil
288}
289
290// used in clusters.go
291func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint {
292	return &envoy_endpoint_v3.LbEndpoint{
293		HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
294			Endpoint: &envoy_endpoint_v3.Endpoint{
295				Address: makeAddress(host, port),
296			},
297		},
298	}
299}
300
301func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
302	return &envoy_endpoint_v3.LbEndpoint{
303		HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
304			Endpoint: &envoy_endpoint_v3.Endpoint{
305				Address: makePipeAddress(path, 0),
306			},
307		},
308	}
309}
310
311func (s *ResourceGenerator) endpointsFromDiscoveryChain(
312	id string,
313	chain *structs.CompiledDiscoveryChain,
314	datacenter string,
315	upstream *structs.Upstream,
316	upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
317) []proto.Message {
318	var resources []proto.Message
319
320	if chain == nil {
321		return resources
322	}
323
324	configMap := make(map[string]interface{})
325	if upstream != nil {
326		configMap = upstream.Config
327	}
328	cfg, err := structs.ParseUpstreamConfigNoDefaults(configMap)
329	if err != nil {
330		// Don't hard fail on a config typo, just warn. The parse func returns
331		// default config if there is an error so it's safe to continue.
332		s.Logger.Warn("failed to parse", "upstream", id,
333			"error", err)
334	}
335
336	var escapeHatchCluster *envoy_cluster_v3.Cluster
337	if cfg.EnvoyClusterJSON != "" {
338		if chain.IsDefault() {
339			// If you haven't done anything to setup the discovery chain, then
340			// you can use the envoy_cluster_json escape hatch.
341			escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
342			if err != nil {
343				return resources
344			}
345		} else {
346			s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for",
347				"discovery chain", chain.ServiceName, "upstream", id,
348				"envoy_cluster_json", chain.ServiceName)
349		}
350	}
351
352	// Find all resolver nodes.
353	for _, node := range chain.Nodes {
354		if node.Type != structs.DiscoveryGraphNodeTypeResolver {
355			continue
356		}
357		failover := node.Resolver.Failover
358		targetID := node.Resolver.Target
359
360		target := chain.Targets[targetID]
361
362		clusterName := CustomizeClusterName(target.Name, chain)
363		if escapeHatchCluster != nil {
364			clusterName = escapeHatchCluster.Name
365		}
366		s.Logger.Debug("generating endpoints for", "cluster", clusterName)
367
368		// Determine if we have to generate the entire cluster differently.
369		failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)
370
371		if failoverThroughMeshGateway {
372			actualTargetID := firstHealthyTarget(
373				chain.Targets,
374				upstreamEndpoints,
375				targetID,
376				failover.Targets,
377			)
378			if actualTargetID != targetID {
379				targetID = actualTargetID
380			}
381
382			failover = nil
383		}
384
385		primaryGroup, valid := makeLoadAssignmentEndpointGroup(
386			chain.Targets,
387			upstreamEndpoints,
388			gatewayEndpoints,
389			targetID,
390			datacenter,
391		)
392		if !valid {
393			continue // skip the cluster if we're still populating the snapshot
394		}
395
396		var endpointGroups []loadAssignmentEndpointGroup
397
398		if failover != nil && len(failover.Targets) > 0 {
399			endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
400
401			endpointGroups = append(endpointGroups, primaryGroup)
402
403			for _, failTargetID := range failover.Targets {
404				failoverGroup, valid := makeLoadAssignmentEndpointGroup(
405					chain.Targets,
406					upstreamEndpoints,
407					gatewayEndpoints,
408					failTargetID,
409					datacenter,
410				)
411				if !valid {
412					continue // skip the failover target if we're still populating the snapshot
413				}
414				endpointGroups = append(endpointGroups, failoverGroup)
415			}
416		} else {
417			endpointGroups = append(endpointGroups, primaryGroup)
418		}
419
420		la := makeLoadAssignment(
421			clusterName,
422			endpointGroups,
423			datacenter,
424		)
425		resources = append(resources, la)
426	}
427
428	return resources
429}
430
// loadAssignmentEndpointGroup is one priority level of a ClusterLoadAssignment
// built by makeLoadAssignment: a group's index in the slice passed to
// makeLoadAssignment becomes the Priority of its endpoints.
type loadAssignmentEndpointGroup struct {
	// Endpoints are the instances whose addresses are emitted for this group.
	Endpoints      structs.CheckServiceNodes
	// OnlyPassing marks an instance unhealthy when any of its checks is not
	// passing (not just when one is critical).
	OnlyPassing    bool
	// OverrideHealth, when not HealthStatus_UNKNOWN, replaces the health
	// status computed for every endpoint in the group.
	OverrideHealth envoy_core_v3.HealthStatus
}
436
437func makeLoadAssignment(clusterName string, endpointGroups []loadAssignmentEndpointGroup, localDatacenter string) *envoy_endpoint_v3.ClusterLoadAssignment {
438	cla := &envoy_endpoint_v3.ClusterLoadAssignment{
439		ClusterName: clusterName,
440		Endpoints:   make([]*envoy_endpoint_v3.LocalityLbEndpoints, 0, len(endpointGroups)),
441	}
442
443	if len(endpointGroups) > 1 {
444		cla.Policy = &envoy_endpoint_v3.ClusterLoadAssignment_Policy{
445			// We choose such a large value here that the failover math should
446			// in effect not happen until zero instances are healthy.
447			OverprovisioningFactor: makeUint32Value(100000),
448		}
449	}
450
451	for priority, endpointGroup := range endpointGroups {
452		endpoints := endpointGroup.Endpoints
453		es := make([]*envoy_endpoint_v3.LbEndpoint, 0, len(endpoints))
454
455		for _, ep := range endpoints {
456			// TODO (mesh-gateway) - should we respect the translate_wan_addrs configuration here or just always use the wan for cross-dc?
457			addr, port := ep.BestAddress(localDatacenter != ep.Node.Datacenter)
458			healthStatus, weight := calculateEndpointHealthAndWeight(ep, endpointGroup.OnlyPassing)
459
460			if endpointGroup.OverrideHealth != envoy_core_v3.HealthStatus_UNKNOWN {
461				healthStatus = endpointGroup.OverrideHealth
462			}
463
464			es = append(es, &envoy_endpoint_v3.LbEndpoint{
465				HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
466					Endpoint: &envoy_endpoint_v3.Endpoint{
467						Address: makeAddress(addr, port),
468					},
469				},
470				HealthStatus:        healthStatus,
471				LoadBalancingWeight: makeUint32Value(weight),
472			})
473		}
474
475		cla.Endpoints = append(cla.Endpoints, &envoy_endpoint_v3.LocalityLbEndpoints{
476			Priority:    uint32(priority),
477			LbEndpoints: es,
478		})
479	}
480
481	return cla
482}
483
484func makeLoadAssignmentEndpointGroup(
485	targets map[string]*structs.DiscoveryTarget,
486	targetHealth map[string]structs.CheckServiceNodes,
487	gatewayHealth map[string]structs.CheckServiceNodes,
488	targetID string,
489	currentDatacenter string,
490) (loadAssignmentEndpointGroup, bool) {
491	realEndpoints, ok := targetHealth[targetID]
492	if !ok {
493		// skip the cluster if we're still populating the snapshot
494		return loadAssignmentEndpointGroup{}, false
495	}
496	target := targets[targetID]
497
498	var gatewayDatacenter string
499	switch target.MeshGateway.Mode {
500	case structs.MeshGatewayModeRemote:
501		gatewayDatacenter = target.Datacenter
502	case structs.MeshGatewayModeLocal:
503		gatewayDatacenter = currentDatacenter
504	}
505
506	if gatewayDatacenter == "" {
507		return loadAssignmentEndpointGroup{
508			Endpoints:   realEndpoints,
509			OnlyPassing: target.Subset.OnlyPassing,
510		}, true
511	}
512
513	// If using a mesh gateway we need to pull those endpoints instead.
514	gatewayEndpoints, ok := gatewayHealth[gatewayDatacenter]
515	if !ok {
516		// skip the cluster if we're still populating the snapshot
517		return loadAssignmentEndpointGroup{}, false
518	}
519
520	// But we will use the health from the actual backend service.
521	overallHealth := envoy_core_v3.HealthStatus_UNHEALTHY
522	for _, ep := range realEndpoints {
523		health, _ := calculateEndpointHealthAndWeight(ep, target.Subset.OnlyPassing)
524		if health == envoy_core_v3.HealthStatus_HEALTHY {
525			overallHealth = envoy_core_v3.HealthStatus_HEALTHY
526			break
527		}
528	}
529
530	return loadAssignmentEndpointGroup{
531		Endpoints:      gatewayEndpoints,
532		OverrideHealth: overallHealth,
533	}, true
534}
535
536func calculateEndpointHealthAndWeight(
537	ep structs.CheckServiceNode,
538	onlyPassing bool,
539) (envoy_core_v3.HealthStatus, int) {
540	healthStatus := envoy_core_v3.HealthStatus_HEALTHY
541	weight := 1
542	if ep.Service.Weights != nil {
543		weight = ep.Service.Weights.Passing
544	}
545
546	for _, chk := range ep.Checks {
547		if chk.Status == api.HealthCritical {
548			healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
549		}
550		if onlyPassing && chk.Status != api.HealthPassing {
551			healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
552		}
553		if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
554			weight = ep.Service.Weights.Warning
555		}
556	}
557	// Make weights fit Envoy's limits. A zero weight means that either Warning
558	// (likely) or Passing (weirdly) weight has been set to 0 effectively making
559	// this instance unhealthy and should not be sent traffic.
560	if weight < 1 {
561		healthStatus = envoy_core_v3.HealthStatus_UNHEALTHY
562		weight = 1
563	}
564	if weight > 128 {
565		weight = 128
566	}
567	return healthStatus, weight
568}
569