1// Copyright 2019 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package controller
16
17import (
18	"sync"
19
20	v1 "k8s.io/api/core/v1"
21	discoveryv1alpha1 "k8s.io/api/discovery/v1alpha1"
22	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23	klabels "k8s.io/apimachinery/pkg/labels"
24	"k8s.io/client-go/informers"
25	discoverylister "k8s.io/client-go/listers/discovery/v1alpha1"
26	"k8s.io/client-go/tools/cache"
27
28	"istio.io/pkg/log"
29
30	"istio.io/istio/pilot/pkg/model"
31	"istio.io/istio/pilot/pkg/serviceregistry/kube"
32	"istio.io/istio/pkg/config/host"
33	"istio.io/istio/pkg/config/labels"
34)
35
// endpointSliceController handles Kubernetes EndpointSlice objects for the
// registry Controller. It embeds kubeEndpoints (populated with the owning
// Controller and the EndpointSlice informer by newEndpointSliceController) and
// additionally remembers the Istio endpoints derived from every slice.
type endpointSliceController struct {
	kubeEndpoints
	// endpointCache holds the endpoints built from each EndpointSlice, grouped
	// by service hostname and slice name, so that the full endpoint set of a
	// service can be reassembled when a single slice changes.
	endpointCache *endpointSliceCache
}
40
41var _ kubeEndpointsController = &endpointSliceController{}
42
43func newEndpointSliceController(c *Controller, sharedInformers informers.SharedInformerFactory) *endpointSliceController {
44	informer := sharedInformers.Discovery().V1alpha1().EndpointSlices().Informer()
45	// TODO Endpoints has a special cache, to filter out irrelevant updates to kube-system
46	// Investigate if we need this, or if EndpointSlice is makes this not relevant
47	out := &endpointSliceController{
48		kubeEndpoints: kubeEndpoints{
49			c:        c,
50			informer: informer,
51		},
52		endpointCache: newEndpointSliceCache(),
53	}
54	registerHandlers(informer, c.queue, "EndpointSlice", out.onEvent)
55	return out
56}
57
58func (esc *endpointSliceController) updateEDS(es interface{}, event model.Event) {
59	slice := es.(*discoveryv1alpha1.EndpointSlice)
60	svcName := slice.Labels[discoveryv1alpha1.LabelServiceName]
61	hostname := kube.ServiceHostname(svcName, slice.Namespace, esc.c.domainSuffix)
62
63	esc.c.RLock()
64	svc := esc.c.servicesMap[hostname]
65	esc.c.RUnlock()
66
67	if svc == nil {
68		log.Infof("Handle EDS endpoint: skip updating, service %s/%s has mot been populated", svcName, slice.Namespace)
69		return
70	}
71
72	endpoints := make([]*model.IstioEndpoint, 0)
73	if event != model.EventDelete {
74		for _, e := range slice.Endpoints {
75			if e.Conditions.Ready != nil && !*e.Conditions.Ready {
76				// Ignore not ready endpoints
77				continue
78			}
79			for _, a := range e.Addresses {
80				pod, expectedUpdate := getPod(esc.c, a, &metav1.ObjectMeta{Name: slice.Name, Namespace: slice.Namespace}, e.TargetRef, hostname)
81				if pod == nil && expectedUpdate {
82					continue
83				}
84
85				builder := esc.newEndpointBuilder(pod, e)
86				// EDS and ServiceEntry use name for service port - ADS will need to
87				// map to numbers.
88				for _, port := range slice.Ports {
89					var portNum int32
90					if port.Port != nil {
91						portNum = *port.Port
92					}
93					var portName string
94					if port.Name != nil {
95						portName = *port.Name
96					}
97
98					istioEndpoint := builder.buildIstioEndpoint(a, portNum, portName)
99					endpoints = append(endpoints, istioEndpoint)
100				}
101			}
102		}
103	} else {
104		esc.forgetEndpoint(es)
105	}
106
107	esc.endpointCache.Update(hostname, slice.Name, endpoints)
108
109	log.Debugf("Handle EDS endpoint %s in namespace %s", svcName, slice.Namespace)
110
111	_ = esc.c.xdsUpdater.EDSUpdate(esc.c.clusterID, string(hostname), slice.Namespace, esc.endpointCache.Get(hostname))
112	for _, handler := range esc.c.instanceHandlers {
113		for _, ep := range endpoints {
114			si := &model.ServiceInstance{
115				Service:     svc,
116				ServicePort: nil,
117				Endpoint:    ep,
118			}
119			handler(si, event)
120		}
121	}
122}
123
124func (esc *endpointSliceController) onEvent(curr interface{}, event model.Event) error {
125	if err := esc.c.checkReadyForEvents(); err != nil {
126		return err
127	}
128
129	ep, ok := curr.(*discoveryv1alpha1.EndpointSlice)
130	if !ok {
131		tombstone, ok := curr.(cache.DeletedFinalStateUnknown)
132		if !ok {
133			log.Errorf("1 Couldn't get object from tombstone %#v", curr)
134			return nil
135		}
136		ep, ok = tombstone.Obj.(*discoveryv1alpha1.EndpointSlice)
137		if !ok {
138			log.Errorf("Tombstone contained an object that is not an endpoints slice %#v", curr)
139			return nil
140		}
141	}
142
143	return esc.handleEvent(ep.Labels[discoveryv1alpha1.LabelServiceName], ep.Namespace, event, curr, func(obj interface{}, event model.Event) {
144		esc.updateEDS(obj, event)
145	})
146}
147
148func (esc *endpointSliceController) GetProxyServiceInstances(c *Controller, proxy *model.Proxy) []*model.ServiceInstance {
149	eps, err := discoverylister.NewEndpointSliceLister(esc.informer.GetIndexer()).EndpointSlices(proxy.Metadata.Namespace).List(klabels.Everything())
150	if err != nil {
151		log.Errorf("Get endpointslice by index failed: %v", err)
152		return nil
153	}
154	out := make([]*model.ServiceInstance, 0)
155	for _, ep := range eps {
156		instances := esc.proxyServiceInstances(c, ep, proxy)
157		out = append(out, instances...)
158	}
159
160	return out
161}
162
163func (esc *endpointSliceController) proxyServiceInstances(c *Controller, ep *discoveryv1alpha1.EndpointSlice, proxy *model.Proxy) []*model.ServiceInstance {
164	out := make([]*model.ServiceInstance, 0)
165
166	hostname := kube.ServiceHostname(ep.Labels[discoveryv1alpha1.LabelServiceName], ep.Namespace, c.domainSuffix)
167	c.RLock()
168	svc := c.servicesMap[hostname]
169	c.RUnlock()
170
171	if svc == nil {
172		return out
173	}
174
175	podIP := proxy.IPAddresses[0]
176	pod := c.pods.getPodByIP(podIP)
177	builder := NewEndpointBuilder(c, pod)
178
179	for _, port := range ep.Ports {
180		if port.Name == nil || port.Port == nil {
181			continue
182		}
183		svcPort, exists := svc.Ports.Get(*port.Name)
184		if !exists {
185			continue
186		}
187
188		// consider multiple IP scenarios
189		for _, ip := range proxy.IPAddresses {
190			for _, ep := range ep.Endpoints {
191				for _, a := range ep.Addresses {
192					if a == ip {
193						istioEndpoint := builder.buildIstioEndpoint(ip, *port.Port, svcPort.Name)
194						out = append(out, &model.ServiceInstance{
195							Endpoint:    istioEndpoint,
196							ServicePort: svcPort,
197							Service:     svc,
198						})
199						// If the endpoint isn't ready, report this
200						if ep.Conditions.Ready != nil && !*ep.Conditions.Ready && c.metrics != nil {
201							c.metrics.AddMetric(model.ProxyStatusEndpointNotReady, proxy.ID, proxy, "")
202						}
203					}
204				}
205			}
206		}
207	}
208
209	return out
210}
211
212func (esc *endpointSliceController) InstancesByPort(c *Controller, svc *model.Service, reqSvcPort int,
213	labelsList labels.Collection) ([]*model.ServiceInstance, error) {
214	esLabelSelector := klabels.Set(map[string]string{discoveryv1alpha1.LabelServiceName: svc.Attributes.Name}).AsSelectorPreValidated()
215	slices, err := discoverylister.NewEndpointSliceLister(esc.informer.GetIndexer()).EndpointSlices(svc.Attributes.Namespace).List(esLabelSelector)
216	if err != nil {
217		log.Infof("get endpoints(%s, %s) => error %v", svc.Attributes.Name, svc.Attributes.Namespace, err)
218		return nil, nil
219	}
220	if len(slices) == 0 {
221		return nil, nil
222	}
223
224	// Locate all ports in the actual service
225	svcPort, exists := svc.Ports.GetByPort(reqSvcPort)
226	if !exists {
227		return nil, nil
228	}
229
230	var out []*model.ServiceInstance
231	for _, slice := range slices {
232		for _, e := range slice.Endpoints {
233			for _, a := range e.Addresses {
234				var podLabels labels.Instance
235				pod := c.pods.getPodByIP(a)
236				if pod != nil {
237					podLabels = pod.Labels
238				}
239
240				// check that one of the input labels is a subset of the labels
241				if !labelsList.HasSubsetOf(podLabels) {
242					continue
243				}
244
245				builder := esc.newEndpointBuilder(pod, e)
246				// identify the port by name. K8S EndpointPort uses the service port name
247				for _, port := range slice.Ports {
248					var portNum int32
249					if port.Port != nil {
250						portNum = *port.Port
251					}
252
253					if port.Name == nil ||
254						svcPort.Name == *port.Name {
255						istioEndpoint := builder.buildIstioEndpoint(a, portNum, svcPort.Name)
256						out = append(out, &model.ServiceInstance{
257							Endpoint:    istioEndpoint,
258							ServicePort: svcPort,
259							Service:     svc,
260						})
261					}
262				}
263			}
264		}
265	}
266	return out, nil
267}
268
269func (esc *endpointSliceController) newEndpointBuilder(pod *v1.Pod, endpoint discoveryv1alpha1.Endpoint) *EndpointBuilder {
270	if pod != nil {
271		// Respect pod "istio-locality" label
272		if pod.Labels[model.LocalityLabel] == "" {
273			pod = pod.DeepCopy()
274			// mutate the labels, only need `istio-locality`
275			pod.Labels[model.LocalityLabel] = getLocalityFromTopology(endpoint.Topology)
276		}
277	}
278
279	return NewEndpointBuilder(esc.c, pod)
280}
281
282func getLocalityFromTopology(topology map[string]string) string {
283	locality := topology[NodeRegionLabelGA]
284	if _, f := topology[NodeZoneLabelGA]; f {
285		locality += "/" + topology[NodeZoneLabelGA]
286	}
287	if _, f := topology[IstioSubzoneLabel]; f {
288		locality += "/" + topology[IstioSubzoneLabel]
289	}
290	return locality
291}
292
// endpointSliceCache remembers the Istio endpoints produced by each
// EndpointSlice so that all endpoints of a service can be collected across its
// slices. Its methods take mu around every access to the map.
type endpointSliceCache struct {
	// mu guards endpointsByServiceAndSlice.
	mu                         sync.RWMutex
	// endpointsByServiceAndSlice maps service hostname -> slice name -> the
	// endpoints built from that slice.
	endpointsByServiceAndSlice map[host.Name]map[string][]*model.IstioEndpoint
}
297
298func newEndpointSliceCache() *endpointSliceCache {
299	out := &endpointSliceCache{
300		endpointsByServiceAndSlice: make(map[host.Name]map[string][]*model.IstioEndpoint),
301	}
302	return out
303}
304
305func (e *endpointSliceCache) Update(hostname host.Name, slice string, endpoints []*model.IstioEndpoint) {
306	e.mu.Lock()
307	defer e.mu.Unlock()
308	if len(endpoints) == 0 {
309		delete(e.endpointsByServiceAndSlice[hostname], slice)
310	}
311	if _, f := e.endpointsByServiceAndSlice[hostname]; !f {
312		e.endpointsByServiceAndSlice[hostname] = make(map[string][]*model.IstioEndpoint)
313	}
314	e.endpointsByServiceAndSlice[hostname][slice] = endpoints
315}
316
317func (e *endpointSliceCache) Get(hostname host.Name) []*model.IstioEndpoint {
318	e.mu.RLock()
319	defer e.mu.RUnlock()
320	var endpoints []*model.IstioEndpoint
321	for _, eps := range e.endpointsByServiceAndSlice[hostname] {
322		endpoints = append(endpoints, eps...)
323	}
324	return endpoints
325}
326
327func (esc *endpointSliceController) getInformer() cache.SharedIndexInformer {
328	return esc.informer
329}
330
331func (esc *endpointSliceController) forgetEndpoint(endpoint interface{}) {
332	slice := endpoint.(*discoveryv1alpha1.EndpointSlice)
333	key := kube.KeyFunc(slice.Name, slice.Namespace)
334	for _, e := range slice.Endpoints {
335		for _, a := range e.Addresses {
336			esc.c.pods.dropNeedsUpdate(key, a)
337		}
338	}
339}
340