1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package endpointslice 18 19import ( 20 "fmt" 21 "time" 22 23 corev1 "k8s.io/api/core/v1" 24 v1 "k8s.io/api/core/v1" 25 discovery "k8s.io/api/discovery/v1" 26 apiequality "k8s.io/apimachinery/pkg/api/equality" 27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 28 "k8s.io/apimachinery/pkg/runtime/schema" 29 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 30 "k8s.io/apimachinery/pkg/util/sets" 31 utilfeature "k8s.io/apiserver/pkg/util/feature" 32 "k8s.io/client-go/tools/cache" 33 "k8s.io/klog/v2" 34 podutil "k8s.io/kubernetes/pkg/api/v1/pod" 35 api "k8s.io/kubernetes/pkg/apis/core" 36 helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" 37 "k8s.io/kubernetes/pkg/apis/discovery/validation" 38 endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" 39 "k8s.io/kubernetes/pkg/features" 40 utilnet "k8s.io/utils/net" 41) 42 43// podToEndpoint returns an Endpoint object generated from a Pod, a Node, and a Service for a particular addressType. 44func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, addressType discovery.AddressType) discovery.Endpoint { 45 serving := podutil.IsPodReady(pod) 46 terminating := pod.DeletionTimestamp != nil 47 // For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless 48 // publishNotReadyAddresses was set. 49 ready := service.Spec.PublishNotReadyAddresses || (serving && !terminating) 50 ep := discovery.Endpoint{ 51 Addresses: getEndpointAddresses(pod.Status, service, addressType), 52 Conditions: discovery.EndpointConditions{ 53 Ready: &ready, 54 }, 55 TargetRef: &corev1.ObjectReference{ 56 Kind: "Pod", 57 Namespace: pod.ObjectMeta.Namespace, 58 Name: pod.ObjectMeta.Name, 59 UID: pod.ObjectMeta.UID, 60 ResourceVersion: pod.ObjectMeta.ResourceVersion, 61 }, 62 } 63 64 if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) { 65 ep.Conditions.Serving = &serving 66 ep.Conditions.Terminating = &terminating 67 } 68 69 if pod.Spec.NodeName != "" { 70 ep.NodeName = &pod.Spec.NodeName 71 } 72 73 if node != nil && node.Labels[corev1.LabelTopologyZone] != "" { 74 zone := node.Labels[corev1.LabelTopologyZone] 75 ep.Zone = &zone 76 } 77 78 if endpointutil.ShouldSetHostname(pod, service) { 79 ep.Hostname = &pod.Spec.Hostname 80 } 81 82 return ep 83} 84 85// getEndpointPorts returns a list of EndpointPorts generated from a Service 86// and Pod. 87func getEndpointPorts(service *corev1.Service, pod *corev1.Pod) []discovery.EndpointPort { 88 endpointPorts := []discovery.EndpointPort{} 89 90 // Allow headless service not to have ports. 91 if len(service.Spec.Ports) == 0 && service.Spec.ClusterIP == api.ClusterIPNone { 92 return endpointPorts 93 } 94 95 for i := range service.Spec.Ports { 96 servicePort := &service.Spec.Ports[i] 97 98 portName := servicePort.Name 99 portProto := servicePort.Protocol 100 portNum, err := podutil.FindPort(pod, servicePort) 101 if err != nil { 102 klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) 103 continue 104 } 105 106 i32PortNum := int32(portNum) 107 endpointPorts = append(endpointPorts, discovery.EndpointPort{ 108 Name: &portName, 109 Port: &i32PortNum, 110 Protocol: &portProto, 111 AppProtocol: servicePort.AppProtocol, 112 }) 113 } 114 115 return endpointPorts 116} 117 118// getEndpointAddresses returns a list of addresses generated from a pod status. 119func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service, addressType discovery.AddressType) []string { 120 addresses := []string{} 121 122 for _, podIP := range podStatus.PodIPs { 123 isIPv6PodIP := utilnet.IsIPv6String(podIP.IP) 124 if isIPv6PodIP && addressType == discovery.AddressTypeIPv6 { 125 addresses = append(addresses, podIP.IP) 126 } 127 128 if !isIPv6PodIP && addressType == discovery.AddressTypeIPv4 { 129 addresses = append(addresses, podIP.IP) 130 } 131 } 132 133 return addresses 134} 135 136// newEndpointSlice returns an EndpointSlice generated from a service and 137// endpointMeta. 138func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice { 139 gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"} 140 ownerRef := metav1.NewControllerRef(service, gvk) 141 epSlice := &discovery.EndpointSlice{ 142 ObjectMeta: metav1.ObjectMeta{ 143 Labels: map[string]string{}, 144 GenerateName: getEndpointSlicePrefix(service.Name), 145 OwnerReferences: []metav1.OwnerReference{*ownerRef}, 146 Namespace: service.Namespace, 147 }, 148 Ports: endpointMeta.Ports, 149 AddressType: endpointMeta.AddressType, 150 Endpoints: []discovery.Endpoint{}, 151 } 152 // add parent service labels 153 epSlice.Labels, _ = setEndpointSliceLabels(epSlice, service) 154 155 return epSlice 156} 157 158// getEndpointSlicePrefix returns a suitable prefix for an EndpointSlice name. 159func getEndpointSlicePrefix(serviceName string) string { 160 // use the dash (if the name isn't too long) to make the pod name a bit prettier 161 prefix := fmt.Sprintf("%s-", serviceName) 162 if len(validation.ValidateEndpointSliceName(prefix, true)) != 0 { 163 prefix = serviceName 164 } 165 return prefix 166} 167 168// ownedBy returns true if the provided EndpointSlice is owned by the provided 169// Service. 170func ownedBy(endpointSlice *discovery.EndpointSlice, svc *corev1.Service) bool { 171 for _, o := range endpointSlice.OwnerReferences { 172 if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" { 173 return true 174 } 175 } 176 return false 177} 178 179// getSliceToFill will return the EndpointSlice that will be closest to full 180// when numEndpoints are added. If no EndpointSlice can be found, a nil pointer 181// will be returned. 182func getSliceToFill(endpointSlices []*discovery.EndpointSlice, numEndpoints, maxEndpoints int) (slice *discovery.EndpointSlice) { 183 closestDiff := maxEndpoints 184 var closestSlice *discovery.EndpointSlice 185 for _, endpointSlice := range endpointSlices { 186 currentDiff := maxEndpoints - (numEndpoints + len(endpointSlice.Endpoints)) 187 if currentDiff >= 0 && currentDiff < closestDiff { 188 closestDiff = currentDiff 189 closestSlice = endpointSlice 190 if closestDiff == 0 { 191 return closestSlice 192 } 193 } 194 } 195 return closestSlice 196} 197 198// getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action. 199func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice { 200 if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok { 201 // Enqueue all the services that the pod used to be a member of. 202 // This is the same thing we do when we add a pod. 203 return endpointSlice 204 } 205 // If we reached here it means the pod was deleted but its final state is unrecorded. 206 tombstone, ok := obj.(cache.DeletedFinalStateUnknown) 207 if !ok { 208 utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) 209 return nil 210 } 211 endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice) 212 if !ok { 213 utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj)) 214 return nil 215 } 216 return endpointSlice 217} 218 219// addTriggerTimeAnnotation adds a triggerTime annotation to an EndpointSlice 220func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTime time.Time) { 221 if endpointSlice.Annotations == nil { 222 endpointSlice.Annotations = make(map[string]string) 223 } 224 225 if !triggerTime.IsZero() { 226 endpointSlice.Annotations[corev1.EndpointsLastChangeTriggerTime] = triggerTime.UTC().Format(time.RFC3339Nano) 227 } else { // No new trigger time, clear the annotation. 228 delete(endpointSlice.Annotations, corev1.EndpointsLastChangeTriggerTime) 229 } 230} 231 232// serviceControllerKey returns a controller key for a Service but derived from 233// an EndpointSlice. 234func serviceControllerKey(endpointSlice *discovery.EndpointSlice) (string, error) { 235 if endpointSlice == nil { 236 return "", fmt.Errorf("nil EndpointSlice passed to serviceControllerKey()") 237 } 238 serviceName, ok := endpointSlice.Labels[discovery.LabelServiceName] 239 if !ok || serviceName == "" { 240 return "", fmt.Errorf("EndpointSlice missing %s label", discovery.LabelServiceName) 241 } 242 return fmt.Sprintf("%s/%s", endpointSlice.Namespace, serviceName), nil 243} 244 245// setEndpointSliceLabels returns a map with the new endpoint slices labels and true if there was an update. 246// Slices labels must be equivalent to the Service labels except for the reserved IsHeadlessService, LabelServiceName and LabelManagedBy labels 247// Changes to IsHeadlessService, LabelServiceName and LabelManagedBy labels on the Service do not result in updates to EndpointSlice labels. 248func setEndpointSliceLabels(epSlice *discovery.EndpointSlice, service *corev1.Service) (map[string]string, bool) { 249 updated := false 250 epLabels := make(map[string]string) 251 svcLabels := make(map[string]string) 252 253 // check if the endpoint slice and the service have the same labels 254 // clone current slice labels except the reserved labels 255 for key, value := range epSlice.Labels { 256 if IsReservedLabelKey(key) { 257 continue 258 } 259 // copy endpoint slice labels 260 epLabels[key] = value 261 } 262 263 for key, value := range service.Labels { 264 if IsReservedLabelKey(key) { 265 klog.Warningf("Service %s/%s using reserved endpoint slices label, skipping label %s: %s", service.Namespace, service.Name, key, value) 266 continue 267 } 268 // copy service labels 269 svcLabels[key] = value 270 } 271 272 // if the labels are not identical update the slice with the corresponding service labels 273 if !apiequality.Semantic.DeepEqual(epLabels, svcLabels) { 274 updated = true 275 } 276 277 // add or remove headless label depending on the service Type 278 if !helper.IsServiceIPSet(service) { 279 svcLabels[v1.IsHeadlessService] = "" 280 } else { 281 delete(svcLabels, v1.IsHeadlessService) 282 } 283 284 // override endpoint slices reserved labels 285 svcLabels[discovery.LabelServiceName] = service.Name 286 svcLabels[discovery.LabelManagedBy] = controllerName 287 288 return svcLabels, updated 289} 290 291// IsReservedLabelKey return true if the label is one of the reserved label for slices 292func IsReservedLabelKey(label string) bool { 293 if label == discovery.LabelServiceName || 294 label == discovery.LabelManagedBy || 295 label == v1.IsHeadlessService { 296 return true 297 } 298 return false 299} 300 301// endpointSliceEndpointLen helps sort endpoint slices by the number of 302// endpoints they contain. 303type endpointSliceEndpointLen []*discovery.EndpointSlice 304 305func (sl endpointSliceEndpointLen) Len() int { return len(sl) } 306func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } 307func (sl endpointSliceEndpointLen) Less(i, j int) bool { 308 return len(sl[i].Endpoints) > len(sl[j].Endpoints) 309} 310 311// returns a map of address types used by a service 312func getAddressTypesForService(service *corev1.Service) map[discovery.AddressType]struct{} { 313 serviceSupportedAddresses := make(map[discovery.AddressType]struct{}) 314 // TODO: (khenidak) when address types are removed in favor of 315 // v1.IPFamily this will need to be removed, and work directly with 316 // v1.IPFamily types 317 318 // IMPORTANT: we assume that IP of (discovery.AddressType enum) is never in use 319 // as it gets deprecated 320 for _, family := range service.Spec.IPFamilies { 321 if family == corev1.IPv4Protocol { 322 serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{} 323 } 324 325 if family == corev1.IPv6Protocol { 326 serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{} 327 } 328 } 329 330 if len(serviceSupportedAddresses) > 0 { 331 return serviceSupportedAddresses // we have found families for this service 332 } 333 334 // TODO (khenidak) remove when (1) dual stack becomes 335 // enabled by default (2) v1.19 falls off supported versions 336 337 // Why do we need this: 338 // a cluster being upgraded to the new apis 339 // will have service.spec.IPFamilies: nil 340 // if the controller manager connected to old api 341 // server. This will have the nasty side effect of 342 // removing all slices already created for this service. 343 // this will disable all routing to service vip (ClusterIP) 344 // this ensures that this does not happen. Same for headless services 345 // we assume it is dual stack, until they get defaulted by *new* api-server 346 // this ensures that traffic is not disrupted until then. But *may* 347 // include undesired families for headless services until then. 348 349 if len(service.Spec.ClusterIP) > 0 && service.Spec.ClusterIP != corev1.ClusterIPNone { // headfull 350 addrType := discovery.AddressTypeIPv4 351 if utilnet.IsIPv6String(service.Spec.ClusterIP) { 352 addrType = discovery.AddressTypeIPv6 353 } 354 serviceSupportedAddresses[addrType] = struct{}{} 355 klog.V(2).Infof("couldn't find ipfamilies for service: %v/%v. This could happen if controller manager is connected to an old apiserver that does not support ip families yet. EndpointSlices for this Service will use %s as the IP Family based on familyOf(ClusterIP:%v).", service.Namespace, service.Name, addrType, service.Spec.ClusterIP) 356 return serviceSupportedAddresses 357 } 358 359 // headless 360 // for now we assume two families. This should have minimal side effect 361 // if the service is headless with no selector, then this will remain the case 362 // if the service is headless with selector then chances are pods are still using single family 363 // since kubelet will need to restart in order to start patching pod status with multiple ips 364 serviceSupportedAddresses[discovery.AddressTypeIPv4] = struct{}{} 365 serviceSupportedAddresses[discovery.AddressTypeIPv6] = struct{}{} 366 klog.V(2).Infof("couldn't find ipfamilies for headless service: %v/%v likely because controller manager is likely connected to an old apiserver that does not support ip families yet. The service endpoint slice will use dual stack families until api-server default it correctly", service.Namespace, service.Name) 367 return serviceSupportedAddresses 368} 369 370func unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete []*discovery.EndpointSlice) []*discovery.EndpointSlice { 371 changedSliceNames := sets.String{} 372 for _, slice := range slicesToUpdate { 373 changedSliceNames.Insert(slice.Name) 374 } 375 for _, slice := range slicesToDelete { 376 changedSliceNames.Insert(slice.Name) 377 } 378 unchangedSlices := []*discovery.EndpointSlice{} 379 for _, slice := range existingSlices { 380 if !changedSliceNames.Has(slice.Name) { 381 unchangedSlices = append(unchangedSlices, slice) 382 } 383 } 384 385 return unchangedSlices 386} 387 388// hintsEnabled returns true if the provided annotations include a 389// corev1.AnnotationTopologyAwareHints key with a value set to "Auto" or "auto". 390func hintsEnabled(annotations map[string]string) bool { 391 val, ok := annotations[corev1.AnnotationTopologyAwareHints] 392 if !ok { 393 return false 394 } 395 return val == "Auto" || val == "auto" 396} 397 398// managedByChanged returns true if one of the provided EndpointSlices is 399// managed by the EndpointSlice controller while the other is not. 400func managedByChanged(endpointSlice1, endpointSlice2 *discovery.EndpointSlice) bool { 401 return managedByController(endpointSlice1) != managedByController(endpointSlice2) 402} 403 404// managedByController returns true if the controller of the provided 405// EndpointSlices is the EndpointSlice controller. 406func managedByController(endpointSlice *discovery.EndpointSlice) bool { 407 managedBy, _ := endpointSlice.Labels[discovery.LabelManagedBy] 408 return managedBy == controllerName 409} 410