1/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package kubelet
18
19import (
20	"context"
21	"crypto/tls"
22	"fmt"
23	"math"
24	"net"
25	"net/http"
26	"os"
27	"path"
28	sysruntime "runtime"
29	"sort"
30	"strings"
31	"sync"
32	"sync/atomic"
33	"time"
34
35	"k8s.io/client-go/informers"
36
37	cadvisorapi "github.com/google/cadvisor/info/v1"
38	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
39	"k8s.io/mount-utils"
40	"k8s.io/utils/integer"
41
42	v1 "k8s.io/api/core/v1"
43	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44	"k8s.io/apimachinery/pkg/fields"
45	"k8s.io/apimachinery/pkg/labels"
46	"k8s.io/apimachinery/pkg/types"
47	"k8s.io/apimachinery/pkg/util/clock"
48	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
49	"k8s.io/apimachinery/pkg/util/sets"
50	"k8s.io/apimachinery/pkg/util/wait"
51	utilfeature "k8s.io/apiserver/pkg/util/feature"
52	clientset "k8s.io/client-go/kubernetes"
53	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
54	corelisters "k8s.io/client-go/listers/core/v1"
55	"k8s.io/client-go/tools/cache"
56	"k8s.io/client-go/tools/record"
57	"k8s.io/client-go/util/certificate"
58	"k8s.io/client-go/util/flowcontrol"
59	cloudprovider "k8s.io/cloud-provider"
60	"k8s.io/component-helpers/apimachinery/lease"
61	internalapi "k8s.io/cri-api/pkg/apis"
62	"k8s.io/klog/v2"
63	pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
64	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
65	api "k8s.io/kubernetes/pkg/apis/core"
66	"k8s.io/kubernetes/pkg/features"
67	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
68	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
69	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
70	kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
71	"k8s.io/kubernetes/pkg/kubelet/cloudresource"
72	"k8s.io/kubernetes/pkg/kubelet/cm"
73	"k8s.io/kubernetes/pkg/kubelet/config"
74	"k8s.io/kubernetes/pkg/kubelet/configmap"
75	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
76	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
77	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
78	"k8s.io/kubernetes/pkg/kubelet/events"
79	"k8s.io/kubernetes/pkg/kubelet/eviction"
80	"k8s.io/kubernetes/pkg/kubelet/images"
81	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
82	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
83	"k8s.io/kubernetes/pkg/kubelet/legacy"
84	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
85	"k8s.io/kubernetes/pkg/kubelet/logs"
86	"k8s.io/kubernetes/pkg/kubelet/metrics"
87	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
88	"k8s.io/kubernetes/pkg/kubelet/network/dns"
89	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
90	oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
91	"k8s.io/kubernetes/pkg/kubelet/pleg"
92	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
93	plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
94	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
95	"k8s.io/kubernetes/pkg/kubelet/preemption"
96	"k8s.io/kubernetes/pkg/kubelet/prober"
97	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
98	"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
99	"k8s.io/kubernetes/pkg/kubelet/secret"
100	"k8s.io/kubernetes/pkg/kubelet/server"
101	servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
102	serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
103	"k8s.io/kubernetes/pkg/kubelet/stats"
104	"k8s.io/kubernetes/pkg/kubelet/status"
105	"k8s.io/kubernetes/pkg/kubelet/sysctl"
106	"k8s.io/kubernetes/pkg/kubelet/token"
107	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
108	"k8s.io/kubernetes/pkg/kubelet/util"
109	"k8s.io/kubernetes/pkg/kubelet/util/format"
110	"k8s.io/kubernetes/pkg/kubelet/util/manager"
111	"k8s.io/kubernetes/pkg/kubelet/util/queue"
112	"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
113	"k8s.io/kubernetes/pkg/kubelet/volumemanager"
114	"k8s.io/kubernetes/pkg/security/apparmor"
115	sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl"
116	"k8s.io/kubernetes/pkg/util/oom"
117	"k8s.io/kubernetes/pkg/util/selinux"
118	"k8s.io/kubernetes/pkg/volume"
119	"k8s.io/kubernetes/pkg/volume/csi"
120	"k8s.io/kubernetes/pkg/volume/util/hostutil"
121	"k8s.io/kubernetes/pkg/volume/util/subpath"
122	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
123)
124
125const (
126	// Max amount of time to wait for the container runtime to come up.
127	maxWaitForContainerRuntime = 30 * time.Second
128
	// nodeStatusUpdateRetry specifies how many times the kubelet retries when posting the node status fails.
130	nodeStatusUpdateRetry = 5
131
132	// ContainerLogsDir is the location of container logs.
133	ContainerLogsDir = "/var/log/containers"
134
135	// MaxContainerBackOff is the max backoff period, exported for the e2e test
136	MaxContainerBackOff = 300 * time.Second
137
138	// Period for performing global cleanup tasks.
139	housekeepingPeriod = time.Second * 2
140
	// Duration after which a housekeeping pass is considered to have violated
	// the invariant that housekeeping should be fast, to avoid blocking pod
	// config (no new pods are started or deleted while housekeeping is
	// running).
	housekeepingWarningDuration = time.Second * 15
145
146	// Period for performing eviction monitoring.
147	// ensure this is kept in sync with internal cadvisor housekeeping.
148	evictionMonitoringPeriod = time.Second * 10
149
150	// The path in containers' filesystems where the hosts file is mounted.
151	linuxEtcHostsPath   = "/etc/hosts"
152	windowsEtcHostsPath = "C:\\Windows\\System32\\drivers\\etc\\hosts"
153
154	// Capacity of the channel for receiving pod lifecycle events. This number
155	// is a bit arbitrary and may be adjusted in the future.
156	plegChannelCapacity = 1000
157
	// Generic PLEG relies on relisting for discovering container events.
	// A longer period means that kubelet will take longer to detect container
	// changes and to update pod status. On the other hand, a shorter period
	// will cause more frequent relisting (i.e., more container runtime
	// operations), leading to higher CPU usage.
163	// Note that even though we set the period to 1s, the relisting itself can
164	// take more than 1s to finish if the container runtime responds slowly
165	// and/or when there are many container changes in one cycle.
166	plegRelistPeriod = time.Second * 1
167
	// backOffPeriod is the period to back off when pod syncing results in an
	// error. It is also used as the base period for the exponential backoff of
	// container restarts and image pulls.
171	backOffPeriod = time.Second * 10
172
173	// ContainerGCPeriod is the period for performing container garbage collection.
174	ContainerGCPeriod = time.Minute
175	// ImageGCPeriod is the period for performing image garbage collection.
176	ImageGCPeriod = 5 * time.Minute
177
178	// Minimum number of dead containers to keep in a pod
179	minDeadContainerInPod = 1
180
	// nodeLeaseRenewIntervalFraction is the fraction of the lease duration after which the kubelet renews the lease.
182	nodeLeaseRenewIntervalFraction = 0.25
183)
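// For a sense of scale among the timing constants above: housekeeping normally
// runs every 2s, so a single pass that exceeds housekeepingWarningDuration
// (15s) has run long enough to span roughly seven normal housekeeping periods.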
184
185var etcHostsPath = getContainerEtcHostsPath()
186
187func getContainerEtcHostsPath() string {
188	if sysruntime.GOOS == "windows" {
189		return windowsEtcHostsPath
190	}
191	return linuxEtcHostsPath
192}
193
194// SyncHandler is an interface implemented by Kubelet, for testability
195type SyncHandler interface {
196	HandlePodAdditions(pods []*v1.Pod)
197	HandlePodUpdates(pods []*v1.Pod)
198	HandlePodRemoves(pods []*v1.Pod)
199	HandlePodReconcile(pods []*v1.Pod)
200	HandlePodSyncs(pods []*v1.Pod)
201	HandlePodCleanups() error
202}
203
204// Option is a functional option type for Kubelet
205type Option func(*Kubelet)
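// A hypothetical Option, shown only to illustrate the pattern (it is not part
// of this package):
//
//	func WithCustomClock(c clock.Clock) Option {
//		return func(k *Kubelet) { k.clock = c }
//	}
//
// Options supplied via Dependencies.Options are applied near the end of
// NewMainKubelet, after the Kubelet struct has been populated.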
206
// Bootstrap is a bootstrapping interface for the kubelet; it targets the initialization protocol.
208type Bootstrap interface {
209	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
210	BirthCry()
211	StartGarbageCollection()
212	ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface)
213	ListenAndServeReadOnly(address net.IP, port uint)
214	ListenAndServePodResources()
215	Run(<-chan kubetypes.PodUpdate)
216	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
217}
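// The Kubelet type defined below is intended to satisfy Bootstrap; the kubelet
// command constructs it via NewMainKubelet and then drives it through this
// interface.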
218
219// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
220// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
221// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
222type Dependencies struct {
223	Options []Option
224
225	// Injected Dependencies
226	Auth                    server.AuthInterface
227	CAdvisorInterface       cadvisor.Interface
228	Cloud                   cloudprovider.Interface
229	ContainerManager        cm.ContainerManager
230	DockerOptions           *DockerOptions
231	EventClient             v1core.EventsGetter
232	HeartbeatClient         clientset.Interface
233	OnHeartbeatFailure      func()
234	KubeClient              clientset.Interface
235	Mounter                 mount.Interface
236	HostUtil                hostutil.HostUtils
237	OOMAdjuster             *oom.OOMAdjuster
238	OSInterface             kubecontainer.OSInterface
239	PodConfig               *config.PodConfig
240	Recorder                record.EventRecorder
241	Subpather               subpath.Interface
242	VolumePlugins           []volume.VolumePlugin
243	DynamicPluginProber     volume.DynamicPluginProber
244	TLSOptions              *server.TLSOptions
245	KubeletConfigController *kubeletconfig.Controller
246	RemoteRuntimeService    internalapi.RuntimeService
247	RemoteImageService      internalapi.ImageManagerService
248	dockerLegacyService     legacy.DockerLegacyService
	// Remove this field once cadvisor.UsingLegacyCadvisorStats is dropped.
250	useLegacyCadvisorStats bool
251}
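// A minimal sketch of how a test or embedder might populate Dependencies
// (hypothetical; real callers supply far more):
//
//	deps := &Dependencies{
//		Recorder:    &record.FakeRecorder{},
//		OSInterface: kubecontainer.RealOS{},
//	}
//
// Note that some fields are dereferenced unconditionally by NewMainKubelet
// (ContainerManager, for example), so they must be set before it is called.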
252
253// DockerOptions contains docker specific configuration. Importantly, since it
254// lives outside of `dockershim`, it should not depend on the `docker/docker`
255// client library.
256type DockerOptions struct {
257	DockerEndpoint            string
258	RuntimeRequestTimeout     time.Duration
259	ImagePullProgressDeadline time.Duration
260}
261
262// makePodSourceConfig creates a config.PodConfig from the given
263// KubeletConfiguration or returns an error.
264func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
265	manifestURLHeader := make(http.Header)
266	if len(kubeCfg.StaticPodURLHeader) > 0 {
267		for k, v := range kubeCfg.StaticPodURLHeader {
268			for i := range v {
269				manifestURLHeader.Add(k, v[i])
270			}
271		}
272	}
273
274	// source of all configuration
275	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
276
277	// define file config source
278	if kubeCfg.StaticPodPath != "" {
279		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
280		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
281	}
282
283	// define url config source
284	if kubeCfg.StaticPodURL != "" {
285		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
286		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
287	}
288
289	if kubeDeps.KubeClient != nil {
290		klog.InfoS("Adding apiserver pod source")
291		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
292	}
293	return cfg, nil
294}
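// All three sources above (static pod files, the static pod URL, and the
// apiserver) feed the same PodConfig, which merges their updates incrementally;
// a kubelet running without a KubeClient still receives pods from the file
// and/or URL sources alone.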
295
// PreInitRuntimeService initializes the runtime service before RunKubelet is called.
297func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
298	kubeDeps *Dependencies,
299	crOptions *config.ContainerRuntimeOptions,
300	containerRuntime string,
301	runtimeCgroups string,
302	remoteRuntimeEndpoint string,
303	remoteImageEndpoint string,
304	nonMasqueradeCIDR string) error {
305	if remoteRuntimeEndpoint != "" {
		// remoteImageEndpoint defaults to remoteRuntimeEndpoint if not explicitly specified.
307		if remoteImageEndpoint == "" {
308			remoteImageEndpoint = remoteRuntimeEndpoint
309		}
310	}
311
312	switch containerRuntime {
313	case kubetypes.DockerContainerRuntime:
314		klog.InfoS("Using dockershim is deprecated, please consider using a full-fledged CRI implementation")
315		if err := runDockershim(
316			kubeCfg,
317			kubeDeps,
318			crOptions,
319			runtimeCgroups,
320			remoteRuntimeEndpoint,
321			remoteImageEndpoint,
322			nonMasqueradeCIDR,
323		); err != nil {
324			return err
325		}
326	case kubetypes.RemoteContainerRuntime:
327		// No-op.
328		break
329	default:
330		return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
331	}
332
333	var err error
334	if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
335		return err
336	}
337	if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
338		return err
339	}
340
341	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint)
342
343	return nil
344}
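// For a remote (CRI) runtime, remoteRuntimeEndpoint is typically a local
// socket, for example a containerd endpoint such as
// "unix:///run/containerd/containerd.sock"; if remoteImageEndpoint is left
// empty it falls back to the same socket, as handled above.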
345
346// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
347// No initialization of Kubelet and its modules should happen here.
348func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
349	kubeDeps *Dependencies,
350	crOptions *config.ContainerRuntimeOptions,
351	containerRuntime string,
352	hostname string,
353	hostnameOverridden bool,
354	nodeName types.NodeName,
355	nodeIPs []net.IP,
356	providerID string,
357	cloudProvider string,
358	certDirectory string,
359	rootDirectory string,
360	imageCredentialProviderConfigFile string,
361	imageCredentialProviderBinDir string,
362	registerNode bool,
363	registerWithTaints []api.Taint,
364	allowedUnsafeSysctls []string,
365	experimentalMounterPath string,
366	kernelMemcgNotification bool,
367	experimentalCheckNodeCapabilitiesBeforeMount bool,
368	experimentalNodeAllocatableIgnoreEvictionThreshold bool,
369	minimumGCAge metav1.Duration,
370	maxPerPodContainerCount int32,
371	maxContainerCount int32,
372	masterServiceNamespace string,
373	registerSchedulable bool,
374	keepTerminatedPodVolumes bool,
375	nodeLabels map[string]string,
376	seccompProfileRoot string,
377	nodeStatusMaxImages int32,
378	seccompDefault bool,
379) (*Kubelet, error) {
380	if rootDirectory == "" {
381		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
382	}
383	if kubeCfg.SyncFrequency.Duration <= 0 {
384		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
385	}
386
387	if kubeCfg.MakeIPTablesUtilChains {
388		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
389			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
390		}
391		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
392			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
393		}
394		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
395			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
396		}
397	}
398
399	if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) {
400		cloudprovider.DisableWarningForProvider(cloudProvider)
401		return nil, fmt.Errorf("cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider)
402	}
403
404	var nodeHasSynced cache.InformerSynced
405	var nodeLister corelisters.NodeLister
406
	// If kubeClient == nil, we are running in standalone mode (i.e., no API servers).
	// If it is not nil, we are running as part of a cluster and should sync with the API.
409	if kubeDeps.KubeClient != nil {
410		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
411			options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
412		}))
413		nodeLister = kubeInformers.Core().V1().Nodes().Lister()
414		nodeHasSynced = func() bool {
415			return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
416		}
417		kubeInformers.Start(wait.NeverStop)
418		klog.InfoS("Attempting to sync node with API server")
419	} else {
420		// we don't have a client to sync!
421		nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
422		nodeLister = corelisters.NewNodeLister(nodeIndexer)
423		nodeHasSynced = func() bool { return true }
424		klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
425	}
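	// In standalone mode the node lister is backed by an empty local indexer and
	// nodeHasSynced always reports true, so callers never block waiting for an
	// API sync that will never happen.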
426
427	if kubeDeps.PodConfig == nil {
428		var err error
429		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
430		if err != nil {
431			return nil, err
432		}
433	}
434
435	containerGCPolicy := kubecontainer.GCPolicy{
436		MinAge:             minimumGCAge.Duration,
437		MaxPerPodContainer: int(maxPerPodContainerCount),
438		MaxContainers:      int(maxContainerCount),
439	}
440
441	daemonEndpoints := &v1.NodeDaemonEndpoints{
442		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
443	}
444
445	imageGCPolicy := images.ImageGCPolicy{
446		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
447		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
448		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
449	}
450
451	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
452	if experimentalNodeAllocatableIgnoreEvictionThreshold {
453		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
454		enforceNodeAllocatable = []string{}
455	}
456	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
457	if err != nil {
458		return nil, err
459	}
460	evictionConfig := eviction.Config{
461		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
462		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
463		Thresholds:               thresholds,
464		KernelMemcgNotification:  kernelMemcgNotification,
465		PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
466	}
467
468	var serviceLister corelisters.ServiceLister
469	var serviceHasSynced cache.InformerSynced
470	if kubeDeps.KubeClient != nil {
471		kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
472		serviceLister = kubeInformers.Core().V1().Services().Lister()
473		serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
474		kubeInformers.Start(wait.NeverStop)
475	} else {
476		serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
477		serviceLister = corelisters.NewServiceLister(serviceIndexer)
478		serviceHasSynced = func() bool { return true }
479	}
480
481	// construct a node reference used for events
482	nodeRef := &v1.ObjectReference{
483		Kind:      "Node",
484		Name:      string(nodeName),
485		UID:       types.UID(nodeName),
486		Namespace: "",
487	}
488
489	oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
490	if err != nil {
491		if libcontaineruserns.RunningInUserNS() {
492			if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
				// oomwatcher.NewWatcher returns an "open /dev/kmsg: operation not permitted" error
				// when running in a user namespace with the sysctl value `kernel.dmesg_restrict=1`.
495				klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err)
496				oomWatcher = nil
497			} else {
498				klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)")
499				return nil, err
500			}
501		} else {
502			return nil, err
503		}
504	}
505
506	clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS))
507	for _, ipEntry := range kubeCfg.ClusterDNS {
508		ip := net.ParseIP(ipEntry)
509		if ip == nil {
510			klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry)
511		} else {
512			clusterDNS = append(clusterDNS, ip)
513		}
514	}
515	httpClient := &http.Client{}
516
517	klet := &Kubelet{
518		hostname:                                hostname,
519		hostnameOverridden:                      hostnameOverridden,
520		nodeName:                                nodeName,
521		kubeClient:                              kubeDeps.KubeClient,
522		heartbeatClient:                         kubeDeps.HeartbeatClient,
523		onRepeatedHeartbeatFailure:              kubeDeps.OnHeartbeatFailure,
524		rootDirectory:                           rootDirectory,
525		resyncInterval:                          kubeCfg.SyncFrequency.Duration,
526		sourcesReady:                            config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
527		registerNode:                            registerNode,
528		registerWithTaints:                      registerWithTaints,
529		registerSchedulable:                     registerSchedulable,
530		dnsConfigurer:                           dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig),
531		serviceLister:                           serviceLister,
532		serviceHasSynced:                        serviceHasSynced,
533		nodeLister:                              nodeLister,
534		nodeHasSynced:                           nodeHasSynced,
535		masterServiceNamespace:                  masterServiceNamespace,
536		streamingConnectionIdleTimeout:          kubeCfg.StreamingConnectionIdleTimeout.Duration,
537		recorder:                                kubeDeps.Recorder,
538		cadvisor:                                kubeDeps.CAdvisorInterface,
539		cloud:                                   kubeDeps.Cloud,
540		externalCloudProvider:                   cloudprovider.IsExternal(cloudProvider),
541		providerID:                              providerID,
542		nodeRef:                                 nodeRef,
543		nodeLabels:                              nodeLabels,
544		nodeStatusUpdateFrequency:               kubeCfg.NodeStatusUpdateFrequency.Duration,
545		nodeStatusReportFrequency:               kubeCfg.NodeStatusReportFrequency.Duration,
546		os:                                      kubeDeps.OSInterface,
547		oomWatcher:                              oomWatcher,
548		cgroupsPerQOS:                           kubeCfg.CgroupsPerQOS,
549		cgroupRoot:                              kubeCfg.CgroupRoot,
550		mounter:                                 kubeDeps.Mounter,
551		hostutil:                                kubeDeps.HostUtil,
552		subpather:                               kubeDeps.Subpather,
553		maxPods:                                 int(kubeCfg.MaxPods),
554		podsPerCore:                             int(kubeCfg.PodsPerCore),
555		syncLoopMonitor:                         atomic.Value{},
556		daemonEndpoints:                         daemonEndpoints,
557		containerManager:                        kubeDeps.ContainerManager,
558		containerRuntimeName:                    containerRuntime,
559		nodeIPs:                                 nodeIPs,
560		nodeIPValidator:                         validateNodeIP,
561		clock:                                   clock.RealClock{},
562		enableControllerAttachDetach:            kubeCfg.EnableControllerAttachDetach,
563		makeIPTablesUtilChains:                  kubeCfg.MakeIPTablesUtilChains,
564		iptablesMasqueradeBit:                   int(kubeCfg.IPTablesMasqueradeBit),
565		iptablesDropBit:                         int(kubeCfg.IPTablesDropBit),
566		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
567		keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
568		nodeStatusMaxImages:                     nodeStatusMaxImages,
569		lastContainerStartedTime:                newTimeCache(),
570	}
571
572	if klet.cloud != nil {
573		klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency)
574	}
575
576	var secretManager secret.Manager
577	var configMapManager configmap.Manager
578	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
579	case kubeletconfiginternal.WatchChangeDetectionStrategy:
580		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient, klet.resyncInterval)
581		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
582	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
583		secretManager = secret.NewCachingSecretManager(
584			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
585		configMapManager = configmap.NewCachingConfigMapManager(
586			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
587	case kubeletconfiginternal.GetChangeDetectionStrategy:
588		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
589		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
590	default:
591		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
592	}
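	// Roughly: the watch-based managers keep per-object caches updated via API
	// watches, the TTL-based managers re-fetch objects after a TTL derived from
	// the node (see manager.GetObjectTTLFromNodeFunc), and the simple managers
	// fetch directly from the apiserver on every access.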
593
594	klet.secretManager = secretManager
595	klet.configMapManager = configMapManager
596
597	if klet.experimentalHostUserNamespaceDefaulting {
598		klog.InfoS("Experimental host user namespace defaulting is enabled")
599	}
600
601	machineInfo, err := klet.cadvisor.MachineInfo()
602	if err != nil {
603		return nil, err
604	}
	// Avoid having the collector report it as a timestamped metric.
	// See PRs #95210 and #97006 for more details.
607	machineInfo.Timestamp = time.Time{}
608	klet.setCachedMachineInfo(machineInfo)
609
610	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
611
612	klet.livenessManager = proberesults.NewManager()
613	klet.readinessManager = proberesults.NewManager()
614	klet.startupManager = proberesults.NewManager()
615	klet.podCache = kubecontainer.NewCache()
616
617	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
618	mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
619	klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)
620
621	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
622
623	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
624
625	klet.dockerLegacyService = kubeDeps.dockerLegacyService
626	klet.runtimeService = kubeDeps.RemoteRuntimeService
627
628	if kubeDeps.KubeClient != nil {
629		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
630	}
631
632	if containerRuntime == kubetypes.RemoteContainerRuntime {
633		// setup containerLogManager for CRI container runtime
634		containerLogManager, err := logs.NewContainerLogManager(
635			klet.runtimeService,
636			kubeDeps.OSInterface,
637			kubeCfg.ContainerLogMaxSize,
638			int(kubeCfg.ContainerLogMaxFiles),
639		)
640		if err != nil {
641			return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
642		}
643		klet.containerLogManager = containerLogManager
644	} else {
645		klet.containerLogManager = logs.NewStubContainerLogManager()
646	}
647
648	klet.reasonCache = NewReasonCache()
649	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
650	klet.podWorkers = newPodWorkers(
651		klet.syncPod,
652		klet.syncTerminatingPod,
653		klet.syncTerminatedPod,
654
655		kubeDeps.Recorder,
656		klet.workQueue,
657		klet.resyncInterval,
658		backOffPeriod,
659		klet.podCache,
660	)
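	// The three callbacks above drive a pod's lifecycle: syncPod while the pod
	// should be running, syncTerminatingPod while its containers are being
	// stopped, and syncTerminatedPod once all containers have exited and only
	// cleanup remains.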
661
662	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
663		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
664		klet.livenessManager,
665		klet.readinessManager,
666		klet.startupManager,
667		seccompProfileRoot,
668		machineInfo,
669		klet.podWorkers,
670		kubeDeps.OSInterface,
671		klet,
672		httpClient,
673		imageBackOff,
674		kubeCfg.SerializeImagePulls,
675		float32(kubeCfg.RegistryPullQPS),
676		int(kubeCfg.RegistryBurst),
677		imageCredentialProviderConfigFile,
678		imageCredentialProviderBinDir,
679		kubeCfg.CPUCFSQuota,
680		kubeCfg.CPUCFSQuotaPeriod,
681		kubeDeps.RemoteRuntimeService,
682		kubeDeps.RemoteImageService,
683		kubeDeps.ContainerManager.InternalContainerLifecycle(),
684		kubeDeps.dockerLegacyService,
685		klet.containerLogManager,
686		klet.runtimeClassManager,
687		seccompDefault,
688		kubeCfg.MemorySwap.SwapBehavior,
689		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
690		*kubeCfg.MemoryThrottlingFactor,
691	)
692	if err != nil {
693		return nil, err
694	}
695	klet.containerRuntime = runtime
696	klet.streamingRuntime = runtime
697	klet.runner = runtime
698
699	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
700	if err != nil {
701		return nil, err
702	}
703	klet.runtimeCache = runtimeCache
704
	// Common provider to get host filesystem usage associated with a pod managed by this kubelet.
706	hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) (string, bool) {
707		return getEtcHostsPath(klet.getPodDir(podUID)), klet.containerRuntime.SupportsSingleFileMapping()
708	})
709	if kubeDeps.useLegacyCadvisorStats {
710		klet.StatsProvider = stats.NewCadvisorStatsProvider(
711			klet.cadvisor,
712			klet.resourceAnalyzer,
713			klet.podManager,
714			klet.runtimeCache,
715			klet.containerRuntime,
716			klet.statusManager,
717			hostStatsProvider)
718	} else {
719		klet.StatsProvider = stats.NewCRIStatsProvider(
720			klet.cadvisor,
721			klet.resourceAnalyzer,
722			klet.podManager,
723			klet.runtimeCache,
724			kubeDeps.RemoteRuntimeService,
725			kubeDeps.RemoteImageService,
726			hostStatsProvider,
727			utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics))
728	}
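	// useLegacyCadvisorStats was decided earlier in PreInitRuntimeService via
	// cadvisor.UsingLegacyCadvisorStats; the CRI stats provider is the default
	// for remote runtimes.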
729
730	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
731	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
732	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
733	if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
734		klog.ErrorS(err, "Pod CIDR update failed")
735	}
736
737	// setup containerGC
738	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
739	if err != nil {
740		return nil, err
741	}
742	klet.containerGC = containerGC
743	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
744
745	// setup imageManager
746	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
747	if err != nil {
748		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
749	}
750	klet.imageManager = imageManager
751
752	if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
753		klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
754		if err != nil {
755			return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
756		}
757		kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
758			cert := klet.serverCertificateManager.Current()
759			if cert == nil {
760				return nil, fmt.Errorf("no serving certificate available for the kubelet")
761			}
762			return cert, nil
763		}
764	}
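	// With GetCertificate wired to the certificate manager, the serving TLS
	// config always presents the most recently rotated certificate without
	// restarting the kubelet.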
765
766	klet.probeManager = prober.NewManager(
767		klet.statusManager,
768		klet.livenessManager,
769		klet.readinessManager,
770		klet.startupManager,
771		klet.runner,
772		kubeDeps.Recorder)
773
774	tokenManager := token.NewManager(kubeDeps.KubeClient)
775
776	// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
777	// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
778	// ReadyState is accurate with the storage state.
779	klet.volumePluginMgr, err =
780		NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
781	if err != nil {
782		return nil, err
783	}
784	klet.pluginManager = pluginmanager.NewPluginManager(
785		klet.getPluginsRegistrationDir(), /* sockDir */
786		kubeDeps.Recorder,
787	)
788
789	// If the experimentalMounterPathFlag is set, we do not want to
790	// check node capabilities since the mount path is not the default
791	if len(experimentalMounterPath) != 0 {
792		experimentalCheckNodeCapabilitiesBeforeMount = false
		// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
		// so that service names can be resolved.
795		klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
796	}
797
798	// setup volumeManager
799	klet.volumeManager = volumemanager.NewVolumeManager(
800		kubeCfg.EnableControllerAttachDetach,
801		nodeName,
802		klet.podManager,
803		klet.podWorkers,
804		klet.kubeClient,
805		klet.volumePluginMgr,
806		klet.containerRuntime,
807		kubeDeps.Mounter,
808		kubeDeps.HostUtil,
809		klet.getPodsDir(),
810		kubeDeps.Recorder,
811		experimentalCheckNodeCapabilitiesBeforeMount,
812		keepTerminatedPodVolumes,
813		volumepathhandler.NewBlockVolumePathHandler())
814
815	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
816
817	// setup eviction manager
818	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
819
820	klet.evictionManager = evictionManager
821	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
822
823	// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
824	// Hence, we concatenate those two lists.
825	safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
826	sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
827	if err != nil {
828		return nil, err
829	}
830	klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)
831
832	// enable active deadline handler
833	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
834	if err != nil {
835		return nil, err
836	}
837	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
838	klet.AddPodSyncHandler(activeDeadlineHandler)
839
840	klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
841
842	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
843	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	// Apply functional Options.
845	for _, opt := range kubeDeps.Options {
846		opt(klet)
847	}
848
849	if sysruntime.GOOS == "linux" {
		// AppArmor is a Linux kernel security module; it is not supported on other operating systems.
851		klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
852		klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
853	}
854	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
855	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))
856
857	leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
858	renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
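	// For example, with the default NodeLeaseDurationSeconds of 40 this yields a
	// renew interval of 40s * 0.25 = 10s.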
859	klet.nodeLeaseController = lease.NewController(
860		klet.clock,
861		klet.heartbeatClient,
862		string(klet.nodeName),
863		kubeCfg.NodeLeaseDurationSeconds,
864		klet.onRepeatedHeartbeatFailure,
865		renewInterval,
866		v1.NamespaceNodeLease,
867		util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))
868
869	// setup node shutdown manager
870	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.syncNodeStatus, kubeCfg.ShutdownGracePeriod.Duration, kubeCfg.ShutdownGracePeriodCriticalPods.Duration)
871
872	klet.shutdownManager = shutdownManager
873	klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
874
875	// Finally, put the most recent version of the config on the Kubelet, so
876	// people can see how it was configured.
877	klet.kubeletConfiguration = *kubeCfg
878
879	// Generating the status funcs should be the last thing we do,
880	// since this relies on the rest of the Kubelet having been constructed.
881	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
882
883	return klet, nil
884}
885
886type serviceLister interface {
887	List(labels.Selector) ([]*v1.Service, error)
888}
889
890// Kubelet is the main kubelet implementation.
891type Kubelet struct {
892	kubeletConfiguration kubeletconfiginternal.KubeletConfiguration
893
894	// hostname is the hostname the kubelet detected or was given via flag/config
895	hostname string
896	// hostnameOverridden indicates the hostname was overridden via flag/config
897	hostnameOverridden bool
898
899	nodeName        types.NodeName
900	runtimeCache    kubecontainer.RuntimeCache
901	kubeClient      clientset.Interface
902	heartbeatClient clientset.Interface
903	rootDirectory   string
904
905	lastObservedNodeAddressesMux sync.RWMutex
906	lastObservedNodeAddresses    []v1.NodeAddress
907
908	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
909	onRepeatedHeartbeatFailure func()
910
911	// podWorkers handle syncing Pods in response to events.
912	podWorkers PodWorkers
913
914	// resyncInterval is the interval between periodic full reconciliations of
915	// pods on this node.
916	resyncInterval time.Duration
917
918	// sourcesReady records the sources seen by the kubelet, it is thread-safe.
919	sourcesReady config.SourcesReady
920
921	// podManager is a facade that abstracts away the various sources of pods
922	// this Kubelet services.
923	podManager kubepod.Manager
924
925	// Needed to observe and respond to situations that could impact node stability
926	evictionManager eviction.Manager
927
928	// Optional, defaults to /logs/ from /var/log
929	logServer http.Handler
930	// Optional, defaults to simple Docker implementation
931	runner kubecontainer.CommandRunner
932
933	// cAdvisor used for container information.
934	cadvisor cadvisor.Interface
935
936	// Set to true to have the node register itself with the apiserver.
937	registerNode bool
938	// List of taints to add to a node object when the kubelet registers itself.
939	registerWithTaints []api.Taint
940	// Set to true to have the node register itself as schedulable.
941	registerSchedulable bool
	// for internal bookkeeping; access only from within registerWithApiserver
943	registrationCompleted bool
944
945	// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
946	dnsConfigurer *dns.Configurer
947
948	// masterServiceNamespace is the namespace that the master service is exposed in.
949	masterServiceNamespace string
950	// serviceLister knows how to list services
951	serviceLister serviceLister
952	// serviceHasSynced indicates whether services have been sync'd at least once.
953	// Check this before trusting a response from the lister.
954	serviceHasSynced cache.InformerSynced
955	// nodeLister knows how to list nodes
956	nodeLister corelisters.NodeLister
957	// nodeHasSynced indicates whether nodes have been sync'd at least once.
958	// Check this before trusting a response from the node lister.
959	nodeHasSynced cache.InformerSynced
960	// a list of node labels to register
961	nodeLabels map[string]string
962
	// Last timestamp when the runtime responded to a ping.
	// A mutex is used to protect this value.
965	runtimeState *runtimeState
966
967	// Volume plugins.
968	volumePluginMgr *volume.VolumePluginMgr
969
970	// Handles container probing.
971	probeManager prober.Manager
972	// Manages container health check results.
973	livenessManager  proberesults.Manager
974	readinessManager proberesults.Manager
975	startupManager   proberesults.Manager
976
977	// How long to keep idle streaming command execution/port forwarding
978	// connections open before terminating them
979	streamingConnectionIdleTimeout time.Duration
980
981	// The EventRecorder to use
982	recorder record.EventRecorder
983
984	// Policy for handling garbage collection of dead containers.
985	containerGC kubecontainer.GC
986
987	// Manager for image garbage collection.
988	imageManager images.ImageGCManager
989
990	// Manager for container logs.
991	containerLogManager logs.ContainerLogManager
992
993	// Secret manager.
994	secretManager secret.Manager
995
996	// ConfigMap manager.
997	configMapManager configmap.Manager
998
999	// Cached MachineInfo returned by cadvisor.
1000	machineInfoLock sync.RWMutex
1001	machineInfo     *cadvisorapi.MachineInfo
1002
1003	// Handles certificate rotations.
1004	serverCertificateManager certificate.Manager
1005
1006	// Syncs pods statuses with apiserver; also used as a cache of statuses.
1007	statusManager status.Manager
1008
1009	// VolumeManager runs a set of asynchronous loops that figure out which
1010	// volumes need to be attached/mounted/unmounted/detached based on the pods
1011	// scheduled on this node and makes it so.
1012	volumeManager volumemanager.VolumeManager
1013
1014	// Cloud provider interface.
1015	cloud cloudprovider.Interface
1016	// Handles requests to cloud provider with timeout
1017	cloudResourceSyncManager cloudresource.SyncManager
1018
1019	// Indicates that the node initialization happens in an external cloud controller
1020	externalCloudProvider bool
1021	// Reference to this node.
1022	nodeRef *v1.ObjectReference
1023
1024	// The name of the container runtime
1025	containerRuntimeName string
1026
1027	// Container runtime.
1028	containerRuntime kubecontainer.Runtime
1029
1030	// Streaming runtime handles container streaming.
1031	streamingRuntime kubecontainer.StreamingRuntime
1032
1033	// Container runtime service (needed by container runtime Start()).
1034	runtimeService internalapi.RuntimeService
1035
1036	// reasonCache caches the failure reason of the last creation of all containers, which is
1037	// used for generating ContainerStatus.
1038	reasonCache *ReasonCache
1039
1040	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
1041	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
1042	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
1043	// in nodecontroller. There are several constraints:
1044	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
1045	//    N means number of retries allowed for kubelet to post node status. It is pointless
1046	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
1047	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
1048	//    The constant must be less than podEvictionTimeout.
1049	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
1050	//    status. Kubelet may fail to update node status reliably if the value is too small,
1051	//    as it takes time to gather all necessary node information.
1052	nodeStatusUpdateFrequency time.Duration
1053
1054	// nodeStatusReportFrequency is the frequency that kubelet posts node
1055	// status to master. It is only used when node lease feature is enabled.
1056	nodeStatusReportFrequency time.Duration
1057
1058	// lastStatusReportTime is the time when node status was last reported.
1059	lastStatusReportTime time.Time
1060
1061	// lastContainerStartedTime is the time of the last ContainerStarted event observed per pod
1062	lastContainerStartedTime *timeCache
1063
1064	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
1065	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
1066	syncNodeStatusMux sync.Mutex
1067
1068	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
	// This lock is used by the Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
1070	updatePodCIDRMux sync.Mutex
1071
1072	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
1073	// This lock is used by Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
1074	updateRuntimeMux sync.Mutex
1075
1076	// nodeLeaseController claims and renews the node lease for this Kubelet
1077	nodeLeaseController lease.Controller
1078
1079	// Generates pod events.
1080	pleg pleg.PodLifecycleEventGenerator
1081
1082	// Store kubecontainer.PodStatus for all pods.
1083	podCache kubecontainer.Cache
1084
1085	// os is a facade for various syscalls that need to be mocked during testing.
1086	os kubecontainer.OSInterface
1087
1088	// Watcher of out of memory events.
1089	oomWatcher oomwatcher.Watcher
1090
1091	// Monitor resource usage
1092	resourceAnalyzer serverstats.ResourceAnalyzer
1093
1094	// Whether or not we should have the QOS cgroup hierarchy for resource management
1095	cgroupsPerQOS bool
1096
1097	// If non-empty, pass this to the container runtime as the root cgroup.
1098	cgroupRoot string
1099
1100	// Mounter to use for volumes.
1101	mounter mount.Interface
1102
1103	// hostutil to interact with filesystems
1104	hostutil hostutil.HostUtils
1105
1106	// subpather to execute subpath actions
1107	subpather subpath.Interface
1108
1109	// Manager of non-Runtime containers.
1110	containerManager cm.ContainerManager
1111
	// Maximum number of pods that can be run by this Kubelet.
1113	maxPods int
1114
1115	// Monitor Kubelet's sync loop
1116	syncLoopMonitor atomic.Value
1117
1118	// Container restart Backoff
1119	backOff *flowcontrol.Backoff
1120
	// Information about the ports that are opened by daemons on the node running this Kubelet server.
1122	daemonEndpoints *v1.NodeDaemonEndpoints
1123
1124	// A queue used to trigger pod workers.
1125	workQueue queue.WorkQueue
1126
1127	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
1128	oneTimeInitializer sync.Once
1129
1130	// If set, use this IP address or addresses for the node
1131	nodeIPs []net.IP
1132
1133	// use this function to validate the kubelet nodeIP
1134	nodeIPValidator func(net.IP) error
1135
	// If non-empty, this is a unique identifier for the node in an external database, e.g., a cloud provider.
1137	providerID string
1138
1139	// clock is an interface that provides time related functionality in a way that makes it
1140	// easy to test the code.
1141	clock clock.Clock
1142
1143	// handlers called during the tryUpdateNodeStatus cycle
1144	setNodeStatusFuncs []func(*v1.Node) error
1145
1146	lastNodeUnschedulableLock sync.Mutex
1147	// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
1148	lastNodeUnschedulable bool
1149
1150	// the list of handlers to call during pod admission.
1151	admitHandlers lifecycle.PodAdmitHandlers
1152
	// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
1154	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
1155	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
1156	// admission rule should be applied by a softAdmitHandler.
1157	softAdmitHandlers lifecycle.PodAdmitHandlers
1158
1159	// the list of handlers to call during pod sync loop.
1160	lifecycle.PodSyncLoopHandlers
1161
1162	// the list of handlers to call during pod sync.
1163	lifecycle.PodSyncHandlers
1164
1165	// the number of allowed pods per core
1166	podsPerCore int
1167
	// enableControllerAttachDetach indicates the Attach/Detach controller
	// should manage attachment/detachment of volumes scheduled to this node,
	// and disables the kubelet from executing any attach/detach operations.
1171	enableControllerAttachDetach bool
1172
	// Triggers deletion of containers in a pod.
1174	containerDeletor *podContainerDeletor
1175
	// Whether to create iptables util chains.
1177	makeIPTablesUtilChains bool
1178
1179	// The bit of the fwmark space to mark packets for SNAT.
1180	iptablesMasqueradeBit int
1181
1182	// The bit of the fwmark space to mark packets for dropping.
1183	iptablesDropBit int
1184
1185	// The AppArmor validator for checking whether AppArmor is supported.
1186	appArmorValidator apparmor.Validator
1187
1188	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
1189	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
	// or are using host path volumes.
1191	// This should only be enabled when the container runtime is performing user remapping AND if the
1192	// experimental behavior is desired.
1193	experimentalHostUserNamespaceDefaulting bool
1194
1195	// dockerLegacyService contains some legacy methods for backward compatibility.
	// It should be set only when docker is using a non-json-file logging driver.
1197	dockerLegacyService legacy.DockerLegacyService
1198
1199	// StatsProvider provides the node and the container stats.
1200	StatsProvider *stats.Provider
1201
1202	// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
1203	// This can be useful for debugging volume related issues.
1204	keepTerminatedPodVolumes bool // DEPRECATED
1205
1206	// pluginmanager runs a set of asynchronous loops that figure out which
1207	// plugins need to be registered/unregistered based on this node and makes it so.
1208	pluginManager pluginmanager.PluginManager
1209
1210	// This flag sets a maximum number of images to report in the node status.
1211	nodeStatusMaxImages int32
1212
1213	// Handles RuntimeClass objects for the Kubelet.
1214	runtimeClassManager *runtimeclass.Manager
1215
1216	// Handles node shutdown events for the Node.
1217	shutdownManager *nodeshutdown.Manager
1218}
1219
1220// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
1221func (kl *Kubelet) ListPodStats() ([]statsapi.PodStats, error) {
1222	return kl.StatsProvider.ListPodStats()
1223}
1224
1225// ListPodCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
1226func (kl *Kubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
1227	return kl.StatsProvider.ListPodCPUAndMemoryStats()
1228}
1229
1230// ListPodStatsAndUpdateCPUNanoCoreUsage is delegated to StatsProvider, which implements stats.Provider interface
1231func (kl *Kubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
1232	return kl.StatsProvider.ListPodStatsAndUpdateCPUNanoCoreUsage()
1233}
1234
1235// ImageFsStats is delegated to StatsProvider, which implements stats.Provider interface
1236func (kl *Kubelet) ImageFsStats() (*statsapi.FsStats, error) {
1237	return kl.StatsProvider.ImageFsStats()
1238}
1239
1240// GetCgroupStats is delegated to StatsProvider, which implements stats.Provider interface
1241func (kl *Kubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
1242	return kl.StatsProvider.GetCgroupStats(cgroupName, updateStats)
1243}
1244
1245// GetCgroupCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
1246func (kl *Kubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
1247	return kl.StatsProvider.GetCgroupCPUAndMemoryStats(cgroupName, updateStats)
1248}
1249
1250// RootFsStats is delegated to StatsProvider, which implements stats.Provider interface
1251func (kl *Kubelet) RootFsStats() (*statsapi.FsStats, error) {
1252	return kl.StatsProvider.RootFsStats()
1253}
1254
1255// GetContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
1256func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
1257	return kl.StatsProvider.GetContainerInfo(podFullName, uid, containerName, req)
1258}
1259
1260// GetRawContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
1261func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
1262	return kl.StatsProvider.GetRawContainerInfo(containerName, req, subcontainers)
1263}
1264
1265// RlimitStats is delegated to StatsProvider, which implements stats.Provider interface
1266func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
1267	return kl.StatsProvider.RlimitStats()
1268}
1269
1270// setupDataDirs creates:
1271// 1.  the root directory
1272// 2.  the pods directory
1273// 3.  the plugins directory
1274// 4.  the pod-resources directory
1275func (kl *Kubelet) setupDataDirs() error {
1276	kl.rootDirectory = path.Clean(kl.rootDirectory)
1277	pluginRegistrationDir := kl.getPluginsRegistrationDir()
1278	pluginsDir := kl.getPluginsDir()
1279	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
1280		return fmt.Errorf("error creating root directory: %v", err)
1281	}
1282	if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
1283		return fmt.Errorf("error configuring root directory: %v", err)
1284	}
1285	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
1286		return fmt.Errorf("error creating pods directory: %v", err)
1287	}
1288	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
1289		return fmt.Errorf("error creating plugins directory: %v", err)
1290	}
1291	if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
1292		return fmt.Errorf("error creating plugins registry directory: %v", err)
1293	}
1294	if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
1295		return fmt.Errorf("error creating podresources directory: %v", err)
1296	}
1297	if selinux.SELinuxEnabled() {
1298		err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
1299		if err != nil {
1300			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
1301		}
1302		err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
1303		if err != nil {
1304			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
1305		}
1306	}
1307	return nil
1308}
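// With the default root directory (typically /var/lib/kubelet), setupDataDirs
// ends up creating subdirectories such as pods/, plugins/, plugins_registry/
// and pod-resources/ (assuming the default names returned by the getters used
// above).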
1309
1310// StartGarbageCollection starts garbage collection threads.
1311func (kl *Kubelet) StartGarbageCollection() {
1312	loggedContainerGCFailure := false
1313	go wait.Until(func() {
1314		if err := kl.containerGC.GarbageCollect(); err != nil {
1315			klog.ErrorS(err, "Container garbage collection failed")
1316			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
1317			loggedContainerGCFailure = true
1318		} else {
1319			var vLevel klog.Level = 4
1320			if loggedContainerGCFailure {
1321				vLevel = 1
1322				loggedContainerGCFailure = false
1323			}
1324
1325			klog.V(vLevel).InfoS("Container garbage collection succeeded")
1326		}
1327	}, ContainerGCPeriod, wait.NeverStop)
1328
1329	// when the high threshold is set to 100, stub the image GC manager
1330	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
1331		klog.V(2).InfoS("ImageGCHighThresholdPercent is set to 100, disabling image GC")
1332		return
1333	}
1334
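	// prevImageGCFailed tracks whether the previous image GC attempt failed, so that an
	// event is emitted only on repeated failures and the next success is logged more visibly.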
1335	prevImageGCFailed := false
1336	go wait.Until(func() {
1337		if err := kl.imageManager.GarbageCollect(); err != nil {
1338			if prevImageGCFailed {
1339				klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
1340				// Only create an event for repeated failures
1341				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
1342			} else {
1343				klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
1344			}
1345			prevImageGCFailed = true
1346		} else {
1347			var vLevel klog.Level = 4
1348			if prevImageGCFailed {
1349				vLevel = 1
1350				prevImageGCFailed = false
1351			}
1352
1353			klog.V(vLevel).InfoS("Image garbage collection succeeded")
1354		}
1355	}, ImageGCPeriod, wait.NeverStop)
1356}
1357
1358// initializeModules will initialize internal modules that do not require the container runtime to be up.
1359// Note that the modules here must not depend on modules that are not initialized here.
1360func (kl *Kubelet) initializeModules() error {
1361	// Prometheus metrics.
1362	metrics.Register(
1363		collectors.NewVolumeStatsCollector(kl),
1364		collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats),
1365	)
1366	metrics.SetNodeName(kl.nodeName)
1367	servermetrics.Register()
1368
1369	// Setup filesystem directories.
1370	if err := kl.setupDataDirs(); err != nil {
1371		return err
1372	}
1373
1374	// If the container logs directory does not exist, create it.
1375	if _, err := os.Stat(ContainerLogsDir); err != nil {
1376		if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
1377			return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err)
1378		}
1379	}
1380
1381	// Start the image manager.
1382	kl.imageManager.Start()
1383
1384	// Start the certificate manager if it was enabled.
1385	if kl.serverCertificateManager != nil {
1386		kl.serverCertificateManager.Start()
1387	}
1388
1389	// Start out of memory watcher.
1390	if kl.oomWatcher != nil {
1391		if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
1392			return fmt.Errorf("failed to start OOM watcher: %w", err)
1393		}
1394	}
1395
1396	// Start resource analyzer
1397	kl.resourceAnalyzer.Start()
1398
1399	return nil
1400}
1401
1402// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
1403func (kl *Kubelet) initializeRuntimeDependentModules() {
1404	if err := kl.cadvisor.Start(); err != nil {
1405		// Fail kubelet and rely on the babysitter to retry starting kubelet.
1406		klog.ErrorS(err, "Failed to start cAdvisor")
1407		os.Exit(1)
1408	}
1409
1410	// Trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
1411	// Any errors are ignored here because, if stats collection is not successful, the container manager will fail to start below.
1412	kl.StatsProvider.GetCgroupStats("/", true)
1413	// Start container manager.
1414	node, err := kl.getNodeAnyWay()
1415	if err != nil {
1416		// Fail kubelet and rely on the babysitter to retry starting kubelet.
1417		klog.ErrorS(err, "Kubelet failed to get node info")
1418		os.Exit(1)
1419	}
1420	// containerManager must start after cAdvisor because it needs filesystem capacity information
1421	if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
1422		// Fail kubelet and rely on the babysitter to retry starting kubelet.
1423		klog.ErrorS(err, "Failed to start ContainerManager")
1424		os.Exit(1)
1425	}
1426	// The eviction manager must start after cAdvisor because it needs to know if the container runtime has a dedicated imagefs.
1427	kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
1428
1429	// container log manager must start after container runtime is up to retrieve information from container runtime
1430	// and inform container to reopen log file after log rotation.
1431	kl.containerLogManager.Start()
1432	// Adding Registration Callback function for CSI Driver
1433	kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
1434	// Adding Registration Callback function for Device Manager
1435	kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
1436	// Start the plugin manager
1437	klog.V(4).InfoS("Starting plugin manager")
1438	go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
1439
1440	err = kl.shutdownManager.Start()
1441	if err != nil {
1442		// The shutdown manager is not critical for the kubelet, so log the failure but don't block startup.
1443		klog.ErrorS(err, "Failed to start node shutdown manager")
1444	}
1445}
1446
1447// Run starts the kubelet reacting to config updates
1448func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
1449	if kl.logServer == nil {
1450		kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
1451	}
1452	if kl.kubeClient == nil {
1453		klog.InfoS("No API server defined - no node status updates will be sent")
1454	}
1455
1456	// Start the cloud provider sync manager
1457	if kl.cloudResourceSyncManager != nil {
1458		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
1459	}
1460
1461	if err := kl.initializeModules(); err != nil {
1462		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
1463		klog.ErrorS(err, "Failed to initialize internal modules")
1464		os.Exit(1)
1465	}
1466
1467	// Start volume manager
1468	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
1469
1470	if kl.kubeClient != nil {
1471		// Start syncing node status immediately, this may set up things the runtime needs to run.
1472		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
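		// fastStatusUpdateOnce runs only once at startup to update the pod CIDR, runtime status,
		// and node status as soon as possible (see its doc comment below).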
1473		go kl.fastStatusUpdateOnce()
1474
1475		// start syncing lease
1476		go kl.nodeLeaseController.Run(wait.NeverStop)
1477	}
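	// Periodically check the container runtime status; the first successful check also
	// initializes the runtime-dependent modules (see updateRuntimeUp).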
1478	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
1479
1480	// Set up iptables util rules
1481	if kl.makeIPTablesUtilChains {
1482		kl.initNetworkUtil()
1483	}
1484
1485	// Start component sync loops.
1486	kl.statusManager.Start()
1487
1488	// Start syncing RuntimeClasses if enabled.
1489	if kl.runtimeClassManager != nil {
1490		kl.runtimeClassManager.Start(wait.NeverStop)
1491	}
1492
1493	// Start the pod lifecycle event generator.
1494	kl.pleg.Start()
1495	kl.syncLoop(updates, kl)
1496}
1497
1498// syncPod is the transaction script for the sync of a single pod (setting up
1499// a pod). The reverse (teardown) is handled in syncTerminatingPod and
1500// syncTerminatedPod. If syncPod exits without error, then the pod runtime
1501// state is in sync with the desired configuration state (pod is running).
1502// If syncPod exits with a transient error, the next invocation of syncPod
1503// is expected to make progress towards reaching the runtime state.
1504//
1505// Arguments:
1506//
1507// updateType - whether this is a pod create or an update; pod, mirrorPod, and podStatus are the pod to sync, its mirror pod (if any), and its most recent runtime status
1508//
1509// The workflow is:
1510// * If the pod is being created, record pod worker start latency
1511// * Call generateAPIPodStatus to prepare a v1.PodStatus for the pod
1512// * If the pod is being seen as running for the first time, record pod
1513//   start latency
1514// * Update the status of the pod in the status manager
1515// * Kill the pod if it should not be running due to soft admission
1516// * Create a mirror pod if the pod is a static pod, and does not
1517//   already have a mirror pod
1518// * Create the data directories for the pod if they do not exist
1519// * Wait for volumes to attach/mount
1520// * Fetch the pull secrets for the pod
1521// * Call the container runtime's SyncPod callback
1522// * Update the traffic shaping for the pod's ingress and egress limits
1523//
1524// If any step of this workflow errors, the error is returned, and is repeated
1525// on the next syncPod call.
1526//
1527// This operation writes all events that are dispatched in order to provide
1528// the most accurate information possible about an error situation to aid debugging.
1529// Callers should not throw an event if this operation returns an error.
1530func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
1531	klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
1532	defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
1533
1534	// Latency measurements for the main workflow are relative to the
1535	// first time the pod was seen by the API server.
1536	var firstSeenTime time.Time
1537	if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
1538		firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
1539	}
1540
1541	// Record pod worker start latency if being created
1542	// TODO: make pod workers record their own latencies
1543	if updateType == kubetypes.SyncPodCreate {
1544		if !firstSeenTime.IsZero() {
1545			// This is the first time we are syncing the pod. Record the latency
1546			// since kubelet first saw the pod if firstSeenTime is set.
1547			metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
1548		} else {
1549			klog.V(3).InfoS("First seen time not recorded for pod",
1550				"podUID", pod.UID,
1551				"pod", klog.KObj(pod))
1552		}
1553	}
1554
1555	// Generate final API pod status with pod and status manager status
1556	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
1557	// The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
1558	// TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
1559	// set pod IP to hostIP directly in runtime.GetPodStatus
1560	podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs))
1561	for _, ipInfo := range apiPodStatus.PodIPs {
1562		podStatus.IPs = append(podStatus.IPs, ipInfo.IP)
1563	}
1564
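	// Fall back to the singular PodIP field if no IPs were populated from PodIPs.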
1565	if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 {
1566		podStatus.IPs = []string{apiPodStatus.PodIP}
1567	}
1568
1569	// If the pod should not be running, we request the pod's containers be stopped. This is not the same
1570	// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
1571	// it). Set the status and phase appropriately.
1572	runnable := kl.canRunPod(pod)
1573	if !runnable.Admit {
1574		// Pod is not runnable; update the Pod and Container statuses to explain why.
1575		if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded {
1576			apiPodStatus.Phase = v1.PodPending
1577		}
1578		apiPodStatus.Reason = runnable.Reason
1579		apiPodStatus.Message = runnable.Message
1580		// Containers that are waiting will not be created; mark them as Blocked.
1581		const waitingReason = "Blocked"
1582		for _, cs := range apiPodStatus.InitContainerStatuses {
1583			if cs.State.Waiting != nil {
1584				cs.State.Waiting.Reason = waitingReason
1585			}
1586		}
1587		for _, cs := range apiPodStatus.ContainerStatuses {
1588			if cs.State.Waiting != nil {
1589				cs.State.Waiting.Reason = waitingReason
1590			}
1591		}
1592	}
1593
1594	// Record the time it takes for the pod to become running.
1595	existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
1596	if (!ok || existingStatus.Phase == v1.PodPending) && apiPodStatus.Phase == v1.PodRunning &&
1597		!firstSeenTime.IsZero() {
1598		metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime))
1599	}
1600
1601	kl.statusManager.SetPodStatus(pod, apiPodStatus)
1602
1603	// Pods that are not runnable must be stopped - return a typed error to the pod worker
1604	if !runnable.Admit {
1605		klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message)
1606		var syncErr error
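		// Convert the cached runtime status into a running-pod representation so killPod can stop its containers.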
1607		p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
1608		if err := kl.killPod(pod, p, nil); err != nil {
1609			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
1610			syncErr = fmt.Errorf("error killing pod: %v", err)
1611			utilruntime.HandleError(syncErr)
1612		} else {
1613			// There was no error killing the pod, but the pod cannot be run.
1614			// Return an error to signal that the sync loop should back off.
1615			syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message)
1616		}
1617		return syncErr
1618	}
1619
1620	// If the network plugin is not ready, only start the pod if it uses the host network
1621	if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) {
1622		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err)
1623		return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err)
1624	}
1625
1626	// Create Cgroups for the pod and apply resource parameters
1627	// to them if cgroups-per-qos flag is enabled.
1628	pcm := kl.containerManager.NewPodContainerManager()
1629	// If the pod has already been terminated then we need not create
1630	// or update the pod's cgroup
1631	// TODO: once context cancellation is added this check can be removed
1632	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
1633		// When the kubelet is restarted with the cgroups-per-qos
1634		// flag enabled, all of the pod's running containers
1635		// should be killed and brought back up
1636		// under the qos cgroup hierarchy.
1637		// Check if this is the pod's first sync
1638		firstSync := true
1639		for _, containerStatus := range apiPodStatus.ContainerStatuses {
1640			if containerStatus.State.Running != nil {
1641				firstSync = false
1642				break
1643			}
1644		}
1645		// Don't kill containers in the pod if the pod's cgroups already
1646		// exist or the pod is running for the first time.
1647		podKilled := false
1648		if !pcm.Exists(pod) && !firstSync {
1649			p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
1650			if err := kl.killPod(pod, p, nil); err == nil {
1651				podKilled = true
1652			} else {
1653				klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus)
1654			}
1655		}
1656		// Create and update the pod's cgroups.
1657		// Don't create cgroups for a run-once pod if it was killed above:
1658		// the current policy is not to restart run-once pods when the
1659		// kubelet is restarted with the new flag, since run-once pods are
1660		// expected to run only once.
1663		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
1664			if !pcm.Exists(pod) {
1665				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
1666					klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err)
1667				}
1668				if err := pcm.EnsureExists(pod); err != nil {
1669					kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err)
1670					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
1671				}
1672			}
1673		}
1674	}
1675
1676	// Create Mirror Pod for Static Pod if it doesn't already exist
1677	if kubetypes.IsStaticPod(pod) {
1678		deleted := false
1679		if mirrorPod != nil {
1680			if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
1681				// The mirror pod is semantically different from the static pod. Remove
1682				// it. The mirror pod will get recreated later.
1683				klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
1684				podFullName := kubecontainer.GetPodFullName(pod)
1685				var err error
1686				deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
1687				if deleted {
1688					klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
1689				} else if err != nil {
1690					klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
1691				}
1692			}
1693		}
1694		if mirrorPod == nil || deleted {
1695			node, err := kl.GetNode()
1696			if err != nil || node.DeletionTimestamp != nil {
1697				klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
1698			} else {
1699				klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
1700				if err := kl.podManager.CreateMirrorPod(pod); err != nil {
1701					klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
1702				}
1703			}
1704		}
1705	}
1706
1707	// Make data directories for the pod
1708	if err := kl.makePodDataDirs(pod); err != nil {
1709		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err)
1710		klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod))
1711		return err
1712	}
1713
1714	// Volume manager will not mount volumes for terminating pods
1715	// TODO: once context cancellation is added this check can be removed
1716	if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
1717		// Wait for volumes to attach/mount
1718		if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
1719			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err)
1720			klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod))
1721			return err
1722		}
1723	}
1724
1725	// Fetch the pull secrets for the pod
1726	pullSecrets := kl.getPullSecretsForPod(pod)
1727
1728	// Call the container runtime's SyncPod callback
1729	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
1730	kl.reasonCache.Update(pod.UID, result)
1731	if err := result.Error(); err != nil {
1732		// Do not return error if the only failures were pods in backoff
1733		for _, r := range result.SyncResults {
1734			if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff {
1735				// Do not record an event here, as we keep all event logging for sync pod failures
1736				// local to container runtime so we get better errors
1737				return err
1738			}
1739		}
1740
1741		return nil
1742	}
1743
1744	return nil
1745}
1746
1747// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method
1748// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed,
1749// we perform no status updates.
1750func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error {
1751	klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
1752	defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
1753
1754	// when we receive a runtime only pod (runningPod != nil) we don't need to update the status
1755	// manager or refresh the status of the cache, because a successful killPod will ensure we do
1756	// not get invoked again
1757	if runningPod != nil {
1758		// we kill the pod with the specified grace period since this is a termination
1759		if gracePeriod != nil {
1760			klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
1761		} else {
1762			klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
1763		}
1764		if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil {
1765			kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
1766			// there was an error killing the pod, so we return that error directly
1767			utilruntime.HandleError(err)
1768			return err
1769		}
1770		klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID)
1771		return nil
1772	}
1773
1774	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
1775	if podStatusFn != nil {
1776		podStatusFn(&apiPodStatus)
1777	}
1778	kl.statusManager.SetPodStatus(pod, apiPodStatus)
1779
1780	if gracePeriod != nil {
1781		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
1782	} else {
1783		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
1784	}
1785	p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
1786	if err := kl.killPod(pod, p, gracePeriod); err != nil {
1787		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
1788		// there was an error killing the pod, so we return that error directly
1789		utilruntime.HandleError(err)
1790		return err
1791	}
1792
1793	// Guard against consistency issues in KillPod implementations by checking that there are no
1794	// running containers. This method is invoked infrequently so this is effectively free and can
1795	// catch race conditions introduced by callers updating pod status out of order.
1796	// TODO: have KillPod return the terminal status of stopped containers and write that into the
1797	//  cache immediately
1798	podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
1799	if err != nil {
1800		klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID)
1801		return err
1802	}
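	// Collect the IDs of any containers that are still running, plus a human-readable
	// summary of all container states for logging.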
1803	var runningContainers []string
1804	var containers []string
1805	for _, s := range podStatus.ContainerStatuses {
1806		if s.State == kubecontainer.ContainerStateRunning {
1807			runningContainers = append(runningContainers, s.ID.String())
1808		}
1809		containers = append(containers, fmt.Sprintf("(%s state=%s exitCode=%d finishedAt=%s)", s.Name, s.State, s.ExitCode, s.FinishedAt.UTC().Format(time.RFC3339Nano)))
1810	}
1811	if klog.V(4).Enabled() {
1812		sort.Strings(containers)
1813		klog.InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " "))
1814	}
1815	if len(runningContainers) > 0 {
1816		return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers)
1817	}
1818
1819	// we have successfully stopped all containers, the pod is terminating, our status is "done"
1820	klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID)
1821
1822	return nil
1823}
1824
1825// syncTerminatedPod cleans up a pod that has terminated (has no running containers).
1826// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which
1827// gates pod deletion). When this method exits the pod is expected to be ready for cleanup.
1828// TODO: make this method take a context and exit early
1829func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
1830	klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID)
1831	defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID)
1832
1833	// generate the final status of the pod
1834	// TODO: should we simply fold this into TerminatePod? that would give a single pod update
1835	apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
1836	kl.statusManager.SetPodStatus(pod, apiPodStatus)
1837
1838	// volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied
1839	// before syncTerminatedPod is invoked)
1840	if err := kl.volumeManager.WaitForUnmount(pod); err != nil {
1841		return err
1842	}
1843	klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID)
1844
1845	// Note: we leave pod containers to be reclaimed in the background since dockershim requires the
1846	// container for retrieving logs and we want to make sure logs are available until the pod is
1847	// physically deleted.
1848
1849	// remove any cgroups in the hierarchy for pods that are no longer running.
1850	if kl.cgroupsPerQOS {
1851		pcm := kl.containerManager.NewPodContainerManager()
1852		name, _ := pcm.GetPodContainerName(pod)
1853		if err := pcm.Destroy(name); err != nil {
1854			return err
1855		}
1856		klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID)
1857	}
1858
1859	// mark the final pod status
1860	kl.statusManager.TerminatePod(pod)
1861	klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID)
1862
1863	return nil
1864}
1865
1866// getPodsToSync returns pods that should be resynchronized. Currently, the following pods are included:
1867//   * pods whose work is ready.
1868//   * pods for which an internal module has requested a sync.
1869func (kl *Kubelet) getPodsToSync() []*v1.Pod {
1870	allPods := kl.podManager.GetPods()
1871	podUIDs := kl.workQueue.GetWork()
1872	podUIDSet := sets.NewString()
1873	for _, podUID := range podUIDs {
1874		podUIDSet.Insert(string(podUID))
1875	}
1876	var podsToSync []*v1.Pod
1877	for _, pod := range allPods {
1878		if podUIDSet.Has(string(pod.UID)) {
1879			// The work of the pod is ready
1880			podsToSync = append(podsToSync, pod)
1881			continue
1882		}
1883		for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
1884			if podSyncLoopHandler.ShouldSync(pod) {
1885				podsToSync = append(podsToSync, pod)
1886				break
1887			}
1888		}
1889	}
1890	return podsToSync
1891}
1892
1893// deletePod deletes the pod from the internal state of the kubelet by
1894// asking the associated pod worker to stop the pod asynchronously.
1895// Volume and directory cleanup is left to the periodic cleanup routine.
1896//
1897// deletePod returns an error if the pod is nil or if not all configuration
1898// sources are ready yet.
1899func (kl *Kubelet) deletePod(pod *v1.Pod) error {
1900	if pod == nil {
1901		return fmt.Errorf("deletePod does not allow nil pod")
1902	}
1903	if !kl.sourcesReady.AllReady() {
1904		// If the sources aren't ready, skip deletion, as we may accidentally delete pods
1905		// for sources that haven't reported yet.
1906		return fmt.Errorf("skipping delete because sources aren't ready yet")
1907	}
1908	klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID)
1909	kl.podWorkers.UpdatePod(UpdatePodOptions{
1910		Pod:        pod,
1911		UpdateType: kubetypes.SyncPodKill,
1912	})
1913	// We leave the volume/directory cleanup to the periodic cleanup routine.
1914	return nil
1915}
1916
1917// rejectPod records an event about the pod with the given reason and message,
1918// and updates the pod to the Failed phase in the status manager.
1919func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
1920	kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
1921	kl.statusManager.SetPodStatus(pod, v1.PodStatus{
1922		Phase:   v1.PodFailed,
1923		Reason:  reason,
1924		Message: "Pod " + message})
1925}
1926
1927// canAdmitPod determines if a pod can be admitted, and gives a reason if it
1928// cannot. "pod" is the new pod, while "pods" are all the admitted pods.
1929// The function returns a boolean value indicating whether the pod
1930// can be admitted, a brief single-word reason, and a message explaining why
1931// the pod cannot be admitted.
1932func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
1933	// the kubelet will invoke each pod admit handler in sequence
1934	// if any handler rejects, the pod is rejected.
1935	// TODO: move out of disk check into a pod admitter
1936	// TODO: out of resource eviction should have a pod admitter call-out
1937	attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
1938	for _, podAdmitHandler := range kl.admitHandlers {
1939		if result := podAdmitHandler.Admit(attrs); !result.Admit {
1940			return false, result.Reason, result.Message
1941		}
1942	}
1943
1944	return true, "", ""
1945}
1946
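// canRunPod runs the soft admit handlers against the pod and returns the first
// rejecting result, or an admitting result if every handler allows the pod to run.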
1947func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult {
1948	attrs := &lifecycle.PodAdmitAttributes{Pod: pod}
1949	// Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive.
1950	attrs.OtherPods = kl.GetActivePods()
1951
1952	for _, handler := range kl.softAdmitHandlers {
1953		if result := handler.Admit(attrs); !result.Admit {
1954			return result
1955		}
1956	}
1957
1958	return lifecycle.PodAdmitResult{Admit: true}
1959}
1960
1961// syncLoop is the main loop for processing changes. It watches for changes from
1962// three channels (file, apiserver, and http) and creates a union of them. For
1963// any new change seen, it runs a sync against the desired state and the running state. If
1964// no changes are seen to the configuration, it synchronizes the last known desired
1965// state every sync-frequency seconds. It never returns.
1966func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
1967	klog.InfoS("Starting kubelet main sync loop")
1968	// The syncTicker wakes up the kubelet to check if there are any pod workers
1969	// that need to be synced. A one-second period is sufficient because the
1970	// sync interval defaults to 10s.
1971	syncTicker := time.NewTicker(time.Second)
1972	defer syncTicker.Stop()
1973	housekeepingTicker := time.NewTicker(housekeepingPeriod)
1974	defer housekeepingTicker.Stop()
1975	plegCh := kl.pleg.Watch()
1976	const (
1977		base   = 100 * time.Millisecond
1978		max    = 5 * time.Second
1979		factor = 2
1980	)
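	// duration is the backoff applied when pod synchronization is skipped due to runtime
	// errors: it starts at base, doubles up to max (100ms, 200ms, 400ms, ..., 5s), and is
	// reset to base after a successful iteration.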
1981	duration := base
1982	// Responsible for checking limits in resolv.conf
1983	// The limits do not have anything to do with individual pods
1984	// Since this is called in syncLoop, we don't need to call it anywhere else
1985	if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
1986		kl.dnsConfigurer.CheckLimitsForResolvConf()
1987	}
1988
1989	for {
1990		if err := kl.runtimeState.runtimeErrors(); err != nil {
1991			klog.ErrorS(err, "Skipping pod synchronization")
1992			// exponential backoff
1993			time.Sleep(duration)
1994			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
1995			continue
1996		}
1997		// reset backoff if we have a success
1998		duration = base
1999
2000		kl.syncLoopMonitor.Store(kl.clock.Now())
2001		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
2002			break
2003		}
2004		kl.syncLoopMonitor.Store(kl.clock.Now())
2005	}
2006}
2007
2008// syncLoopIteration reads from various channels and dispatches pods to the
2009// given handler.
2010//
2011// Arguments:
2012// 1.  configCh:       a channel to read config events from
2013// 2.  handler:        the SyncHandler to dispatch pods to
2014// 3.  syncCh:         a channel to read periodic sync events from
2015// 4.  housekeepingCh: a channel to read housekeeping events from
2016// 5.  plegCh:         a channel to read PLEG updates from
2017//
2018// Events are also read from the kubelet liveness manager's update channel.
2019//
2020// The workflow is to read from one of the channels, handle that event, and
2021// update the timestamp in the sync loop monitor.
2022//
2023// It is worth noting that despite the syntactic similarity to the switch
2024// statement, the case statements in a select are evaluated in a
2025// pseudorandom order if multiple channels are ready to read from when the
2026// select is evaluated. In other words, you cannot assume that the case
2027// statements are evaluated in order when multiple channels have events.
2029//
2030// With that in mind, in truly no particular order, the different channels
2031// are handled as follows:
2032//
2033// * configCh: dispatch the pods for the config change to the appropriate
2034//             handler callback for the event type
2035// * plegCh: update the runtime cache; sync pod
2036// * syncCh: sync all pods waiting for sync
2037// * housekeepingCh: trigger cleanup of pods
2038// * health manager: sync pods that have failed or in which one or more
2039//                     containers have failed health checks
2040func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
2041	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
2042	select {
2043	case u, open := <-configCh:
2044		// Update from a config source; dispatch it to the right handler
2045		// callback.
2046		if !open {
2047			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
2048			return false
2049		}
2050
2051		switch u.Op {
2052		case kubetypes.ADD:
2053			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods))
2054			// After restarting, kubelet will get all existing pods through
2055			// ADD as if they are new pods. These pods will then go through the
2056			// admission process and *may* be rejected. This can be resolved
2057			// once we have checkpointing.
2058			handler.HandlePodAdditions(u.Pods)
2059		case kubetypes.UPDATE:
2060			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods))
2061			handler.HandlePodUpdates(u.Pods)
2062		case kubetypes.REMOVE:
2063			klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods))
2064			handler.HandlePodRemoves(u.Pods)
2065		case kubetypes.RECONCILE:
2066			klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods))
2067			handler.HandlePodReconcile(u.Pods)
2068		case kubetypes.DELETE:
2069			klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods))
2070			// DELETE is treated as an UPDATE because of graceful deletion.
2071			handler.HandlePodUpdates(u.Pods)
2072		case kubetypes.SET:
2073			// TODO: Do we want to support this?
2074			klog.ErrorS(nil, "Kubelet does not support snapshot update")
2075		default:
2076			klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
2077		}
2078
2079		kl.sourcesReady.AddSource(u.Source)
2080
2081	case e := <-plegCh:
2082		if e.Type == pleg.ContainerStarted {
2083			// record the most recent time we observed a container start for this pod.
2084			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
2085			// to make sure we don't miss handling graceful termination for containers we reported as having started.
2086			kl.lastContainerStartedTime.Add(e.ID, time.Now())
2087		}
2088		if isSyncPodWorthy(e) {
2089			// PLEG event for a pod; sync it.
2090			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
2091				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
2092				handler.HandlePodSyncs([]*v1.Pod{pod})
2093			} else {
2094				// If the pod no longer exists, ignore the event.
2095				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
2096			}
2097		}
2098
2099		if e.Type == pleg.ContainerDied {
2100			if containerID, ok := e.Data.(string); ok {
2101				kl.cleanUpContainersInPod(e.ID, containerID)
2102			}
2103		}
2104	case <-syncCh:
2105		// Sync pods waiting for sync
2106		podsToSync := kl.getPodsToSync()
2107		if len(podsToSync) == 0 {
2108			break
2109		}
2110		klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync))
2111		handler.HandlePodSyncs(podsToSync)
2112	case update := <-kl.livenessManager.Updates():
2113		if update.Result == proberesults.Failure {
2114			handleProbeSync(kl, update, handler, "liveness", "unhealthy")
2115		}
2116	case update := <-kl.readinessManager.Updates():
2117		ready := update.Result == proberesults.Success
2118		kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
2119
2120		status := ""
2121		if ready {
2122			status = "ready"
2123		}
2124		handleProbeSync(kl, update, handler, "readiness", status)
2125	case update := <-kl.startupManager.Updates():
2126		started := update.Result == proberesults.Success
2127		kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
2128
2129		status := "unhealthy"
2130		if started {
2131			status = "started"
2132		}
2133		handleProbeSync(kl, update, handler, "startup", status)
2134	case <-housekeepingCh:
2135		if !kl.sourcesReady.AllReady() {
2136			// If the sources aren't ready, skip housekeeping, as we may
2137			// accidentally delete pods from unready sources.
2138			klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet")
2139		} else {
2140			start := time.Now()
2141			klog.V(4).InfoS("SyncLoop (housekeeping)")
2142			if err := handler.HandlePodCleanups(); err != nil {
2143				klog.ErrorS(err, "Failed cleaning pods")
2144			}
2145			duration := time.Since(start)
2146			if duration > housekeepingWarningDuration {
2147				klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than expected", "expected", housekeepingWarningDuration, "seconds", duration.Seconds())
2148			}
2149			klog.V(4).InfoS("SyncLoop (housekeeping) end")
2150		}
2151	}
2152	return true
2153}
2154
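// handleProbeSync triggers a sync of the pod referenced by a probe result update,
// if the pod is still known to the pod manager.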
2155func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) {
2156	// Look up the current pod from the pod manager by UID.
2157	pod, ok := kl.podManager.GetPodByUID(update.PodUID)
2158	if !ok {
2159		// If the pod no longer exists, ignore the update.
2160		klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update)
2161		return
2162	}
2163	klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod))
2164	handler.HandlePodSyncs([]*v1.Pod{pod})
2165}
2166
2167// dispatchWork starts the asynchronous sync of the pod in a pod worker.
2168// If the pod has completed termination, dispatchWork will perform no action.
2169func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
2170	// Run the sync in an async worker.
2171	kl.podWorkers.UpdatePod(UpdatePodOptions{
2172		Pod:        pod,
2173		MirrorPod:  mirrorPod,
2174		UpdateType: syncType,
2175		StartTime:  start,
2176	})
2177	// Note the number of containers for new pods.
2178	if syncType == kubetypes.SyncPodCreate {
2179		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
2180	}
2181}
2182
2183// TODO: handle mirror pods in a separate component (issue #17251)
2184func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) {
2185	// Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
2186	// corresponding static pod. Send update to the pod worker if the static
2187	// pod exists.
2188	if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
2189		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
2190	}
2191}
2192
2193// HandlePodAdditions is the callback in SyncHandler for pods being added from
2194// a config source.
2195func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
2196	start := kl.clock.Now()
2197	sort.Sort(sliceutils.PodsByCreationTime(pods))
2198	for _, pod := range pods {
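		// Snapshot the pods known to the pod manager before this pod is added; it is
		// used below to compute the set of active pods for admission.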
2199		existingPods := kl.podManager.GetPods()
2200		// Always add the pod to the pod manager. Kubelet relies on the pod
2201		// manager as the source of truth for the desired state. If a pod does
2202		// not exist in the pod manager, it means that it has been deleted in
2203		// the apiserver and no action (other than cleanup) is required.
2204		kl.podManager.AddPod(pod)
2205
2206		if kubetypes.IsMirrorPod(pod) {
2207			kl.handleMirrorPod(pod, start)
2208			continue
2209		}
2210
2211		// Only go through the admission process if the pod is not requested
2212		// for termination by another part of the kubelet. If the pod is already
2213		// using resources (previously admitted), the pod worker is going to be
2214		// shutting it down. If the pod hasn't started yet, we know that when
2215		// the pod worker is invoked it will also avoid setting up the pod, so
2216		// we simply avoid doing any work.
2217		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
2218			// We failed pods that we rejected, so activePods includes all admitted
2219			// pods that are alive.
2220			activePods := kl.filterOutTerminatedPods(existingPods)
2221
2222			// Check if we can admit the pod; if not, reject it.
2223			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
2224				kl.rejectPod(pod, reason, message)
2225				continue
2226			}
2227		}
2228		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
2229		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
2230		kl.probeManager.AddPod(pod)
2231	}
2232}
2233
2234// HandlePodUpdates is the callback in the SyncHandler interface for pods
2235// being updated from a config source.
2236func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
2237	start := kl.clock.Now()
2238	for _, pod := range pods {
2239		kl.podManager.UpdatePod(pod)
2240		if kubetypes.IsMirrorPod(pod) {
2241			kl.handleMirrorPod(pod, start)
2242			continue
2243		}
2244		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
2245		kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
2246	}
2247}
2248
2249// HandlePodRemoves is the callback in the SyncHandler interface for pods
2250// being removed from a config source.
2251func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
2252	start := kl.clock.Now()
2253	for _, pod := range pods {
2254		kl.podManager.DeletePod(pod)
2255		if kubetypes.IsMirrorPod(pod) {
2256			kl.handleMirrorPod(pod, start)
2257			continue
2258		}
2259		// Deletion is allowed to fail because the periodic cleanup routine
2260		// will trigger deletion again.
2261		if err := kl.deletePod(pod); err != nil {
2262			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
2263		}
2264		kl.probeManager.RemovePod(pod)
2265	}
2266}
2267
2268// HandlePodReconcile is the callback in the SyncHandler interface for pods
2269// that should be reconciled.
2270func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
2271	start := kl.clock.Now()
2272	for _, pod := range pods {
2273		// Update the pod in the pod manager; the status manager will periodically reconcile
2274		// against the pod manager.
2275		kl.podManager.UpdatePod(pod)
2276
2277		// Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
2278		if status.NeedToReconcilePodReadiness(pod) {
2279			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
2280			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
2281		}
2282
2283		// After an evicted pod is synced, all dead containers in the pod can be removed.
2284		if eviction.PodIsEvicted(pod.Status) {
2285			if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
2286				kl.containerDeletor.deleteContainersInPod("", podStatus, true)
2287			}
2288		}
2289	}
2290}
2291
2292// HandlePodSyncs is the callback in the syncHandler interface for pods
2293// that should be dispatched to pod workers for sync.
2294func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
2295	start := kl.clock.Now()
2296	for _, pod := range pods {
2297		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
2298		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
2299	}
2300}
2301
2302// LatestLoopEntryTime returns the last time in the sync loop monitor.
2303func (kl *Kubelet) LatestLoopEntryTime() time.Time {
2304	val := kl.syncLoopMonitor.Load()
2305	if val == nil {
2306		return time.Time{}
2307	}
2308	return val.(time.Time)
2309}
2310
2311// updateRuntimeUp calls the container runtime status callback, initializing
2312// the runtime dependent modules when the container runtime first comes up,
2313// and logs an error if the status check fails. If the status check is OK,
2314// it updates the container runtime uptime in the kubelet runtimeState.
2315func (kl *Kubelet) updateRuntimeUp() {
2316	kl.updateRuntimeMux.Lock()
2317	defer kl.updateRuntimeMux.Unlock()
2318
2319	s, err := kl.containerRuntime.Status()
2320	if err != nil {
2321		klog.ErrorS(err, "Container runtime sanity check failed")
2322		return
2323	}
2324	if s == nil {
2325		klog.ErrorS(nil, "Container runtime status is nil")
2326		return
2327	}
2328	// Periodically log the whole runtime status for debugging.
2329	klog.V(4).InfoS("Container runtime status", "status", s)
2330	networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady)
2331	if networkReady == nil || !networkReady.Status {
2332		klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady)
2333		kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady))
2334	} else {
2335		// Set nil if the container runtime network is ready.
2336		kl.runtimeState.setNetworkState(nil)
2337	}
2338	// information in RuntimeReady condition will be propagated to NodeReady condition.
2339	runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady)
2340	// If RuntimeReady is not set or is false, report an error.
2341	if runtimeReady == nil || !runtimeReady.Status {
2342		klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady)
2343		kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady))
2344		return
2345	}
2346	kl.runtimeState.setRuntimeState(nil)
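	// Initialize the runtime-dependent modules exactly once, on the first successful runtime status check.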
2347	kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
2348	kl.runtimeState.setRuntimeSync(kl.clock.Now())
2349}
2350
2351// GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
2352func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration {
2353	return kl.kubeletConfiguration
2354}
2355
2356// BirthCry sends an event that the kubelet has started up.
2357func (kl *Kubelet) BirthCry() {
2358	// Make an event that kubelet restarted.
2359	kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
2360}
2361
2362// ResyncInterval returns the interval used for periodic syncs.
2363func (kl *Kubelet) ResyncInterval() time.Duration {
2364	return kl.resyncInterval
2365}
2366
2367// ListenAndServe runs the kubelet HTTP server.
2368func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
2369	auth server.AuthInterface) {
2370	server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth)
2371}
2372
2373// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
2374func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
2375	server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
2376}
2377
2378// ListenAndServePodResources runs the kubelet podresources grpc service
2379func (kl *Kubelet) ListenAndServePodResources() {
2380	socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
2381	if err != nil {
2382		klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
2383		return
2384	}
2385	server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
2386}
2387
2388// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
2389func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
2390	if podStatus, err := kl.podCache.Get(podID); err == nil {
2391		// When an evicted or deleted pod has already synced, all containers can be removed.
2392		removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
2393		kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
2394	}
2395}
2396
2397// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR
2398// is applied and tries to update the pod CIDR immediately. After the pod CIDR is updated it fires off
2399// a runtime update and a node status update. The function returns after one successful node status update.
2400// It is executed only during kubelet startup, which improves the latency to a ready node by updating
2401// the pod CIDR, runtime status, and node status as soon as possible.
2402func (kl *Kubelet) fastStatusUpdateOnce() {
2403	for {
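		// Poll every 100ms until a pod CIDR is observed on the node object.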
2404		time.Sleep(100 * time.Millisecond)
2405		node, err := kl.GetNode()
2406		if err != nil {
2407			klog.ErrorS(err, "Error getting node")
2408			continue
2409		}
2410		if len(node.Spec.PodCIDRs) != 0 {
2411			podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
2412			if _, err := kl.updatePodCIDR(podCIDRs); err != nil {
2413				klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs)
2414				continue
2415			}
2416			kl.updateRuntimeUp()
2417			kl.syncNodeStatus()
2418			return
2419		}
2420	}
2421}
2422
2423// isSyncPodWorthy filters out events that are not worthy of pod syncing
2424func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
2425	// ContainerRemoved doesn't affect pod state
2426	return event.Type != pleg.ContainerRemoved
2427}
2428
2429// getStreamingConfig returns the streaming server configuration to use with in-process CRI shims.
2430func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
2431	config := &streaming.Config{
2432		StreamIdleTimeout:               kubeCfg.StreamingConnectionIdleTimeout.Duration,
2433		StreamCreationTimeout:           streaming.DefaultConfig.StreamCreationTimeout,
2434		SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
2435		SupportedPortForwardProtocols:   streaming.DefaultConfig.SupportedPortForwardProtocols,
2436	}
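	// Bind the streaming server to localhost on an ephemeral port; port "0" lets the OS pick a free port.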
2437	config.Addr = net.JoinHostPort("localhost", "0")
2438	return config
2439}
2440