/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"fmt"
	"net"
	goruntime "runtime"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	cloudprovider "k8s.io/cloud-provider"
	cloudproviderapi "k8s.io/cloud-provider/api"
	"k8s.io/klog/v2"
	kubeletapis "k8s.io/kubelet/pkg/apis"
	k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/nodestatus"
	"k8s.io/kubernetes/pkg/kubelet/util"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	taintutil "k8s.io/kubernetes/pkg/util/taints"
	volutil "k8s.io/kubernetes/pkg/volume/util"
)

// registerWithAPIServer registers the node with the cluster master. It is safe
// to call multiple times, but not concurrently (kl.registrationCompleted is
// not locked).
func (kl *Kubelet) registerWithAPIServer() {
	if kl.registrationCompleted {
		return
	}
	step := 100 * time.Millisecond

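	// Retry registration with exponential backoff, doubling the delay after
	// each attempt and capping it at 7 seconds.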
	for {
		time.Sleep(step)
		step = step * 2
		if step >= 7*time.Second {
			step = 7 * time.Second
		}

		node, err := kl.initialNode(context.TODO())
		if err != nil {
			klog.ErrorS(err, "Unable to construct v1.Node object for kubelet")
			continue
		}

		klog.InfoS("Attempting to register node", "node", klog.KObj(node))
		registered := kl.tryRegisterWithAPIServer(node)
		if registered {
			klog.InfoS("Successfully registered node", "node", klog.KObj(node))
			kl.registrationCompleted = true
			return
		}
	}
}

// tryRegisterWithAPIServer makes an attempt to register the given node with
// the API server, returning a boolean indicating whether the attempt was
// successful.  If a node with the same name already exists, it reconciles the
// value of the annotation for controller-managed attach-detach of attachable
// persistent volumes for the node.
func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
	_, err := kl.kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
	if err == nil {
		return true
	}

	if !apierrors.IsAlreadyExists(err) {
		klog.ErrorS(err, "Unable to register node with API server", "node", klog.KObj(node))
		return false
	}

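	// The node already exists; fetch the stored object so the kubelet-managed
	// fields below can be reconciled against it.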
	existingNode, err := kl.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), metav1.GetOptions{})
	if err != nil {
		klog.ErrorS(err, "Unable to register node with API server, error getting existing node", "node", klog.KObj(node))
		return false
	}
	if existingNode == nil {
		klog.InfoS("Unable to register node with API server, no node instance returned", "node", klog.KObj(node))
		return false
	}

	originalNode := existingNode.DeepCopy()

	klog.InfoS("Node was previously registered", "node", klog.KObj(node))

	// Edge case: the node was previously registered; reconcile
	// the value of the controller-managed attach-detach
	// annotation.
	requiresUpdate := kl.reconcileCMADAnnotationWithExistingNode(node, existingNode)
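	// Each reconcile helper is placed on the left of the || so that it always
	// runs, even when an earlier step has already requested an update.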
	requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate
	requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate
	if requiresUpdate {
		if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil {
			klog.ErrorS(err, "Unable to reconcile node with API server, error updating node", "node", klog.KObj(node))
			return false
		}
	}

	return true
}

// reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported
func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool {
	requiresUpdate := updateDefaultResources(initialNode, existingNode)
	supportedHugePageResources := sets.String{}
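	// Track the huge page sizes reported by the freshly constructed node so
	// that sizes no longer reported can be pruned from the existing node below.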

	for resourceName := range initialNode.Status.Capacity {
		if !v1helper.IsHugePageResourceName(resourceName) {
			continue
		}
		supportedHugePageResources.Insert(string(resourceName))

		initialCapacity := initialNode.Status.Capacity[resourceName]
		initialAllocatable := initialNode.Status.Allocatable[resourceName]

		capacity, resourceIsSupported := existingNode.Status.Capacity[resourceName]
		allocatable := existingNode.Status.Allocatable[resourceName]

		// Add or update capacity if the size was previously unsupported or has changed
		if !resourceIsSupported || capacity.Cmp(initialCapacity) != 0 {
			existingNode.Status.Capacity[resourceName] = initialCapacity.DeepCopy()
			requiresUpdate = true
		}

		// Add or update allocatable if the size was previously unsupported or has changed
		if !resourceIsSupported || allocatable.Cmp(initialAllocatable) != 0 {
			existingNode.Status.Allocatable[resourceName] = initialAllocatable.DeepCopy()
			requiresUpdate = true
		}

	}

	for resourceName := range existingNode.Status.Capacity {
		if !v1helper.IsHugePageResourceName(resourceName) {
			continue
		}

		// If a huge page size is no longer supported, remove it from the node
		if !supportedHugePageResources.Has(string(resourceName)) {
			delete(existingNode.Status.Capacity, resourceName)
			delete(existingNode.Status.Allocatable, resourceName)
			klog.InfoS("Removing huge page resource which is no longer supported", "resourceName", resourceName)
			requiresUpdate = true
		}
	}
	return requiresUpdate
}

// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
	requiresUpdate := updateDefaultResources(initialNode, node)
	// Check with the device manager to see if node has been recreated, in which case extended resources should be zeroed until they are available
	if kl.containerManager.ShouldResetExtendedResourceCapacity() {
		for k := range node.Status.Capacity {
			if v1helper.IsExtendedResourceName(k) {
				klog.InfoS("Zero out resource capacity in existing node", "resourceName", k, "node", klog.KObj(node))
				node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
				requiresUpdate = true
			}
		}
	}
	return requiresUpdate
}

// updateDefaultResources will set the default resources on the existing node according to the initial node
func updateDefaultResources(initialNode, existingNode *v1.Node) bool {
	requiresUpdate := false
	if existingNode.Status.Capacity == nil {
		if initialNode.Status.Capacity != nil {
			existingNode.Status.Capacity = initialNode.Status.Capacity.DeepCopy()
			requiresUpdate = true
		} else {
			existingNode.Status.Capacity = make(map[v1.ResourceName]resource.Quantity)
		}
	}

	if existingNode.Status.Allocatable == nil {
		if initialNode.Status.Allocatable != nil {
			existingNode.Status.Allocatable = initialNode.Status.Allocatable.DeepCopy()
			requiresUpdate = true
		} else {
			existingNode.Status.Allocatable = make(map[v1.ResourceName]resource.Quantity)
		}
	}
	return requiresUpdate
}

// updateDefaultLabels will set the default labels on the node
func (kl *Kubelet) updateDefaultLabels(initialNode, existingNode *v1.Node) bool {
	defaultLabels := []string{
		v1.LabelHostname,
		v1.LabelTopologyZone,
		v1.LabelTopologyRegion,
		v1.LabelFailureDomainBetaZone,
		v1.LabelFailureDomainBetaRegion,
		v1.LabelInstanceTypeStable,
		v1.LabelInstanceType,
		v1.LabelOSStable,
		v1.LabelArchStable,
		v1.LabelWindowsBuild,
		kubeletapis.LabelOS,
		kubeletapis.LabelArch,
	}

	needsUpdate := false
	if existingNode.Labels == nil {
		existingNode.Labels = make(map[string]string)
	}
	// Set default labels but make sure to not set labels with empty values
	for _, label := range defaultLabels {
		if _, hasInitialValue := initialNode.Labels[label]; !hasInitialValue {
			continue
		}

		if existingNode.Labels[label] != initialNode.Labels[label] {
			existingNode.Labels[label] = initialNode.Labels[label]
			needsUpdate = true
		}

		if existingNode.Labels[label] == "" {
			delete(existingNode.Labels, label)
		}
	}

	return needsUpdate
}

// reconcileCMADAnnotationWithExistingNode reconciles the controller-managed
// attach-detach annotation on a new node and the existing node, returning
// whether the existing node must be updated.
func (kl *Kubelet) reconcileCMADAnnotationWithExistingNode(node, existingNode *v1.Node) bool {
	var (
		existingCMAAnnotation    = existingNode.Annotations[volutil.ControllerManagedAttachAnnotation]
		newCMAAnnotation, newSet = node.Annotations[volutil.ControllerManagedAttachAnnotation]
	)

	if newCMAAnnotation == existingCMAAnnotation {
		return false
	}

	// If the just-constructed node and the existing node do
	// not have the same value, update the existing node with
	// the correct value of the annotation.
	if !newSet {
		klog.InfoS("Controller attach-detach setting changed to false; updating existing Node")
		delete(existingNode.Annotations, volutil.ControllerManagedAttachAnnotation)
	} else {
		klog.InfoS("Controller attach-detach setting changed to true; updating existing Node")
		if existingNode.Annotations == nil {
			existingNode.Annotations = make(map[string]string)
		}
		existingNode.Annotations[volutil.ControllerManagedAttachAnnotation] = newCMAAnnotation
	}

	return true
}

// initialNode constructs the initial v1.Node for this Kubelet, incorporating node
// labels, information from the cloud provider, and Kubelet configuration.
func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(kl.nodeName),
			Labels: map[string]string{
				v1.LabelHostname:      kl.hostname,
				v1.LabelOSStable:      goruntime.GOOS,
				v1.LabelArchStable:    goruntime.GOARCH,
				kubeletapis.LabelOS:   goruntime.GOOS,
				kubeletapis.LabelArch: goruntime.GOARCH,
			},
		},
		Spec: v1.NodeSpec{
			Unschedulable: !kl.registerSchedulable,
		},
	}
	osLabels, err := getOSSpecificLabels()
	if err != nil {
		return nil, err
	}
	for label, value := range osLabels {
		node.Labels[label] = value
	}

	nodeTaints := make([]v1.Taint, 0)
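	// Convert the configured registration taints (kl.registerWithTaints) from
	// the internal core type to the external v1 type before placing them on
	// the node.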
	if len(kl.registerWithTaints) > 0 {
		taints := make([]v1.Taint, len(kl.registerWithTaints))
		for i := range kl.registerWithTaints {
			if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
				return nil, err
			}
		}
		nodeTaints = append(nodeTaints, taints...)
	}

	unschedulableTaint := v1.Taint{
		Key:    v1.TaintNodeUnschedulable,
		Effect: v1.TaintEffectNoSchedule,
	}

	// Taint node with TaintNodeUnschedulable when initializing
	// node to avoid race condition; refer to #63897 for more detail.
	if node.Spec.Unschedulable &&
		!taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
		nodeTaints = append(nodeTaints, unschedulableTaint)
	}

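	// When an external cloud provider is in use, taint the node so workloads
	// are not scheduled until the cloud-controller-manager has initialized it
	// and removed the taint.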
	if kl.externalCloudProvider {
		taint := v1.Taint{
			Key:    cloudproviderapi.TaintExternalCloudProvider,
			Value:  "true",
			Effect: v1.TaintEffectNoSchedule,
		}

		nodeTaints = append(nodeTaints, taint)
	}
	if len(nodeTaints) > 0 {
		node.Spec.Taints = nodeTaints
	}
	// Initially, set NodeNetworkUnavailable to true.
	if kl.providerRequiresNetworkingConfiguration() {
		node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
			Type:               v1.NodeNetworkUnavailable,
			Status:             v1.ConditionTrue,
			Reason:             "NoRouteCreated",
			Message:            "Node created without a route",
			LastTransitionTime: metav1.NewTime(kl.clock.Now()),
		})
	}

	if kl.enableControllerAttachDetach {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		klog.V(2).InfoS("Setting node annotation to enable volume controller attach/detach")
		node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
	} else {
		klog.V(2).InfoS("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
	}

	if kl.keepTerminatedPodVolumes {
		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		klog.V(2).InfoS("Setting node annotation to keep pod volumes of terminated pods attached to the node")
		node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
	}

	// @question: should this be placed after the call to the cloud provider, which also applies labels?
	for k, v := range kl.nodeLabels {
		if cv, found := node.ObjectMeta.Labels[k]; found {
			klog.InfoS("the node label will overwrite default setting", "labelKey", k, "labelValue", v, "default", cv)
		}
		node.ObjectMeta.Labels[k] = v
	}

	if kl.providerID != "" {
		node.Spec.ProviderID = kl.providerID
	}

	if kl.cloud != nil {
		instances, ok := kl.cloud.Instances()
		if !ok {
			return nil, fmt.Errorf("failed to get instances from cloud provider")
		}

		// TODO: We can't assume that the node has credentials to talk to the
		// cloudprovider from arbitrary nodes. At most, we should talk to a
		// local metadata server here.
		var err error
		if node.Spec.ProviderID == "" {
			node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(ctx, kl.cloud, kl.nodeName)
			if err != nil {
				return nil, err
			}
		}

		instanceType, err := instances.InstanceType(ctx, kl.nodeName)
		if err != nil {
			return nil, err
		}
		if instanceType != "" {
			klog.InfoS("Adding label from cloud provider", "labelKey", v1.LabelInstanceType, "labelValue", instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
			klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelInstanceTypeStable, "labelValue", instanceType)
			node.ObjectMeta.Labels[v1.LabelInstanceTypeStable] = instanceType
		}
		// If the cloud has zone information, label the node with the zone information
		zones, ok := kl.cloud.Zones()
		if ok {
			zone, err := zones.GetZone(ctx)
			if err != nil {
				return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
			}
			if zone.FailureDomain != "" {
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaZone, "labelValue", zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaZone] = zone.FailureDomain
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyZone, "labelValue", zone.FailureDomain)
				node.ObjectMeta.Labels[v1.LabelTopologyZone] = zone.FailureDomain
			}
			if zone.Region != "" {
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelFailureDomainBetaRegion, "labelValue", zone.Region)
				node.ObjectMeta.Labels[v1.LabelFailureDomainBetaRegion] = zone.Region
				klog.InfoS("Adding node label from cloud provider", "labelKey", v1.LabelTopologyRegion, "labelValue", zone.Region)
				node.ObjectMeta.Labels[v1.LabelTopologyRegion] = zone.Region
			}
		}
	}

	kl.setNodeStatus(node)

	return node, nil
}

// syncNodeStatus should be called periodically from a goroutine.
// It synchronizes node status to the master if there is any change or enough
// time has passed since the last sync, registering the kubelet first if necessary.
func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock()
	defer kl.syncNodeStatusMux.Unlock()

	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	if err := kl.updateNodeStatus(); err != nil {
		klog.ErrorS(err, "Unable to update node status")
	}
}

// updateNodeStatus updates node status to the master with retries if there is
// any change or enough time has passed since the last sync.
func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).InfoS("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := kl.tryUpdateNodeStatus(i); err != nil {
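			// Invoke the repeated-failure hook only after the first attempt has
			// already failed (i > 0).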
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.ErrorS(err, "Error updating node status, will retry")
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}

// tryUpdateNodeStatus tries to update node status to the master if there is
// any change or enough time has passed since the last sync.
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	// In large clusters, GET and PUT operations on Node objects coming
	// from here are the majority of load on apiserver and etcd.
	// To reduce the load on etcd, we are serving GET operations from
	// apiserver cache (the data might be slightly delayed but it doesn't
	// seem to cause more conflicts - the delays are pretty small).
	// If it results in a conflict, all retries are served directly from etcd.
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			klog.ErrorS(err, "Error updating pod CIDR")
		}
	}

	kl.setNodeStatus(node)

	now := kl.clock.Now()
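	// Skip the patch only when neither the pod CIDRs nor the node status have
	// changed and the node status report interval has not yet elapsed.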
	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			// We must mark the volumes as ReportedInUse in volume manager's dsw even
			// if no changes were made to the node status (no volumes were added or removed
			// from the VolumesInUse list).
			//
			// The reason is that on a kubelet restart, the volume manager's dsw is
			// repopulated and the volume ReportedInUse is initialized to false, while the
			// VolumesInUse list from the Node object still contains the state from the
			// previous kubelet instantiation.
			//
			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
			// synced from the VolumesInUse list in the Node.Status.
			//
			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
			// because it does not have access to the Node object.
			// This also cannot be populated on node status manager init because the volume
			// may not have been added to dsw at that time.
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// Patch the current status on the API server
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}

// recordNodeStatusEvent records an event of the given type with the given
// message for the node.
func (kl *Kubelet) recordNodeStatusEvent(eventType, event string) {
	klog.V(2).InfoS("Recording event message for node", "node", klog.KRef("", string(kl.nodeName)), "event", event)
	kl.recorder.Eventf(kl.nodeRef, eventType, event, "Node %s status is now: %s", kl.nodeName, event)
}

// recordEvent records an event for this node; the Kubelet's nodeRef is passed to the recorder
func (kl *Kubelet) recordEvent(eventType, event, message string) {
	kl.recorder.Eventf(kl.nodeRef, eventType, event, message)
}

// recordNodeSchedulableEvent records an event when the node's schedulable state changes.
func (kl *Kubelet) recordNodeSchedulableEvent(node *v1.Node) error {
	kl.lastNodeUnschedulableLock.Lock()
	defer kl.lastNodeUnschedulableLock.Unlock()
	if kl.lastNodeUnschedulable != node.Spec.Unschedulable {
		if node.Spec.Unschedulable {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeNotSchedulable)
		} else {
			kl.recordNodeStatusEvent(v1.EventTypeNormal, events.NodeSchedulable)
		}
		kl.lastNodeUnschedulable = node.Spec.Unschedulable
	}
	return nil
}

// setNodeStatus fills in the Status fields of the given Node, overwriting
// any fields that are currently set.
// TODO(madhusudancs): Simplify the logic for setting node conditions and
// refactor the node status condition code out to a different file.
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).InfoS("Setting node status condition code", "position", i, "node", klog.KObj(node))
		if err := f(node); err != nil {
			klog.ErrorS(err, "Failed to set some node status fields", "node", klog.KObj(node))
		}
	}
}

func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
	kl.lastObservedNodeAddressesMux.Lock()
	defer kl.lastObservedNodeAddressesMux.Unlock()
	kl.lastObservedNodeAddresses = addresses
}
func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
	kl.lastObservedNodeAddressesMux.RLock()
	defer kl.lastObservedNodeAddressesMux.RUnlock()
	return kl.lastObservedNodeAddresses
}

// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIPs, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	// Volume limits
	setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))

	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}

// validateNodeIP validates that the given node IP belongs to the current host.
func validateNodeIP(nodeIP net.IP) error {
	// Honor IP limitations set in setNodeStatus()
	if nodeIP.To4() == nil && nodeIP.To16() == nil {
		return fmt.Errorf("nodeIP must be a valid IP address")
	}
	if nodeIP.IsLoopback() {
		return fmt.Errorf("nodeIP can't be loopback address")
	}
	if nodeIP.IsMulticast() {
		return fmt.Errorf("nodeIP can't be a multicast address")
	}
	if nodeIP.IsLinkLocalUnicast() {
		return fmt.Errorf("nodeIP can't be a link-local unicast address")
	}
	if nodeIP.IsUnspecified() {
		return fmt.Errorf("nodeIP can't be an all zeros address")
	}

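	// The IP must be assigned to one of the host's network interfaces.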
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return err
	}
	for _, addr := range addrs {
		var ip net.IP
		switch v := addr.(type) {
		case *net.IPNet:
			ip = v.IP
		case *net.IPAddr:
			ip = v.IP
		}
		if ip != nil && ip.Equal(nodeIP) {
			return nil
		}
	}
	return fmt.Errorf("node IP: %q not found in the host's network interfaces", nodeIP.String())
}

// nodeStatusHasChanged compares the original node and current node's status and
// returns true if any change happens. The heartbeat timestamp is ignored.
func nodeStatusHasChanged(originalStatus *v1.NodeStatus, status *v1.NodeStatus) bool {
	if originalStatus == nil && status == nil {
		return false
	}
	if originalStatus == nil || status == nil {
		return true
	}

	// Compare node conditions here because we need to ignore the heartbeat timestamp.
	if nodeConditionsHaveChanged(originalStatus.Conditions, status.Conditions) {
		return true
	}

	// Compare other fields of NodeStatus.
	originalStatusCopy := originalStatus.DeepCopy()
	statusCopy := status.DeepCopy()
	originalStatusCopy.Conditions = nil
	statusCopy.Conditions = nil
	return !apiequality.Semantic.DeepEqual(originalStatusCopy, statusCopy)
}

// nodeConditionsHaveChanged compares the original node and current node's
// conditions and returns true if any change happens. The heartbeat timestamp is
// ignored.
func nodeConditionsHaveChanged(originalConditions []v1.NodeCondition, conditions []v1.NodeCondition) bool {
	if len(originalConditions) != len(conditions) {
		return true
	}

	originalConditionsCopy := make([]v1.NodeCondition, 0, len(originalConditions))
	originalConditionsCopy = append(originalConditionsCopy, originalConditions...)
	conditionsCopy := make([]v1.NodeCondition, 0, len(conditions))
	conditionsCopy = append(conditionsCopy, conditions...)
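	// Sort both copies by condition type so the conditions can be compared
	// pairwise regardless of their original ordering, with the heartbeat
	// timestamps blanked out before the comparison.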

	sort.SliceStable(originalConditionsCopy, func(i, j int) bool { return originalConditionsCopy[i].Type < originalConditionsCopy[j].Type })
	sort.SliceStable(conditionsCopy, func(i, j int) bool { return conditionsCopy[i].Type < conditionsCopy[j].Type })

	replacedheartbeatTime := metav1.Time{}
	for i := range conditionsCopy {
		originalConditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		conditionsCopy[i].LastHeartbeatTime = replacedheartbeatTime
		if !apiequality.Semantic.DeepEqual(&originalConditionsCopy[i], &conditionsCopy[i]) {
			return true
		}
	}
	return false
}
