1// Copyright 2017 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18	"context"
19	"encoding/json"
20	"errors"
21	"fmt"
22	"net"
23	"reflect"
24	"sort"
25	"strconv"
26	"sync"
27	"time"
28
29	"github.com/yl2chen/cidranger"
30	v1 "k8s.io/api/core/v1"
31	"k8s.io/apimachinery/pkg/api/meta"
32	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33	klabels "k8s.io/apimachinery/pkg/labels"
34	"k8s.io/apimachinery/pkg/runtime/schema"
35	"k8s.io/apimachinery/pkg/util/intstr"
36	"k8s.io/client-go/informers"
37	coreinformers "k8s.io/client-go/informers/core/v1"
38	"k8s.io/client-go/kubernetes"
39	listerv1 "k8s.io/client-go/listers/core/v1"
40	"k8s.io/client-go/metadata"
41	"k8s.io/client-go/metadata/metadatainformer"
42	"k8s.io/client-go/tools/cache"
43
44	"istio.io/pkg/log"
45	"istio.io/pkg/monitoring"
46
47	"istio.io/istio/pilot/pkg/model"
48	"istio.io/istio/pilot/pkg/networking/util"
49	"istio.io/istio/pilot/pkg/serviceregistry"
50	"istio.io/istio/pilot/pkg/serviceregistry/kube"
51	"istio.io/istio/pkg/config/host"
52	"istio.io/istio/pkg/config/labels"
53	"istio.io/istio/pkg/config/mesh"
54	"istio.io/istio/pkg/queue"
55)
56
57const (
58	// NodeRegionLabel is the well-known label for kubernetes node region in beta
59	NodeRegionLabel = "failure-domain.beta.kubernetes.io/region"
60	// NodeZoneLabel is the well-known label for kubernetes node zone in beta
61	NodeZoneLabel = "failure-domain.beta.kubernetes.io/zone"
62	// NodeRegionLabelGA is the well-known label for kubernetes node region in ga
63	NodeRegionLabelGA = "topology.kubernetes.io/region"
64	// NodeZoneLabelGA is the well-known label for kubernetes node zone in ga
65	NodeZoneLabelGA = "topology.kubernetes.io/zone"
	// IstioSubzoneLabel is the custom subzone label for locality-based routing in Kubernetes; see: https://github.com/istio/istio/issues/19114
67	IstioSubzoneLabel = "topology.istio.io/subzone"
	// IstioNamespace is the namespace used by default for the Istio cluster-wide installation
	IstioNamespace = "istio-system"
	// IstioConfigMap is the name of the ConfigMap used by default for Istio configuration
	IstioConfigMap = "istio"
72	// PrometheusScrape is the annotation used by prometheus to determine if service metrics should be scraped (collected)
73	PrometheusScrape = "prometheus.io/scrape"
74	// PrometheusPort is the annotation used to explicitly specify the port to use for scraping metrics
75	PrometheusPort = "prometheus.io/port"
76	// PrometheusPath is the annotation used to specify a path for scraping metrics. Default is "/metrics"
77	PrometheusPath = "prometheus.io/path"
78	// PrometheusPathDefault is the default value for the PrometheusPath annotation
79	PrometheusPathDefault = "/metrics"
80)
81
82var (
83	typeTag  = monitoring.MustCreateLabel("type")
84	eventTag = monitoring.MustCreateLabel("event")
85
86	k8sEvents = monitoring.NewSum(
87		"pilot_k8s_reg_events",
88		"Events from k8s registry.",
89		monitoring.WithLabels(typeTag, eventTag),
90	)
91	// nolint: gocritic
	// This is deprecated in favor of `pilot_k8s_endpoints_pending_pod`, which is a gauge indicating the number of
	// currently missing pods. This helps distinguish transient errors from permanent ones.
	endpointsWithNoPods = monitoring.NewSum(
		"pilot_k8s_endpoints_with_no_pods",
		"Endpoints that do not have any corresponding pods.")
97
98	endpointsPendingPodUpdate = monitoring.NewGauge(
99		"pilot_k8s_endpoints_pending_pod",
100		"Number of endpoints that do not currently have any corresponding pods.",
101	)
102)
103
104func init() {
105	monitoring.MustRegister(k8sEvents)
106	monitoring.MustRegister(endpointsWithNoPods)
107	monitoring.MustRegister(endpointsPendingPodUpdate)
108}
109
110func incrementEvent(kind, event string) {
111	k8sEvents.With(typeTag.Value(kind), eventTag.Value(event)).Increment()
112}
113
114// Options stores the configurable attributes of a Controller.
115type Options struct {
	// WatchedNamespace is the namespace the controller watches. If set to metav1.NamespaceAll (""), the controller watches all namespaces.
117	WatchedNamespace string
118	ResyncPeriod     time.Duration
119	DomainSuffix     string
120
121	// ClusterID identifies the remote cluster in a multicluster env.
122	ClusterID string
123
124	// FetchCaRoot defines the function to get caRoot
125	FetchCaRoot func() map[string]string
126
127	// Metrics for capturing node-based metrics.
128	Metrics model.Metrics
129
130	// XDSUpdater will push changes to the xDS server.
131	XDSUpdater model.XDSUpdater
132
133	// TrustDomain used in SPIFFE identity
134	TrustDomain string
135
136	// NetworksWatcher observes changes to the mesh networks config.
137	NetworksWatcher mesh.NetworksWatcher
138
139	// EndpointMode decides what source to use to get endpoint information
140	EndpointMode EndpointMode
141
	// CABundlePath defines the caBundle path for the istiod server
143	CABundlePath string
144}
145
146// EndpointMode decides what source to use to get endpoint information
147type EndpointMode int
148
149const (
150	// EndpointsOnly type will use only Kubernetes Endpoints
151	EndpointsOnly EndpointMode = iota
152
153	// EndpointSliceOnly type will use only Kubernetes EndpointSlices
154	EndpointSliceOnly
155
156	// TODO: add other modes. Likely want a mode with Endpoints+EndpointSlices that are not controlled by
157	// Kubernetes Controller (e.g. made by user and not duplicated with Endpoints), or a mode with both that
158	// does deduping. Simply doing both won't work for now, since not all Kubernetes components support EndpointSlice.
159)
160
161var EndpointModeNames = map[EndpointMode]string{
162	EndpointsOnly:     "EndpointsOnly",
163	EndpointSliceOnly: "EndpointSliceOnly",
164}
165
166func (m EndpointMode) String() string {
167	return EndpointModeNames[m]
168}
169
170var _ serviceregistry.Instance = &Controller{}
171
172// kubernetesNode represents a kubernetes node that is reachable externally
173type kubernetesNode struct {
174	address string
175	labels  labels.Instance
176}
177
178// Controller is a collection of synchronized resource watchers
179// Caches are thread-safe
180type Controller struct {
181	client         kubernetes.Interface
182	metadataClient metadata.Interface
183	queue          queue.Instance
184	services       cache.SharedIndexInformer
185	endpoints      kubeEndpointsController
186
187	nodeMetadataInformer cache.SharedIndexInformer
	// Used to watch nodes reachable from remote clusters.
	// In a multi-cluster (shared control plane, multi-network) scenario, the ingress gateway service can be of NodePort type.
	// With this informer, we can populate the mesh gateway addresses with the node IPs.
191	filteredNodeInformer cache.SharedIndexInformer
192	pods                 *PodCache
193	metrics              model.Metrics
194	networksWatcher      mesh.NetworksWatcher
195	xdsUpdater           model.XDSUpdater
196	domainSuffix         string
197	clusterID            string
198
199	serviceHandlers  []func(*model.Service, model.Event)
200	instanceHandlers []func(*model.ServiceInstance, model.Event)
201
	// This is only used for tests
203	stop chan struct{}
204
205	sync.RWMutex
	// servicesMap stores hostname ==> service; it is used to reduce convertService calls.
207	servicesMap map[host.Name]*model.Service
208	// nodeSelectorsForServices stores hostname => label selectors that can be used to
209	// refine the set of node port IPs for a service.
210	nodeSelectorsForServices map[host.Name]labels.Instance
	// nodeInfoMap maps a node name to its address and labels - this is all we need from nodes
	// for VM-to-Kubernetes or cross-cluster traffic. When NodePort services select specific nodes by labels,
	// we run through the label selectors here to pick only the nodes we need.
214	nodeInfoMap map[string]kubernetesNode
	// externalNameSvcInstanceMap stores hostname ==> instances; it is used to store instances of ExternalName k8s services
216	externalNameSvcInstanceMap map[host.Name][]*model.ServiceInstance
217
218	// CIDR ranger based on path-compressed prefix trie
219	ranger cidranger.Ranger
220
221	// Network name for the registry as specified by the MeshNetworks configmap
222	networkForRegistry string
223}
224
225// NewController creates a new Kubernetes controller
// Created by bootstrap and multicluster (see secretcontroller).
227func NewController(client kubernetes.Interface, metadataClient metadata.Interface, options Options) *Controller {
228	log.Infof("Service controller watching namespace %q for services, endpoints, nodes and pods, refresh %s",
229		options.WatchedNamespace, options.ResyncPeriod)
230
231	// The queue requires a time duration for a retry delay after a handler error
232	c := &Controller{
233		domainSuffix:               options.DomainSuffix,
234		client:                     client,
235		metadataClient:             metadataClient,
236		queue:                      queue.NewQueue(1 * time.Second),
237		clusterID:                  options.ClusterID,
238		xdsUpdater:                 options.XDSUpdater,
239		servicesMap:                make(map[host.Name]*model.Service),
240		nodeSelectorsForServices:   make(map[host.Name]labels.Instance),
241		nodeInfoMap:                make(map[string]kubernetesNode),
242		externalNameSvcInstanceMap: make(map[host.Name][]*model.ServiceInstance),
243		networksWatcher:            options.NetworksWatcher,
244		metrics:                    options.Metrics,
245	}
246
247	sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
248
249	c.services = sharedInformers.Core().V1().Services().Informer()
250	registerHandlers(c.services, c.queue, "Services", c.onServiceEvent)
251
252	switch options.EndpointMode {
253	case EndpointsOnly:
254		c.endpoints = newEndpointsController(c, sharedInformers)
255	case EndpointSliceOnly:
256		c.endpoints = newEndpointSliceController(c, sharedInformers)
257	}
258
259	// This is for getting the pod to node mapping, so that we can get the pod's locality.
260	metadataSharedInformer := metadatainformer.NewSharedInformerFactory(metadataClient, options.ResyncPeriod)
261	nodeResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
262	c.nodeMetadataInformer = metadataSharedInformer.ForResource(nodeResource).Informer()
263	// This is for getting the node IPs of a selected set of nodes
264	c.filteredNodeInformer = coreinformers.NewFilteredNodeInformer(client, options.ResyncPeriod,
265		cache.Indexers{},
266		func(options *metav1.ListOptions) {})
267	registerHandlers(c.filteredNodeInformer, c.queue, "Nodes", c.onNodeEvent)
268
269	podInformer := sharedInformers.Core().V1().Pods().Informer()
270	c.pods = newPodCache(podInformer, c, func(key string) {
271		item, exists, err := c.endpoints.getInformer().GetStore().GetByKey(key)
272		if err != nil || !exists {
273			log.Debugf("Endpoint %v lookup failed, skipping stale endpoint. error: %v", key, err)
274			return
275		}
276		c.queue.Push(func() error {
277			return c.endpoints.onEvent(item, model.EventUpdate)
278		})
279	})
280	registerHandlers(podInformer, c.queue, "Pods", c.pods.onEvent)
281
282	return c
283}
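
// Illustrative only (not part of the original code): a Controller is typically
// constructed by bootstrap/multicluster code and then started with Run. Assuming
// a kubernetes.Interface `client` and metadata.Interface `metaClient` have
// already been built from a rest.Config, a minimal sketch looks like:
//
//	ctl := NewController(client, metaClient, Options{
//		WatchedNamespace: metav1.NamespaceAll,
//		ResyncPeriod:     60 * time.Second,
//		DomainSuffix:     "cluster.local",
//		ClusterID:        "Kubernetes",
//		EndpointMode:     EndpointsOnly,
//	})
//	stop := make(chan struct{})
//	go ctl.Run(stop)
//
// A real deployment also wires in XDSUpdater, Metrics, and NetworksWatcher.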
284
285func (c *Controller) Provider() serviceregistry.ProviderID {
286	return serviceregistry.Kubernetes
287}
288
289func (c *Controller) Cluster() string {
290	return c.clusterID
291}
292
293func (c *Controller) checkReadyForEvents() error {
294	if !c.HasSynced() {
295		return errors.New("waiting till full synchronization")
296	}
297	return nil
298}
299
300func (c *Controller) onServiceEvent(curr interface{}, event model.Event) error {
301	if err := c.checkReadyForEvents(); err != nil {
302		return err
303	}
304
305	svc, ok := curr.(*v1.Service)
306	if !ok {
307		tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
308		if !ok {
309			log.Errorf("Couldn't get object from tombstone %#v", curr)
310			return nil
311		}
312		svc, ok = tombstone.Obj.(*v1.Service)
313		if !ok {
314			log.Errorf("Tombstone contained object that is not a service %#v", curr)
315			return nil
316		}
317	}
318
319	log.Debugf("Handle event %s for service %s in namespace %s", event, svc.Name, svc.Namespace)
320
321	svcConv := kube.ConvertService(*svc, c.domainSuffix, c.clusterID)
322	switch event {
323	case model.EventDelete:
324		c.Lock()
325		delete(c.servicesMap, svcConv.Hostname)
326		delete(c.nodeSelectorsForServices, svcConv.Hostname)
327		delete(c.externalNameSvcInstanceMap, svcConv.Hostname)
328		c.Unlock()
329	default:
330		// instance conversion is only required when service is added/updated.
331		instances := kube.ExternalNameServiceInstances(*svc, svcConv)
332		if isNodePortGatewayService(svc) {
333			// We need to know which services are using node selectors because during node events,
334			// we have to update all the node port services accordingly.
335			nodeSelector := getNodeSelectorsForService(*svc)
336			c.Lock()
337			// only add when it is nodePort gateway service
338			c.nodeSelectorsForServices[svcConv.Hostname] = nodeSelector
339			c.Unlock()
340			c.updateServiceExternalAddr(svcConv)
341		}
342		c.Lock()
343		c.servicesMap[svcConv.Hostname] = svcConv
344		if len(instances) > 0 {
345			c.externalNameSvcInstanceMap[svcConv.Hostname] = instances
346		}
347		c.Unlock()
348	}
349
350	c.xdsUpdater.SvcUpdate(c.clusterID, svc.Name, svc.Namespace, event)
351	// Notify service handlers.
352	for _, f := range c.serviceHandlers {
353		f(svcConv, event)
354	}
355
356	return nil
357}
358
359func getNodeSelectorsForService(svc v1.Service) labels.Instance {
360	if nodeSelector := svc.Annotations[kube.NodeSelectorAnnotation]; nodeSelector != "" {
361		var nodeSelectorKV map[string]string
362		if err := json.Unmarshal([]byte(nodeSelector), &nodeSelectorKV); err != nil {
363			log.Debugf("failed to unmarshal node selector annotation value for service %s.%s: %v",
364				svc.Name, svc.Namespace, err)
365		}
366		return nodeSelectorKV
367	}
368	return nil
369}
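
// For illustration (the annotation key is kube.NodeSelectorAnnotation; the value shown is made up):
// the annotation carries a JSON object of node label key/values, e.g. on a NodePort gateway Service:
//
//	metadata:
//	  annotations:
//	    <kube.NodeSelectorAnnotation>: '{"kubernetes.io/hostname": "node-1"}'
//
// which unmarshals into labels.Instance{"kubernetes.io/hostname": "node-1"} and is later
// matched against node labels in updateServiceExternalAddr.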
370
371func (c *Controller) onNodeEvent(obj interface{}, event model.Event) error {
372	if err := c.checkReadyForEvents(); err != nil {
373		return err
374	}
375	node, ok := obj.(*v1.Node)
376	if !ok {
377		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
378		if !ok {
379			log.Errorf("couldn't get object from tombstone %+v", obj)
380			return nil
381		}
382		node, ok = tombstone.Obj.(*v1.Node)
383		if !ok {
384			log.Errorf("tombstone contained object that is not a node %#v", obj)
385			return nil
386		}
387	}
388	var updatedNeeded bool
389	if event == model.EventDelete {
390		updatedNeeded = true
391		c.Lock()
392		delete(c.nodeInfoMap, node.Name)
393		c.Unlock()
394	} else {
395		k8sNode := kubernetesNode{labels: node.Labels}
396		for _, address := range node.Status.Addresses {
397			if address.Type == v1.NodeExternalIP && address.Address != "" {
398				k8sNode.address = address.Address
399				break
400			}
401		}
402		if k8sNode.address == "" {
403			return nil
404		}
405
406		c.Lock()
		// Check if the node already exists, as this add event could be due to a controller resync.
		// If the stored object has changed, fire an update event; otherwise, ignore this event.
409		currentNode, exists := c.nodeInfoMap[node.Name]
410		if !exists || !reflect.DeepEqual(currentNode, k8sNode) {
411			c.nodeInfoMap[node.Name] = k8sNode
412			updatedNeeded = true
413		}
414		c.Unlock()
415	}
416
417	if updatedNeeded {
418		// update all related services
419		c.updateServiceExternalAddr()
420		c.xdsUpdater.ConfigUpdate(&model.PushRequest{
421			Full: true,
422		})
423	}
424	return nil
425}
426
427func isNodePortGatewayService(svc *v1.Service) bool {
428	_, ok := svc.Annotations[kube.NodeSelectorAnnotation]
429	return ok && svc.Spec.Type == v1.ServiceTypeNodePort
430}
431
432func registerHandlers(informer cache.SharedIndexInformer, q queue.Instance, otype string,
433	handler func(interface{}, model.Event) error) {
434
435	informer.AddEventHandler(
436		cache.ResourceEventHandlerFuncs{
437			// TODO: filtering functions to skip over un-referenced resources (perf)
438			AddFunc: func(obj interface{}) {
439				incrementEvent(otype, "add")
440				q.Push(func() error {
441					return handler(obj, model.EventAdd)
442				})
443			},
444			UpdateFunc: func(old, cur interface{}) {
445				if !reflect.DeepEqual(old, cur) {
446					incrementEvent(otype, "update")
447					q.Push(func() error {
448						return handler(cur, model.EventUpdate)
449					})
450				} else {
451					incrementEvent(otype, "updatesame")
452				}
453			},
454			DeleteFunc: func(obj interface{}) {
455				incrementEvent(otype, "delete")
456				q.Push(func() error {
457					return handler(obj, model.EventDelete)
458				})
459			},
460		})
461}
462
463// compareEndpoints returns true if the two endpoints are the same in aspects Pilot cares about
464// This currently means only looking at "Ready" endpoints
465func compareEndpoints(a, b *v1.Endpoints) bool {
466	if len(a.Subsets) != len(b.Subsets) {
467		return false
468	}
469	for i := range a.Subsets {
470		if !reflect.DeepEqual(a.Subsets[i].Ports, b.Subsets[i].Ports) {
471			return false
472		}
473		if !reflect.DeepEqual(a.Subsets[i].Addresses, b.Subsets[i].Addresses) {
474			return false
475		}
476	}
477	return true
478}
479
480// HasSynced returns true after the initial state synchronization
481func (c *Controller) HasSynced() bool {
482	if !c.services.HasSynced() ||
483		!c.endpoints.HasSynced() ||
484		!c.pods.informer.HasSynced() ||
485		!c.nodeMetadataInformer.HasSynced() ||
486		!c.filteredNodeInformer.HasSynced() {
487		return false
488	}
489	return true
490}
491
492// Run all controllers until a signal is received
493func (c *Controller) Run(stop <-chan struct{}) {
494	if c.networksWatcher != nil {
495		c.networksWatcher.AddNetworksHandler(c.initNetworkLookup)
496		c.initNetworkLookup()
497	}
498
499	go func() {
500		cache.WaitForCacheSync(stop, c.HasSynced)
501		c.queue.Run(stop)
502	}()
503
504	go c.services.Run(stop)
505	go c.pods.informer.Run(stop)
506	go c.nodeMetadataInformer.Run(stop)
507	go c.filteredNodeInformer.Run(stop)
508
509	// To avoid endpoints without labels or ports, wait for sync.
510	cache.WaitForCacheSync(stop, c.nodeMetadataInformer.HasSynced, c.filteredNodeInformer.HasSynced,
511		c.pods.informer.HasSynced,
512		c.services.HasSynced)
513
514	go c.endpoints.Run(stop)
515
516	<-stop
517	log.Infof("Controller terminated")
518}
519
520// Stop the controller. Only for tests, to simplify the code (defer c.Stop())
521func (c *Controller) Stop() {
522	if c.stop != nil {
523		close(c.stop)
524	}
525}
526
527// Services implements a service catalog operation
528func (c *Controller) Services() ([]*model.Service, error) {
529	c.RLock()
530	out := make([]*model.Service, 0, len(c.servicesMap))
531	for _, svc := range c.servicesMap {
532		out = append(out, svc)
533	}
534	c.RUnlock()
535	sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
536
537	return out, nil
538}
539
// GetService implements a service catalog operation: it returns the service matching the specified hostname, if any.
541func (c *Controller) GetService(hostname host.Name) (*model.Service, error) {
542	c.RLock()
543	svc := c.servicesMap[hostname]
544	c.RUnlock()
545	return svc, nil
546}
547
// getNodePortGatewayServices returns the NodePort gateway services
549func (c *Controller) getNodePortGatewayServices() []*model.Service {
550	c.RLock()
551	defer c.RUnlock()
552	out := make([]*model.Service, 0, len(c.nodeSelectorsForServices))
553	for hostname := range c.nodeSelectorsForServices {
554		svc := c.servicesMap[hostname]
555		if svc != nil {
556			out = append(out, svc)
557		}
558	}
559
560	return out
561}
562
// updateServiceExternalAddr updates ClusterExternalAddresses for NodePort-type ingress gateway services
564func (c *Controller) updateServiceExternalAddr(svcs ...*model.Service) {
565	// node event, update all nodePort gateway services
566	if len(svcs) == 0 {
567		svcs = c.getNodePortGatewayServices()
568	}
569	for _, svc := range svcs {
570		c.RLock()
571		nodeSelector := c.nodeSelectorsForServices[svc.Hostname]
572		c.RUnlock()
573		// update external address
574		svc.Mutex.Lock()
575		if nodeSelector == nil {
576			var extAddresses []string
577			for _, n := range c.nodeInfoMap {
578				extAddresses = append(extAddresses, n.address)
579			}
580			svc.Attributes.ClusterExternalAddresses = map[string][]string{c.clusterID: extAddresses}
581		} else {
582			var nodeAddresses []string
583			for _, n := range c.nodeInfoMap {
584				if nodeSelector.SubsetOf(n.labels) {
585					nodeAddresses = append(nodeAddresses, n.address)
586				}
587			}
588			svc.Attributes.ClusterExternalAddresses = map[string][]string{c.clusterID: nodeAddresses}
589		}
590		svc.Mutex.Unlock()
591	}
592}
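
// As an example (cluster ID and IPs are illustrative), after a node event on a cluster
// whose ID is "Kubernetes" and whose selected nodes expose external IPs 10.0.0.1 and
// 10.0.0.2, each NodePort gateway service ends up with:
//
//	svc.Attributes.ClusterExternalAddresses = map[string][]string{
//		"Kubernetes": {"10.0.0.1", "10.0.0.2"},
//	}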
593
594// getPodLocality retrieves the locality for a pod.
595func (c *Controller) getPodLocality(pod *v1.Pod) string {
	// if the pod has the `istio-locality` label, skip the node lookup below
597	if len(pod.Labels[model.LocalityLabel]) > 0 {
598		return model.GetLocalityLabelOrDefault(pod.Labels[model.LocalityLabel], "")
599	}
600
601	// NodeName is set by the scheduler after the pod is created
602	// https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#late-initialization
603	raw, exists, err := c.nodeMetadataInformer.GetStore().GetByKey(pod.Spec.NodeName)
604	if !exists || err != nil {
605		log.Warnf("unable to get node %q for pod %q from cache: %v", pod.Spec.NodeName, pod.Name, err)
606		nodeResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
607		raw, err = c.metadataClient.Resource(nodeResource).Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
608		if err != nil {
609			log.Warnf("unable to get node %q for pod %q: %v", pod.Spec.NodeName, pod.Name, err)
610			return ""
611		}
612	}
613
614	nodeMeta, err := meta.Accessor(raw)
615	if err != nil {
		log.Warnf("unable to get node metadata: %v", err)
617		return ""
618	}
619
620	region := getLabelValue(nodeMeta, NodeRegionLabel, NodeRegionLabelGA)
621	zone := getLabelValue(nodeMeta, NodeZoneLabel, NodeZoneLabelGA)
622	subzone := getLabelValue(nodeMeta, IstioSubzoneLabel, "")
623
624	if region == "" && zone == "" && subzone == "" {
625		return ""
626	}
627
628	return region + "/" + zone + "/" + subzone // Format: "%s/%s/%s"
629}
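
// For example (label values are illustrative), a node labeled
// topology.kubernetes.io/region=us-east1, topology.kubernetes.io/zone=us-east1-d and
// topology.istio.io/subzone=rack1 yields the locality "us-east1/us-east1-d/rack1",
// while a pod carrying the istio-locality label skips the node lookup entirely.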
630
631// ManagementPorts implements a service catalog operation
632func (c *Controller) ManagementPorts(addr string) model.PortList {
633	pod := c.pods.getPodByIP(addr)
634	if pod == nil {
635		return nil
636	}
637
638	managementPorts, err := kube.ConvertProbesToPorts(&pod.Spec)
639	if err != nil {
		log.Infof("Error while parsing liveness and readiness probe ports for %s => %v", addr, err)
641	}
642
	// We continue despite the error because ConvertProbesToPorts could return a partial
	// list of management ports
645	return managementPorts
646}
647
648// WorkloadHealthCheckInfo implements a service catalog operation
649func (c *Controller) WorkloadHealthCheckInfo(addr string) model.ProbeList {
650	pod := c.pods.getPodByIP(addr)
651	if pod == nil {
652		return nil
653	}
654
655	probes := make([]*model.Probe, 0)
656
657	// Obtain probes from the readiness and liveness probes
658	for _, container := range pod.Spec.Containers {
659		if container.ReadinessProbe != nil && container.ReadinessProbe.Handler.HTTPGet != nil {
660			p, err := kube.ConvertProbePort(&container, &container.ReadinessProbe.Handler)
661			if err != nil {
662				log.Infof("Error while parsing readiness probe port =%v", err)
663			}
664			probes = append(probes, &model.Probe{
665				Port: p,
666				Path: container.ReadinessProbe.Handler.HTTPGet.Path,
667			})
668		}
669		if container.LivenessProbe != nil && container.LivenessProbe.Handler.HTTPGet != nil {
670			p, err := kube.ConvertProbePort(&container, &container.LivenessProbe.Handler)
671			if err != nil {
672				log.Infof("Error while parsing liveness probe port =%v", err)
673			}
674			probes = append(probes, &model.Probe{
675				Port: p,
676				Path: container.LivenessProbe.Handler.HTTPGet.Path,
677			})
678		}
679	}
680
681	// Obtain probe from prometheus scrape
682	if scrape := pod.Annotations[PrometheusScrape]; scrape == "true" {
683		var port *model.Port
684		path := PrometheusPathDefault
685		if portstr := pod.Annotations[PrometheusPort]; portstr != "" {
686			portnum, err := strconv.Atoi(portstr)
687			if err != nil {
688				log.Warna(err)
689			} else {
690				port = &model.Port{
691					Port: portnum,
692				}
693			}
694		}
695		if pod.Annotations[PrometheusPath] != "" {
696			path = pod.Annotations[PrometheusPath]
697		}
698		probes = append(probes, &model.Probe{
699			Port: port,
700			Path: path,
701		})
702	}
703
704	return probes
705}
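
// For reference, a pod opting into the Prometheus-derived probe above would be annotated
// roughly as follows (the port and path values are illustrative):
//
//	metadata:
//	  annotations:
//	    prometheus.io/scrape: "true"
//	    prometheus.io/port: "15090"
//	    prometheus.io/path: "/stats/prometheus"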
706
707// InstancesByPort implements a service catalog operation
708func (c *Controller) InstancesByPort(svc *model.Service, reqSvcPort int,
709	labelsList labels.Collection) ([]*model.ServiceInstance, error) {
710	res, err := c.endpoints.InstancesByPort(c, svc, reqSvcPort, labelsList)
711	// return when instances found or an error occurs
712	if len(res) > 0 || err != nil {
713		return res, err
714	}
715
716	// Fall back to external name service
717	c.RLock()
718	instances := c.externalNameSvcInstanceMap[svc.Hostname]
719	c.RUnlock()
720	if instances != nil {
721		inScopeInstances := make([]*model.ServiceInstance, 0)
722		for _, i := range instances {
723			if i.Service.Attributes.Namespace == svc.Attributes.Namespace && i.ServicePort.Port == reqSvcPort {
724				inScopeInstances = append(inScopeInstances, i)
725			}
726		}
727		return inScopeInstances, nil
728	}
729	return nil, nil
730}
731
732// GetProxyServiceInstances returns service instances co-located with a given proxy
733func (c *Controller) GetProxyServiceInstances(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
734	out := make([]*model.ServiceInstance, 0)
735	if len(proxy.IPAddresses) > 0 {
		// we only need to look up the pod via the first IP; even in multiple-IP scenarios,
		// all of the IPs belong to the same pod
738		proxyIP := proxy.IPAddresses[0]
739		pod := c.pods.getPodByIP(proxyIP)
740		if pod != nil {
			// For split-horizon EDS in multi-cluster Kubernetes, pods in different clusters can share the same IP,
			// which can happen when multiple clusters use the same pod CIDR.
			// Since we have the proxy's network in its metadata, compare it with the network the endpoint belongs to;
			// if they differ, ignore the pod, because it is in another cluster.
745			if proxy.Metadata.Network != c.endpointNetwork(proxyIP) {
746				return out, nil
747			}
			// 1. find the proxy's services by label selector; if there are none, there may be a headless service
			// without a selector, so fall back to 2
750			if services, err := getPodServices(listerv1.NewServiceLister(c.services.GetIndexer()), pod); err == nil && len(services) > 0 {
751				for _, svc := range services {
752					out = append(out, c.getProxyServiceInstancesByPod(pod, svc, proxy)...)
753				}
754				return out, nil
755			}
756			// 2. Headless service without selector
757			out = c.endpoints.GetProxyServiceInstances(c, proxy)
758		} else {
759			var err error
760			// 3. The pod is not present when this is called
761			// due to eventual consistency issues. However, we have a lot of information about the pod from the proxy
762			// metadata already. Because of this, we can still get most of the information we need.
763			// If we cannot accurately construct ServiceInstances from just the metadata, this will return an error and we can
764			// attempt to read the real pod.
765			out, err = c.getProxyServiceInstancesFromMetadata(proxy)
766			if err != nil {
767				log.Warnf("getProxyServiceInstancesFromMetadata for %v failed: %v", proxy.ID, err)
768			}
769		}
770	}
771	if len(out) == 0 {
772		if c.metrics != nil {
773			c.metrics.AddMetric(model.ProxyStatusNoService, proxy.ID, proxy, "")
774		} else {
775			log.Infof("Missing metrics env, empty list of services for pod %s", proxy.ID)
776		}
777	}
778	return out, nil
779}
780
781func getPodServices(s listerv1.ServiceLister, pod *v1.Pod) ([]*v1.Service, error) {
782	allServices, err := s.Services(pod.Namespace).List(klabels.Everything())
783	if err != nil {
784		return nil, err
785	}
786
787	var services []*v1.Service
788	for i := range allServices {
789		service := allServices[i]
790		if service.Spec.Selector == nil {
791			// services with nil selectors match nothing, not everything.
792			continue
793		}
794		selector := klabels.Set(service.Spec.Selector).AsSelectorPreValidated()
795		if selector.Matches(klabels.Set(pod.Labels)) {
796			services = append(services, service)
797		}
798	}
799
800	return services, nil
801}
802
803// getProxyServiceInstancesFromMetadata retrieves ServiceInstances using proxy Metadata rather than
804// from the Pod. This allows retrieving Instances immediately, regardless of delays in Kubernetes.
805// If the proxy doesn't have enough metadata, an error is returned
806func (c *Controller) getProxyServiceInstancesFromMetadata(proxy *model.Proxy) ([]*model.ServiceInstance, error) {
807	if len(proxy.Metadata.Labels) == 0 {
808		return nil, fmt.Errorf("no workload labels found")
809	}
810
811	if proxy.Metadata.ClusterID != c.clusterID {
812		return nil, fmt.Errorf("proxy is in cluster %v, but controller is for cluster %v", proxy.Metadata.ClusterID, c.clusterID)
813	}
814
815	// Create a pod with just the information needed to find the associated Services
816	dummyPod := &v1.Pod{
817		ObjectMeta: metav1.ObjectMeta{
818			Namespace: proxy.ConfigNamespace,
819			Labels:    proxy.Metadata.Labels,
820		},
821	}
822
823	// Find the Service associated with the pod.
824	svcLister := listerv1.NewServiceLister(c.services.GetIndexer())
825	services, err := getPodServices(svcLister, dummyPod)
	if err != nil {
		return nil, fmt.Errorf("error getting instances: %v", err)
	}
	if len(services) == 0 {
		return nil, fmt.Errorf("no instances found for %v", proxy.ID)
	}
833
834	out := make([]*model.ServiceInstance, 0)
835	for _, svc := range services {
836		svcAccount := proxy.Metadata.ServiceAccount
837		hostname := kube.ServiceHostname(svc.Name, svc.Namespace, c.domainSuffix)
838		c.RLock()
839		modelService, f := c.servicesMap[hostname]
840		c.RUnlock()
841		if !f {
842			return nil, fmt.Errorf("failed to find model service for %v", hostname)
843		}
844
845		tps := make(map[model.Port]*model.Port)
846		for _, port := range svc.Spec.Ports {
847			svcPort, f := modelService.Ports.Get(port.Name)
848			if !f {
849				return nil, fmt.Errorf("failed to get svc port for %v", port.Name)
850			}
851			portNum, err := findPortFromMetadata(port, proxy.Metadata.PodPorts)
852			if err != nil {
853				return nil, fmt.Errorf("failed to find target port for %v: %v", proxy.ID, err)
854			}
			// Dedupe the target ports here - the Service might have configured multiple ports pointing to the same target port;
			// we must create only one ingress listener per port and protocol so that we do not end up
			// complaining about listener conflicts.
858			targetPort := model.Port{
859				Port:     portNum,
860				Protocol: svcPort.Protocol,
861			}
862			if _, exists := tps[targetPort]; !exists {
863				tps[targetPort] = svcPort
864			}
865		}
866
867		for tp, svcPort := range tps {
868			// consider multiple IP scenarios
869			for _, ip := range proxy.IPAddresses {
870				// Construct the ServiceInstance
871				out = append(out, &model.ServiceInstance{
872					Service:     modelService,
873					ServicePort: svcPort,
874					Endpoint: &model.IstioEndpoint{
875						Address:         ip,
876						EndpointPort:    uint32(tp.Port),
877						ServicePortName: svcPort.Name,
878						// Kubernetes service will only have a single instance of labels, and we return early if there are no labels.
879						Labels:         proxy.Metadata.Labels,
880						ServiceAccount: svcAccount,
881						Network:        c.endpointNetwork(ip),
882						Locality: model.Locality{
883							Label:     util.LocalityToString(proxy.Locality),
884							ClusterID: c.clusterID,
885						},
886					},
887				})
888			}
889		}
890	}
891	return out, nil
892}
893
894// findPortFromMetadata resolves the TargetPort of a Service Port, by reading the Pod spec.
895func findPortFromMetadata(svcPort v1.ServicePort, podPorts []model.PodPort) (int, error) {
896	target := svcPort.TargetPort
897
898	switch target.Type {
899	case intstr.String:
900		name := target.StrVal
901		for _, port := range podPorts {
902			if port.Name == name {
903				return port.ContainerPort, nil
904			}
905		}
906	case intstr.Int:
907		// For a direct reference we can just return the port number
908		return target.IntValue(), nil
909	}
910
911	return 0, fmt.Errorf("no matching port found for %+v", svcPort)
912}
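
// Sketch of the two cases above (port values are illustrative): for a Service port with
// targetPort 8080 the function returns 8080 directly, while for targetPort "http" it scans
// proxy.Metadata.PodPorts for a port named "http" and returns its ContainerPort, returning
// an error if no such name exists.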
913
914func (c *Controller) getProxyServiceInstancesByPod(pod *v1.Pod, service *v1.Service, proxy *model.Proxy) []*model.ServiceInstance {
915	out := make([]*model.ServiceInstance, 0)
916
917	hostname := kube.ServiceHostname(service.Name, service.Namespace, c.domainSuffix)
918	c.RLock()
919	svc := c.servicesMap[hostname]
920	c.RUnlock()
921
922	if svc == nil {
923		return out
924	}
925
926	tps := make(map[model.Port]*model.Port)
927	for _, port := range service.Spec.Ports {
928		svcPort, exists := svc.Ports.Get(port.Name)
929		if !exists {
930			continue
931		}
932		// find target port
933		portNum, err := FindPort(pod, &port)
934		if err != nil {
935			log.Warnf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
936			continue
937		}
		// Dedupe the target ports here - the Service might have configured multiple ports pointing to the same target port;
		// we must create only one ingress listener per port and protocol so that we do not end up
		// complaining about listener conflicts.
941		targetPort := model.Port{
942			Port:     portNum,
943			Protocol: svcPort.Protocol,
944		}
945		if _, exists = tps[targetPort]; !exists {
946			tps[targetPort] = svcPort
947		}
948	}
949
950	builder := NewEndpointBuilder(c, pod)
951	for tp, svcPort := range tps {
952		// consider multiple IP scenarios
953		for _, ip := range proxy.IPAddresses {
954			istioEndpoint := builder.buildIstioEndpoint(ip, int32(tp.Port), svcPort.Name)
955			out = append(out, &model.ServiceInstance{
956				Service:     svc,
957				ServicePort: svcPort,
958				Endpoint:    istioEndpoint,
959			})
960		}
961	}
962	return out
963}
964
965func (c *Controller) GetProxyWorkloadLabels(proxy *model.Proxy) (labels.Collection, error) {
966	// There is only one IP for kube registry
967	proxyIP := proxy.IPAddresses[0]
968
969	pod := c.pods.getPodByIP(proxyIP)
970	if pod != nil {
971		return labels.Collection{pod.Labels}, nil
972	}
973	return nil, nil
974}
975
// GetIstioServiceAccounts returns the Istio service accounts running the given service
// hostname. Each service account is encoded according to the SPIFFE SVID spec.
978// For example, a service account named "bar" in namespace "foo" is encoded as
979// "spiffe://cluster.local/ns/foo/sa/bar".
980func (c *Controller) GetIstioServiceAccounts(svc *model.Service, ports []int) []string {
981	saSet := make(map[string]bool)
982
983	instances := make([]*model.ServiceInstance, 0)
984	// Get the service accounts running service within Kubernetes. This is reflected by the pods that
985	// the service is deployed on, and the service accounts of the pods.
986	for _, port := range ports {
987		svcInstances, err := c.InstancesByPort(svc, port, labels.Collection{})
988		if err != nil {
989			log.Warnf("InstancesByPort(%s:%d) error: %v", svc.Hostname, port, err)
990			return nil
991		}
992		instances = append(instances, svcInstances...)
993	}
994
995	for _, si := range instances {
996		if si.Endpoint.ServiceAccount != "" {
997			saSet[si.Endpoint.ServiceAccount] = true
998		}
999	}
1000
	for _, serviceAccount := range svc.ServiceAccounts {
		saSet[serviceAccount] = true
	}
1005
1006	saArray := make([]string, 0, len(saSet))
1007	for sa := range saSet {
1008		saArray = append(saArray, sa)
1009	}
1010
1011	return saArray
1012}
1013
1014// AppendServiceHandler implements a service catalog operation
1015func (c *Controller) AppendServiceHandler(f func(*model.Service, model.Event)) error {
1016	c.serviceHandlers = append(c.serviceHandlers, f)
1017	return nil
1018}
1019
1020// AppendInstanceHandler implements a service catalog operation
1021func (c *Controller) AppendInstanceHandler(f func(*model.ServiceInstance, model.Event)) error {
1022	c.instanceHandlers = append(c.instanceHandlers, f)
1023	return nil
1024}
1025
1026// getPod fetches a pod by IP address.
1027// A pod may be missing (nil) for two reasons:
1028// * It is an endpoint without an associated Pod. In this case, expectPod will be false.
// * It is an endpoint with an associated Pod, but the Pod is not found. In this case, expectPod will be true.
//   This may happen due to eventual consistency issues, out-of-order events, etc. In this case, the caller
//   should not proceed with the endpoint, or inaccurate information would be sent, which may have impacts on
//   correctness and security.
1033func getPod(c *Controller, ip string, ep *metav1.ObjectMeta, targetRef *v1.ObjectReference, host host.Name) (rpod *v1.Pod, expectPod bool) {
1034	pod := c.pods.getPodByIP(ip)
1035	if pod != nil {
1036		return pod, false
1037	}
	// This means the endpoint event has arrived before the pod event.
	// This might happen because the PodCache is eventually consistent.
1040	if targetRef != nil && targetRef.Kind == "Pod" {
1041		key := kube.KeyFunc(targetRef.Name, targetRef.Namespace)
		// There is a small chance the informer may have the pod, but it hasn't
		// made its way to the PodCache yet, since the cache is populated via a shared queue.
1044		podFromInformer, f, err := c.pods.informer.GetStore().GetByKey(key)
1045		if err != nil || !f {
1046			log.Debugf("Endpoint without pod %s %s.%s error: %v", ip, ep.Name, ep.Namespace, err)
1047			endpointsWithNoPods.Increment()
1048			if c.metrics != nil {
1049				c.metrics.AddMetric(model.EndpointNoPod, string(host), nil, ip)
1050			}
1051			// Tell pod cache we want to queue the endpoint event when this pod arrives.
1052			epkey := kube.KeyFunc(ep.Name, ep.Namespace)
1053			c.pods.recordNeedsUpdate(epkey, ip)
1054			return nil, true
1055		}
1056		pod = podFromInformer.(*v1.Pod)
1057	}
1058	return pod, false
1059}
1060
1061func (c *Controller) updateEDS(ep *v1.Endpoints, event model.Event, epc *endpointsController) {
1062	hostname := kube.ServiceHostname(ep.Name, ep.Namespace, c.domainSuffix)
1063
1064	c.RLock()
1065	svc := c.servicesMap[hostname]
1066	c.RUnlock()
1067	if svc == nil {
1068		log.Infof("Handle EDS endpoints: skip updating, service %s/%s has not been populated", ep.Name, ep.Namespace)
1069		return
1070	}
1071	endpoints := make([]*model.IstioEndpoint, 0)
1072	if event != model.EventDelete {
1073		for _, ss := range ep.Subsets {
1074			for _, ea := range ss.Addresses {
1075				pod, expectedUpdate := getPod(c, ea.IP, &metav1.ObjectMeta{Name: ep.Name, Namespace: ep.Namespace}, ea.TargetRef, hostname)
1076				if pod == nil && expectedUpdate {
1077					continue
1078				}
1079
1080				builder := NewEndpointBuilder(c, pod)
1081
1082				// EDS and ServiceEntry use name for service port - ADS will need to
1083				// map to numbers.
1084				for _, port := range ss.Ports {
1085					istioEndpoint := builder.buildIstioEndpoint(ea.IP, port.Port, port.Name)
1086					endpoints = append(endpoints, istioEndpoint)
1087				}
1088			}
1089		}
1090	} else {
1091		epc.forgetEndpoint(ep)
1092	}
1093
1094	log.Debugf("Handle EDS: %d endpoints for %s in namespace %s", len(endpoints), ep.Name, ep.Namespace)
1095
1096	_ = c.xdsUpdater.EDSUpdate(c.clusterID, string(hostname), ep.Namespace, endpoints)
1097	for _, handler := range c.instanceHandlers {
1098		for _, ep := range endpoints {
1099			si := &model.ServiceInstance{
1100				Service:     svc,
1101				ServicePort: nil,
1102				Endpoint:    ep,
1103			}
1104			handler(si, event)
1105		}
1106	}
1107}
1108
// namedRangerEntry holds a network's CIDR and name
1110type namedRangerEntry struct {
1111	name    string
1112	network net.IPNet
1113}
1114
// Network returns the IPNet for the network
1116func (n namedRangerEntry) Network() net.IPNet {
1117	return n.network
1118}
1119
1120// initNetworkLookup will read the mesh networks configuration from the environment
1121// and initialize CIDR rangers for an efficient network lookup when needed
1122func (c *Controller) initNetworkLookup() {
1123	meshNetworks := c.networksWatcher.Networks()
1124	if meshNetworks == nil || len(meshNetworks.Networks) == 0 {
1125		return
1126	}
1127
1128	c.ranger = cidranger.NewPCTrieRanger()
1129
1130	for n, v := range meshNetworks.Networks {
1131		for _, ep := range v.Endpoints {
1132			if ep.GetFromCidr() != "" {
1133				_, network, err := net.ParseCIDR(ep.GetFromCidr())
1134				if err != nil {
1135					log.Warnf("unable to parse CIDR %q for network %s", ep.GetFromCidr(), n)
1136					continue
1137				}
1138				rangerEntry := namedRangerEntry{
1139					name:    n,
1140					network: *network,
1141				}
1142				_ = c.ranger.Insert(rangerEntry)
1143			}
1144			if ep.GetFromRegistry() != "" && ep.GetFromRegistry() == c.clusterID {
1145				c.networkForRegistry = n
1146			}
1147		}
1148	}
1149}
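
// A MeshNetworks configuration driving this lookup might look roughly like the sketch below
// (network names, CIDR and cluster ID are illustrative; the field names mirror the
// FromCidr/FromRegistry getters used above):
//
//	networks:
//	  network1:
//	    endpoints:
//	    - fromCidr: "10.244.0.0/16"
//	  network2:
//	    endpoints:
//	    - fromRegistry: "cluster-2"
//
// Endpoint IPs inside 10.244.0.0/16 resolve to "network1", while a registry whose clusterID
// is "cluster-2" maps all of its endpoints to "network2".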
1150
1151// return the mesh network for the endpoint IP. Empty string if not found.
1152func (c *Controller) endpointNetwork(endpointIP string) string {
1153	// If networkForRegistry is set then all endpoints discovered by this registry
1154	// belong to the configured network so simply return it
1155	if len(c.networkForRegistry) != 0 {
1156		return c.networkForRegistry
1157	}
1158
1159	// Try to determine the network by checking whether the endpoint IP belongs
	// to any of the configured networks' CIDR ranges
1161	if c.ranger == nil {
1162		return ""
1163	}
1164	entries, err := c.ranger.ContainingNetworks(net.ParseIP(endpointIP))
1165	if err != nil {
1166		log.Errora(err)
1167		return ""
1168	}
1169	if len(entries) == 0 {
1170		return ""
1171	}
1172	if len(entries) > 1 {
		log.Warnf("Found multiple network CIDRs matching the endpoint IP: %s. Using the first match.", endpointIP)
1174	}
1175
1176	return (entries[0].(namedRangerEntry)).name
1177}
1178
1179// Forked from Kubernetes k8s.io/kubernetes/pkg/api/v1/pod
1180// FindPort locates the container port for the given pod and portName.  If the
1181// targetPort is a number, use that.  If the targetPort is a string, look that
1182// string up in all named ports in all containers in the target pod.  If no
1183// match is found, fail.
1184func FindPort(pod *v1.Pod, svcPort *v1.ServicePort) (int, error) {
1185	portName := svcPort.TargetPort
1186	switch portName.Type {
1187	case intstr.String:
1188		name := portName.StrVal
1189		for _, container := range pod.Spec.Containers {
1190			for _, port := range container.Ports {
1191				if port.Name == name && port.Protocol == svcPort.Protocol {
1192					return int(port.ContainerPort), nil
1193				}
1194			}
1195		}
1196	case intstr.Int:
1197		return portName.IntValue(), nil
1198	}
1199
1200	return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
1201}
1202
1203func createUID(podName, namespace string) string {
1204	return "kubernetes://" + podName + "." + namespace
1205}
1206