/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"net"
	goruntime "runtime"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	cloudprovider "k8s.io/cloud-provider"
	cloudproviderapi "k8s.io/cloud-provider/api"
	"k8s.io/klog/v2"
	kubeletapis "k8s.io/kubelet/pkg/apis"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
	"k8s.io/kubernetes/pkg/kubelet/util"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	taintutil "k8s.io/kubernetes/pkg/util/taints"
	volutil "k8s.io/kubernetes/pkg/volume/util"
)

// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithAPIServer() {
	if kl.registrationCompleted {
		return
	}
	step := 100 * time.Millisecond

	for {
		time.Sleep(step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}

		node, err := kl.initialNode(context.TODO())
		if err != nil {
			klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
			continue
		}

		klog.InfoS("Attempting to register node", "node", klog.KObj(node))
		registered := kl.tryRegisterWithAPIServer(node)
		if registered {
			klog.InfoS("Successfully registered node", "node", klog.KObj(node))
			kl.registrationCompleted = true
			return
		}
	}
}

// tryRegisterWithAPIServer makes an attempt to register the given node with
// the API server, returning a boolean indicating whether the attempt was
// successful. If a node with the same name already exists, it reconciles the
// value of the annotation for controller-managed attach-detach of attachable
// persistent volumes for the node.
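// It also reconciles the node's default labels, extended resources, and huge
// page resources, and patches the existing node through the API server when
// any of them require an update.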
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
	_, err := kl.kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
	if err == nil {
		return true
	}

	if !apierrors.IsAlreadyExists(err) {
		klog.ErrorS(err, "Unable to register node with API server", "node", klog.KObj(node))
		return false
	}

	existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), metav1.GetOptions{})
	if err != nil {
		klog.ErrorS(err, "Unable to register node with API server, error getting existing node", "node", klog.KObj(node))
		return false
	}
	if existingNode == nil {
		klog.InfoS("Unable to register node with API server, no node instance returned", "node", klog.KObj(node))
		return false
	}

	originalNode := existingNode.DeepCopy()

	klog.InfoS("Node was previously registered", "node", klog.KObj(node))

	// Edge case: the node was previously registered; reconcile
	// the value of the controller-managed attach-detach
	// annotation.
	requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
	if requiresUpdate {
		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
			klog.ErrorS(err, "Unable to reconcile node with API server, error updating node", "node", klog.KObj(node))
			return false
		}
	}

	return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
	requiresUpdate := updateDefaultResources(initialNode, existingNode)
	supportedHugePageResources := sets.String{}

	for resourceName := range initialNode.Status.Capacity {
		if !v1helper.IsHugePageResourceName(resourceName) {
			continue
		}
		supportedHugePageResources.Insert(string(resourceName))

		initialCapacity := initialNode.Status.Capacity[resourceName]
		initialAllocatable := initialNode.Status.Allocatable[resourceName]

		capacity, resourceIsSupported := existingNode.Status.Capacity[resourceName]
		allocatable := existingNode.Status.Allocatable[resourceName]

		// Add or update capacity if the size was previously unsupported or has changed
		if !resourceIsSupported || capacity.Cmp(initialCapacity) != 0 {
			existingNode.Status.Capacity[resourceName] = initialCapacity.DeepCopy()
			requiresUpdate = true
		}

		// Add or update allocatable if the size was previously unsupported or has changed
		if !resourceIsSupported || allocatable.Cmp(initialAllocatable) != 0 {
			existingNode.Status.Allocatable[resourceName] = initialAllocatable.DeepCopy()
			requiresUpdate = true
		}

	}

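	// Second pass: drop huge page sizes that this kubelet no longer reports,
	// so stale sizes do not linger on the existing node.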
	for resourceName := range existingNode.Status.Capacity {
		if !v1helper.IsHugePageResourceName(resourceName) {
			continue
		}

		// If huge page size no longer is supported, we remove it from the node
		if !supportedHugePageResources.Has(string(resourceName)) {
			delete(existingNode.Status.Capacity, resourceName)
			delete(existingNode.Status.Allocatable, resourceName)
			klog.InfoS("Removing huge page resource which is no longer supported", "resourceName", resourceName)
			requiresUpdate = true
		}
	}
	return requiresUpdate
}

// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
	requiresUpdate := updateDefaultResources(initialNode, node)
	// Check with the device manager to see if node has been recreated, in which case extended resources should be zeroed until they are available
	if kl.containerManager.ShouldResetExtendedResourceCapacity() {
		for k := range node.Status.Capacity {
			if v1helper.IsExtendedResourceName(k) {
				klog.InfoS("Zero out resource capacity in existing node", "resourceName", k, "node", klog.KObj(node))
				node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				requiresUpdate = true
			}
		}
	}
	return requiresUpdate
}

// updateDefaultResources will set the default resources on the existing node according to the initial node
func updateDefaultResources(initialNode, existingNode *v1.Node) bool {
	requiresUpdate := false
	if existingNode.Status.Capacity == nil {
		if initialNode.Status.Capacity != nil {
			existingNode.Status.Capacity = initialNode.Status.Capacity.DeepCopy()
			requiresUpdate = true
		} else {
			existingNode.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
		}
	}

	if existingNode.Status.Allocatable == nil {
		if initialNode.Status.Allocatable != nil {
			existingNode.Status.Allocatable = initialNode.Status.Allocatable.DeepCopy()
			requiresUpdate = true
		} else {
			existingNode.Status.Allocatable = make(map[v1.ResourceName]resource.Quantity)
		}
	}
	return requiresUpdate
}

// updateDefaultLabels will set the default labels on the node
func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
	defaultLabels := []string{
		v1.LabelHostname,
		v1.LabelTopologyZone,
		v1.LabelTopologyRegion,
		v1.LabelFailureDomainBetaZone,
		v1.LabelFailureDomainBetaRegion,
		v1.LabelInstanceTypeStable,
		v1.LabelInstanceType,
		v1.LabelOSStable,
		v1.LabelArchStable,
		v1.LabelWindowsBuild,
		kubeletapis.LabelOS,
		kubeletapis.LabelArch,
	}

	needsUpdate := false
	if existingNode.Labels == nil {
		existingNode.Labels = make(map[string]string)
	}
	// Set default labels but make sure to not set labels with empty values
	for _, label := range defaultLabels {
		if _, hasInitialValue := initialNode.Labels[label]; !hasInitialValue {
			continue
		}

		if existingNode.Labels[label] != initialNode.Labels[label] {
			existingNode.Labels[label] = initialNode.Labels[label]
			needsUpdate = true
		}

		if existingNode.Labels[label] == "" {
			delete(existingNode.Labels, label)
		}
	}

	return needsUpdate
}

// reconcileCMADAnnotationWithExistingNode reconciles the controller-managed
// attach-detach annotation on a new node and the existing node, returning
// whether the existing node must be updated.
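// The annotation on the new node reflects the kubelet's controller
// attach-detach setting (see initialNode), so a change in that setting is
// propagated to the existing Node object.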
func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *v1.Node) bool {
	var (
		existingCMAAnnotation    = existingNode.Annotations[volutil.ControllerManagedAttachAnnotation]
		newCMAAnnotation, newSet = node.Annotations[volutil.ControllerManagedAttachAnnotation]
	)

	if newCMAAnnotation == existingCMAAnnotation {
		return false
	}

	// If the just-constructed node and the existing node do
	// not have the same value, update the existing node with
	// the correct value of the annotation.
	if !newSet {
		klog.InfoS("Controller attach-detach setting changed to false; updating existing Node")
		delete(existingNode.Annotations, volutil.ControllerManagedAttachAnnotation)
	} else {
		klog.InfoS("Controller attach-detach setting changed to true; updating existing Node")
		if existingNode.Annotations == nil {
			existingNode.Annotations = make(map[string]string)
		}
		existingNode.Annotations[volutil.ControllerManagedAttachAnnotation] = newCMAAnnotation
	}

	return true
}

// initialNode constructs the initial v1.Node for this Kubelet, incorporating node
// labels, information from the cloud provider, and Kubelet configuration.
func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(kl.nodeName),
			Labels: map[string]string{
				v1.LabelHostname:      kl.hostname,
				v1.LabelOSStable:      goruntime.GOOS,
				v1.LabelArchStable:    goruntime.GOARCH,
				kubeletapis.LabelOS:   goruntime.GOOS,
				kubeletapis.LabelArch: goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: !kl.registerSchedulable,
		},
	}
	osLabels, err := getOSSpecificLabels()
	if err != nil {
		return nil, err
	}
	for label, value := range osLabels {
		node.Labels[label] = value
	}

	nodeTaints := make([]v1.Taint, 0)
	if len(kl.registerWithTaints) > 0 {
		taints := make([]v1.Taint, len(kl.registerWithTaints))
		for i := range kl.registerWithTaints {
			if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
				return nil, err
			}
		}
		nodeTaints = append(nodeTaints, taints...)
	}

	unschedulableTaint := v1.Taint{
		Key:    v1.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	}

	// Taint node with TaintNodeUnschedulable when initializing
	// node to avoid race condition; refer to #63897 for more detail.
	if node.Spec.Unschedulable &&
		!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
		nodeTaints = append(nodeTaints, unschedulableTaint)
	}

	if kl.externalCloudProvider {
		taint := v1.Taint{
			Key:    cloudproviderapi.TaintExternalCloudProvider,
			Value:  "true",
			Effect: v1.TaintEffectNoSchedule,
		}

		nodeTaints = append(nodeTaints, taint)
	}
	if len(nodeTaints) > 0 {
		node.Spec.Taints = nodeTaints
	}
	// Initially, set NodeNetworkUnavailable to true.
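	// The cloud provider's route controller is expected to clear this
	// condition once a route for the node has been created.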
	if kl.providerRequiresNetworkingConfiguration() {
		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
			Type:               v1.NodeNetworkUnavailable,
			Status:             v1.ConditionTrue,
			Reason:             "NoRouteCreated",
			Message:            "Node created without a route",
			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
		})
	}

	if kl.enableControllerAttachDetach {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		klog.V(2).InfoS("Setting node annotation to enable volume controller attach/detach")
		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
	} else {
		klog.V(2).InfoS("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
	}

	if kl.keepTerminatedPodVolumes {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.V(2).InfoS("Setting node annotation to keep pod volumes of terminated pods attached to the node")
		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
	}

	// @question: should this be placed after the call to the cloud provider, which also applies labels?
	for k, v := range kl.nodeLabels {
		if cv, found := node.ObjectMeta.Labels[k]; found {
			klog.InfoS("the node label will overwrite default setting", "labelKey", k, "labelValue", v, "default", cv)
		}
		node.ObjectMeta.Labels[k] = v
	}

	if kl.providerID != "" {
		node.Spec.ProviderID = kl.providerID
	}

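	// When an in-tree cloud provider is configured, fill in the provider ID,
	// the instance type, and the zone/region topology labels from the cloud API.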
	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		var err error
		if node.Spec.ProviderID == "" {
			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(ctx, kl.cloud, kl.nodeName)
			if err != nil {
				return nil, err
			}
		}

		instanceType, err := instances.InstanceType(ctx, kl.nodeName)
		if err != nil {
			return nil, err
		}
		if instanceType != "" {
			klog.InfoS("Adding label from cloud provider", "labelKey", v1.LabelInstanceType, "labelValue", instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
			klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelInstanceTypeStable, "labelValue", instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
		}
		// If the cloud has zone information, label the node with the zone information
		zones, ok := kl.cloud.Zones()
		if ok {
			zone, err := zones.GetZone(ctx)
			if err != nil {
				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaZone, "labelValue", zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaZone] = zone.FailureDomain
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyZone, "labelValue", zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelTopologyZone] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaRegion, "labelValue", zone.Region)
				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaRegion] = zone.Region
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyRegion, "labelValue", zone.Region)
				node.ObjectMeta.Labels[v1.LabelTopologyRegion] = zone.Region
			}
		}
	}

	kl.setNodeStatus(node)

	return node, nil
}

// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to master if there is any change or enough time
// passed from the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock()
	defer kl.syncNodeStatusMux.Unlock()

	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	if err := kl.updateNodeStatus(); err != nil {
		klog.ErrorS(err, "Unable to update node status")
	}
}

// updateNodeStatus updates node status to master with retries if there is any
// change or enough time passed from the last sync.
func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).InfoS("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := kl.tryUpdateNodeStatus(i); err != nil {
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.ErrorS(err, "Error updating node status, will retry")
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to master if there is any
// change or enough time passed from the last sync.
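// The tryNumber argument selects the read path: the first attempt is served
// from the apiserver cache, while retries read directly from etcd.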
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	// In large clusters, GET and PUT operations on Node objects coming
	// from here are the majority of load on apiserver and etcd.
	// To reduce the load on etcd, we are serving GET operations from
	// apiserver cache (the data might be slightly delayed but it doesn't
	// seem to cause more conflict - the delays are pretty small).
	// If it results in a conflict, all retries are served directly from etcd.
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			klog.ErrorS(err, "Error updating pod CIDR")
		}
	}

	kl.setNodeStatus(node)

	now := kl.clock.Now()
	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			// We must mark the volumes as ReportedInUse in volume manager's dsw even
			// if no changes were made to the node status (no volumes were added or removed
			// from the VolumesInUse list).
			//
			// The reason is that on a kubelet restart, the volume manager's dsw is
			// repopulated and the volume ReportedInUse is initialized to false, while the
			// VolumesInUse list from the Node object still contains the state from the
			// previous kubelet instantiation.
			//
			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
			// synced from the VolumesInUse list in the Node.Status.
			//
			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
			// because it does not have access to the Node object.
			// This also cannot be populated on node status manager init because the volume
			// may not have been added to dsw at that time.
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// Patch the current status on the API server
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}

// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
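// The event is recorded against the Kubelet's nodeRef, so it appears in the
// node's event stream (for example, in the output of kubectl describe node).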
func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
	klog.V(2).InfoS("Recording event message for node", "node", klog.KRef("", string(kl.nodeName)), "event", event)
	kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
}

// recordEvent records an event for this node; the Kubelet's nodeRef is passed to the recorder.
func (kl *Kubelet) recordEvent(eventType, event, message string) {
	kl.recorder.Eventf(kl.nodeRef, eventType, event, message)
}

// recordNodeSchedulableEvent records an event if the node's schedulable state has changed.
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) error {
	kl.lastNodeUnschedulableLock.Lock()
	defer kl.lastNodeUnschedulableLock.Unlock()
	if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
		if node.Spec.Unschedulable {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotSchedulable)
		} else {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeSchedulable)
		}
		kl.lastNodeUnschedulable = node.Spec.Unschedulable
	}
	return nil
}

// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condition code out to a different file.
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
		if err := f(node); err != nil {
			klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
		}
	}
}

func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
	kl.lastObservedNodeAddressesMux.Lock()
	defer kl.lastObservedNodeAddressesMux.Unlock()
	kl.lastObservedNodeAddresses = addresses
}

func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
	kl.lastObservedNodeAddressesMux.RLock()
	defer kl.lastObservedNodeAddressesMux.RUnlock()
	return kl.lastObservedNodeAddresses
}

// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs.
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	// Volume limits
	setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))

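	// Node conditions, volumes in use, and the schedulable event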
	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}

// validateNodeIP validates that the given node IP belongs to the current host.
func validateNodeIP(nodeIP net.IP) error {
	// Honor IP limitations set in setNodeStatus()
	if nodeIP.To4() == nil && nodeIP.To16() == nil {
		return fmt.Errorf("nodeIP must be a valid IP address")
	}
	if nodeIP.IsLoopback() {
		return fmt.Errorf("nodeIP can't be loopback address")
	}
	if nodeIP.IsMulticast() {
		return fmt.Errorf("nodeIP can't be a multicast address")
	}
	if nodeIP.IsLinkLocalUnicast() {
		return fmt.Errorf("nodeIP can't be a link-local unicast address")
	}
	if nodeIP.IsUnspecified() {
		return fmt.Errorf("nodeIP can't be an all zeros address")
	}

	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return err
	}
	for _, addr := range addrs {
		var ip net.IP
		switch v := addr.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		if ip != nil && ip.Equal(nodeIP) {
			return nil
		}
	}
	return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
}

// nodeStatusHasChanged compares the original node and current node's status and
// returns true if any change happens. The heartbeat timestamp is ignored.
func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
	if originalStatus == nil && status == nil {
		return false
	}
	if originalStatus == nil || status == nil {
		return true
	}

	// Compare node conditions here because we need to ignore the heartbeat timestamp.
	if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
		return true
	}

	// Compare other fields of NodeStatus.
	originalStatusCopy := originalStatus.DeepCopy()
	statusCopy := status.DeepCopy()
	originalStatusCopy.Conditions = nil
	statusCopy.Conditions = nil
	return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
}

// nodeConditionsHaveChanged compares the original node and current node's
// conditions and returns true if any change happens. The heartbeat timestamp is
// ignored.
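// Conditions are sorted by type and compared pairwise with LastHeartbeatTime
// zeroed out, so only meaningful field changes are reported.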
func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
	if len(originalConditions) != len(conditions) {
		return true
	}

	originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
	originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
	conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
	conditionsCopy = append(conditionsCopy, conditions...)

	sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
	sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })

	replacedheartbeatTime := metav1.Time{}
	for i := range conditionsCopy {
		originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
			return true
		}
	}
	return false
}