1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package endpointslice
18
19import (
20	"fmt"
21	"time"
22
23	corev1 "k8s.io/api/core/v1"
24	v1 "k8s.io/api/core/v1"
25	discovery "k8s.io/api/discovery/v1"
26	apiequality "k8s.io/apimachinery/pkg/api/equality"
27	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28	"k8s.io/apimachinery/pkg/runtime/schema"
29	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
30	"k8s.io/apimachinery/pkg/util/sets"
31	utilfeature "k8s.io/apiserver/pkg/util/feature"
32	"k8s.io/client-go/tools/cache"
33	"k8s.io/klog/v2"
34	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
35	api "k8s.io/kubernetes/pkg/apis/core"
36	helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
37	"k8s.io/kubernetes/pkg/apis/discovery/validation"
38	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
39	"k8s.io/kubernetes/pkg/features"
40	utilnet "k8s.io/utils/net"
41)
42
43// podToEndpoint returns an Endpoint object generated from a Pod, a Node, and a Service for a particular addressType.
44func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, addressType discovery.AddressType) discovery.Endpoint {
45	serving := podutil.IsPodReady(pod)
46	terminating := pod.DeletionTimestamp != nil
47	// For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless
48	// publishNotReadyAddresses was set.
49	ready := service.Spec.PublishNotReadyAddresses || (serving && !terminating)
50	ep := discovery.Endpoint{
51		Addresses: getEndpointAddresses(pod.Status, service, addressType),
52		Conditions: discovery.EndpointConditions{
53			Ready: &ready,
54		},
55		TargetRef: &corev1.ObjectReference{
56			Kind:            "Pod",
57			Namespace:       pod.ObjectMeta.Namespace,
58			Name:            pod.ObjectMeta.Name,
59			UID:             pod.ObjectMeta.UID,
60			ResourceVersion: pod.ObjectMeta.ResourceVersion,
61		},
62	}
63
64	if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) {
65		ep.Conditions.Serving = &serving
66		ep.Conditions.Terminating = &terminating
67	}
68
69	if pod.Spec.NodeName != "" {
70		ep.NodeName = &pod.Spec.NodeName
71	}
72
73	if node != nil && node.Labels[corev1.LabelTopologyZone] != "" {
74		zone := node.Labels[corev1.LabelTopologyZone]
75		ep.Zone = &zone
76	}
77
78	if endpointutil.ShouldSetHostname(pod, service) {
79		ep.Hostname = &pod.Spec.Hostname
80	}
81
82	return ep
83}
84
85// getEndpointPorts returns a list of EndpointPorts generated from a Service
86// and Pod.
87func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.EndpointPort {
88	endpointPorts := []discovery.EndpointPort{}
89
90	// Allow headless service not to have ports.
91	if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == api.ClusterIPNone {
92		return endpointPorts
93	}
94
95	for i := range service.Spec.Ports {
96		servicePort := &service.Spec.Ports[i]
97
98		portName := servicePort.Name
99		portProto := servicePort.Protocol
100		portNum, err := podutil.FindPort(pod, servicePort)
101		if err != nil {
102			klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
103			continue
104		}
105
106		i32PortNum := int32(portNum)
107		endpointPorts = append(endpointPorts, discovery.EndpointPort{
108			Name:        &portName,
109			Port:        &i32PortNum,
110			Protocol:    &portProto,
111			AppProtocol: servicePort.AppProtocol,
112		})
113	}
114
115	return endpointPorts
116}
117
118// getEndpointAddresses returns a list of addresses generated from a pod status.
119func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service, addressType discovery.AddressType) []string {
120	addresses := []string{}
121
122	for _, podIP := range podStatus.PodIPs {
123		isIPv6PodIP := utilnet.IsIPv6String(podIP.IP)
124		if isIPv6PodIP && addressType == discovery.AddressTypeIPv6 {
125			addresses = append(addresses, podIP.IP)
126		}
127
128		if !isIPv6PodIP && addressType == discovery.AddressTypeIPv4 {
129			addresses = append(addresses, podIP.IP)
130		}
131	}
132
133	return addresses
134}
135
136// newEndpointSlice returns an EndpointSlice generated from a service and
137// endpointMeta.
138func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
139	gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
140	ownerRef := metav1.NewControllerRef(service, gvk)
141	epSlice := &discovery.EndpointSlice{
142		ObjectMeta: metav1.ObjectMeta{
143			Labels:          map[string]string{},
144			GenerateName:    getEndpointSlicePrefix(service.Name),
145			OwnerReferences: []metav1.OwnerReference{*ownerRef},
146			Namespace:       service.Namespace,
147		},
148		Ports:       endpointMeta.Ports,
149		AddressType: endpointMeta.AddressType,
150		Endpoints:   []discovery.Endpoint{},
151	}
152	// add parent service labels
153	epSlice.Labels, _ = setEndpointSliceLabels(epSlice, service)
154
155	return epSlice
156}
157
158// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name.
159func getEndpointSlicePrefix(serviceName string) string {
160	// use the dash (if the name isn't too long) to make the pod name a bit prettier
161	prefix := fmt.Sprintf("%s-", serviceName)
162	if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 {
163		prefix = serviceName
164	}
165	return prefix
166}
167
168// ownedBy returns true if the provided EndpointSlice is owned by the provided
169// Service.
170func ownedBy(endpointSlice *discovery.EndpointSlice, svc *corev1.Service) bool {
171	for _, o := range endpointSlice.OwnerReferences {
172		if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
173			return true
174		}
175	}
176	return false
177}
178
179// getSliceToFill will return the EndpointSlice that will be closest to full
180// when numEndpoints are added. If no EndpointSlice can be found, a nil pointer
181// will be returned.
182func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, maxEndpoints int) (slice *discovery.EndpointSlice) {
183	closestDiff := maxEndpoints
184	var closestSlice *discovery.EndpointSlice
185	for _, endpointSlice := range endpointSlices {
186		currentDiff := maxEndpoints - (numEndpoints + len(endpointSlice.Endpoints))
187		if currentDiff >= 0 && currentDiff < closestDiff {
188			closestDiff = currentDiff
189			closestSlice = endpointSlice
190			if closestDiff == 0 {
191				return closestSlice
192			}
193		}
194	}
195	return closestSlice
196}
197
198// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
199func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
200	if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
201		// Enqueue all the services that the pod used to be a member of.
202		// This is the same thing we do when we add a pod.
203		return endpointSlice
204	}
205	// If we reached here it means the pod was deleted but its final state is unrecorded.
206	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
207	if !ok {
208		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
209		return nil
210	}
211	endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
212	if !ok {
213		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
214		return nil
215	}
216	return endpointSlice
217}
218
// addTriggerTimeAnnotation records triggerTime on endpointSlice under the
// corev1.EndpointsLastChangeTriggerTime annotation, formatted as UTC RFC3339Nano.
// A zero triggerTime removes the annotation instead.
//
// The Annotations map is always non-nil afterwards, even when only a removal
// was performed.
func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) {
	if endpointSlice.Annotations == nil {
		endpointSlice.Annotations = map[string]string{}
	}

	if triggerTime.IsZero() {
		// No new trigger time: drop any stale value.
		delete(endpointSlice.Annotations, corev1.EndpointsLastChangeTriggerTime)
		return
	}

	endpointSlice.Annotations[corev1.EndpointsLastChangeTriggerTime] = triggerTime.UTC().Format(time.RFC3339Nano)
}
231
// serviceControllerKey derives the "<namespace>/<name>" queue key of the
// Service that owns endpointSlice. The namespace is the slice's own; the name
// comes from its discovery.LabelServiceName label.
//
// It returns an error for a nil slice, or when that label is absent or empty.
func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) {
	if endpointSlice == nil {
		return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()")
	}

	// A missing label reads as "", so one emptiness check covers both cases.
	if serviceName := endpointSlice.Labels[discovery.LabelServiceName]; serviceName != "" {
		return endpointSlice.Namespace + "/" + serviceName, nil
	}

	return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName)
}
244
245// setEndpointSliceLabels returns a map with the new endpoint slices labels and true if there was an update.
246// Slices labels must be equivalent to the Service labels except for the reserved IsHeadlessService, LabelServiceName and LabelManagedBy labels
247// Changes to IsHeadlessService, LabelServiceName and LabelManagedBy labels on the Service do not result in updates to EndpointSlice labels.
248func setEndpointSliceLabels(epSlice *discovery.EndpointSlice, service *corev1.Service) (map[string]string, bool) {
249	updated := false
250	epLabels := make(map[string]string)
251	svcLabels := make(map[string]string)
252
253	// check if the endpoint slice and the service have the same labels
254	// clone current slice labels except the reserved labels
255	for key, value := range epSlice.Labels {
256		if IsReservedLabelKey(key) {
257			continue
258		}
259		// copy endpoint slice labels
260		epLabels[key] = value
261	}
262
263	for key, value := range service.Labels {
264		if IsReservedLabelKey(key) {
265			klog.Warningf("Service %s/%s using reserved endpoint slices label, skipping label %s: %s", service.Namespace, service.Name, key, value)
266			continue
267		}
268		// copy service labels
269		svcLabels[key] = value
270	}
271
272	// if the labels are not identical update the slice with the corresponding service labels
273	if !apiequality.Semantic.DeepEqual(epLabels, svcLabels) {
274		updated = true
275	}
276
277	// add or remove headless label depending on the service Type
278	if !helper.IsServiceIPSet(service) {
279		svcLabels[v1.IsHeadlessService] = ""
280	} else {
281		delete(svcLabels, v1.IsHeadlessService)
282	}
283
284	// override endpoint slices reserved labels
285	svcLabels[discovery.LabelServiceName] = service.Name
286	svcLabels[discovery.LabelManagedBy] = controllerName
287
288	return svcLabels, updated
289}
290
291// IsReservedLabelKey return true if the label is one of the reserved label for slices
292func IsReservedLabelKey(label string) bool {
293	if label == discovery.LabelServiceName ||
294		label == discovery.LabelManagedBy ||
295		label == v1.IsHeadlessService {
296		return true
297	}
298	return false
299}
300
301// endpointSliceEndpointLen helps sort endpoint slices by the number of
302// endpoints they contain.
303type endpointSliceEndpointLen []*discovery.EndpointSlice
304
305func (sl endpointSliceEndpointLen) Len() int      { return len(sl) }
306func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
307func (sl endpointSliceEndpointLen) Less(i, j int) bool {
308	return len(sl[i].Endpoints) > len(sl[j].Endpoints)
309}
310
311// returns a map of address types used by a service
312func getAddressTypesForService(service *corev1.Service) map[discovery.AddressType]struct{} {
313	serviceSupportedAddresses := make(map[discovery.AddressType]struct{})
314	// TODO: (khenidak) when address types are removed in favor of
315	// v1.IPFamily this will need to be removed, and work directly with
316	// v1.IPFamily types
317
318	// IMPORTANT: we assume that IP of (discovery.AddressType enum) is never in use
319	// as it gets deprecated
320	for _, family := range service.Spec.IPFamilies {
321		if family == corev1.IPv4Protocol {
322			serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{}
323		}
324
325		if family == corev1.IPv6Protocol {
326			serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{}
327		}
328	}
329
330	if len(serviceSupportedAddresses) > 0 {
331		return serviceSupportedAddresses // we have found families for this service
332	}
333
334	// TODO (khenidak) remove when (1) dual stack becomes
335	// enabled by default (2) v1.19 falls off supported versions
336
337	// Why do we need this:
338	// a cluster being upgraded to the new apis
339	// will have service.spec.IPFamilies: nil
340	// if the controller manager connected to old api
341	// server. This will have the nasty side effect of
342	// removing all slices already created for this service.
343	// this will disable all routing to service vip (ClusterIP)
344	// this ensures that this does not happen. Same for headless services
345	// we assume it is dual stack, until they get defaulted by *new* api-server
346	// this ensures that traffic is not disrupted  until then. But *may*
347	// include undesired families for headless services until then.
348
349	if len(service.Spec.ClusterIP) > 0 && service.Spec.ClusterIP != corev1.ClusterIPNone { // headfull
350		addrType := discovery.AddressTypeIPv4
351		if utilnet.IsIPv6String(service.Spec.ClusterIP) {
352			addrType = discovery.AddressTypeIPv6
353		}
354		serviceSupportedAddresses[addrType] = struct{}{}
355		klog.V(2).Infof("couldn't find ipfamilies for service: %v/%v. This could happen if controller manager is connected to an old apiserver that does not support ip families yet. EndpointSlices for this Service will use %s as the IP Family based on familyOf(ClusterIP:%v).", service.Namespace, service.Name, addrType, service.Spec.ClusterIP)
356		return serviceSupportedAddresses
357	}
358
359	// headless
360	// for now we assume two families. This should have minimal side effect
361	// if the service is headless with no selector, then this will remain the case
362	// if the service is headless with selector then chances are pods are still using single family
363	// since kubelet will need to restart in order to start patching pod status with multiple ips
364	serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{}
365	serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{}
366	klog.V(2).Infof("couldn't find ipfamilies for headless service: %v/%v likely because controller manager is likely connected to an old apiserver that does not support ip families yet. The service endpoint slice will use dual stack families until api-server default it correctly", service.Namespace, service.Name)
367	return serviceSupportedAddresses
368}
369
370func unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete []*discovery.EndpointSlice) []*discovery.EndpointSlice {
371	changedSliceNames := sets.String{}
372	for _, slice := range slicesToUpdate {
373		changedSliceNames.Insert(slice.Name)
374	}
375	for _, slice := range slicesToDelete {
376		changedSliceNames.Insert(slice.Name)
377	}
378	unchangedSlices := []*discovery.EndpointSlice{}
379	for _, slice := range existingSlices {
380		if !changedSliceNames.Has(slice.Name) {
381			unchangedSlices = append(unchangedSlices, slice)
382		}
383	}
384
385	return unchangedSlices
386}
387
388// hintsEnabled returns true if the provided annotations include a
389// corev1.AnnotationTopologyAwareHints key with a value set to "Auto" or "auto".
390func hintsEnabled(annotations map[string]string) bool {
391	val, ok := annotations[corev1.AnnotationTopologyAwareHints]
392	if !ok {
393		return false
394	}
395	return val == "Auto" || val == "auto"
396}
397
398// managedByChanged returns true if one of the provided EndpointSlices is
399// managed by the EndpointSlice controller while the other is not.
400func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool {
401	return managedByController(endpointSlice1) != managedByController(endpointSlice2)
402}
403
404// managedByController returns true if the controller of the provided
405// EndpointSlices is the EndpointSlice controller.
406func managedByController(endpointSlice *discovery.EndpointSlice) bool {
407	managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy]
408	return managedBy == controllerName
409}
410