/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"net/http"
	"os"
	"path"
	sysruntime "runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/client-go/informers"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
	"k8s.io/mount-utils"
	"k8s.io/utils/integer"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/certificate"
	"k8s.io/client-go/util/flowcontrol"
	cloudprovider "k8s.io/cloud-provider"
	"k8s.io/component-helpers/apimachinery/lease"
	internalapi "k8s.io/cri-api/pkg/apis"
	"k8s.io/klog/v2"
	pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/features"
	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
	"k8s.io/kubernetes/pkg/kubelet/cloudresource"
	"k8s.io/kubernetes/pkg/kubelet/cm"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/configmap"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/cri/remote"
	"k8s.io/kubernetes/pkg/kubelet/cri/streaming"
	"k8s.io/kubernetes/pkg/kubelet/events"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/images"
	"k8s.io/kubernetes/pkg/kubelet/kubeletconfig"
	"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
	"k8s.io/kubernetes/pkg/kubelet/legacy"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/logs"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
	"k8s.io/kubernetes/pkg/kubelet/network/dns"
	"k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
	oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
	"k8s.io/kubernetes/pkg/kubelet/pleg"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
	plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/kubelet/preemption"
	"k8s.io/kubernetes/pkg/kubelet/prober"
	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/prober/results" 98 "k8s.io/kubernetes/pkg/kubelet/runtimeclass" 99 "k8s.io/kubernetes/pkg/kubelet/secret" 100 "k8s.io/kubernetes/pkg/kubelet/server" 101 servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics" 102 serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats" 103 "k8s.io/kubernetes/pkg/kubelet/stats" 104 "k8s.io/kubernetes/pkg/kubelet/status" 105 "k8s.io/kubernetes/pkg/kubelet/sysctl" 106 "k8s.io/kubernetes/pkg/kubelet/token" 107 kubetypes "k8s.io/kubernetes/pkg/kubelet/types" 108 "k8s.io/kubernetes/pkg/kubelet/util" 109 "k8s.io/kubernetes/pkg/kubelet/util/format" 110 "k8s.io/kubernetes/pkg/kubelet/util/manager" 111 "k8s.io/kubernetes/pkg/kubelet/util/queue" 112 "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" 113 "k8s.io/kubernetes/pkg/kubelet/volumemanager" 114 "k8s.io/kubernetes/pkg/security/apparmor" 115 sysctlwhitelist "k8s.io/kubernetes/pkg/security/podsecuritypolicy/sysctl" 116 "k8s.io/kubernetes/pkg/util/oom" 117 "k8s.io/kubernetes/pkg/util/selinux" 118 "k8s.io/kubernetes/pkg/volume" 119 "k8s.io/kubernetes/pkg/volume/csi" 120 "k8s.io/kubernetes/pkg/volume/util/hostutil" 121 "k8s.io/kubernetes/pkg/volume/util/subpath" 122 "k8s.io/kubernetes/pkg/volume/util/volumepathhandler" 123) 124 125const ( 126 // Max amount of time to wait for the container runtime to come up. 127 maxWaitForContainerRuntime = 30 * time.Second 128 129 // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. 130 nodeStatusUpdateRetry = 5 131 132 // ContainerLogsDir is the location of container logs. 133 ContainerLogsDir = "/var/log/containers" 134 135 // MaxContainerBackOff is the max backoff period, exported for the e2e test 136 MaxContainerBackOff = 300 * time.Second 137 138 // Period for performing global cleanup tasks. 139 housekeepingPeriod = time.Second * 2 140 141 // Duration at which housekeeping failed to satisfy the invariant that 142 // housekeeping should be fast to avoid blocking pod config (while 143 // housekeeping is running no new pods are started or deleted). 144 housekeepingWarningDuration = time.Second * 15 145 146 // Period for performing eviction monitoring. 147 // ensure this is kept in sync with internal cadvisor housekeeping. 148 evictionMonitoringPeriod = time.Second * 10 149 150 // The path in containers' filesystems where the hosts file is mounted. 151 linuxEtcHostsPath = "/etc/hosts" 152 windowsEtcHostsPath = "C:\\Windows\\System32\\drivers\\etc\\hosts" 153 154 // Capacity of the channel for receiving pod lifecycle events. This number 155 // is a bit arbitrary and may be adjusted in the future. 156 plegChannelCapacity = 1000 157 158 // Generic PLEG relies on relisting for discovering container events. 159 // A longer period means that kubelet will take longer to detect container 160 // changes and to update pod status. On the other hand, a shorter period 161 // will cause more frequent relisting (e.g., container runtime operations), 162 // leading to higher cpu usage. 163 // Note that even though we set the period to 1s, the relisting itself can 164 // take more than 1s to finish if the container runtime responds slowly 165 // and/or when there are many container changes in one cycle. 166 plegRelistPeriod = time.Second * 1 167 168 // backOffPeriod is the period to back off when pod syncing results in an 169 // error. It is also used as the base period for the exponential backoff 170 // container restarts and image pulls. 
	backOffPeriod = time.Second * 10

	// ContainerGCPeriod is the period for performing container garbage collection.
	ContainerGCPeriod = time.Minute
	// ImageGCPeriod is the period for performing image garbage collection.
	ImageGCPeriod = 5 * time.Minute

	// Minimum number of dead containers to keep in a pod
	minDeadContainerInPod = 1

	// nodeLeaseRenewIntervalFraction is the fraction of lease duration to renew the lease
	nodeLeaseRenewIntervalFraction = 0.25
)

var etcHostsPath = getContainerEtcHostsPath()

func getContainerEtcHostsPath() string {
	if sysruntime.GOOS == "windows" {
		return windowsEtcHostsPath
	}
	return linuxEtcHostsPath
}

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups() error
}

// Option is a functional option type for Kubelet
type Option func(*Kubelet)

// Bootstrap is a bootstrapping interface for kubelet; it targets the initialization protocol.
type Bootstrap interface {
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, auth server.AuthInterface)
	ListenAndServeReadOnly(address net.IP, port uint)
	ListenAndServePodResources()
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// Dependencies is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type Dependencies struct {
	Options []Option

	// Injected Dependencies
	Auth server.AuthInterface
	CAdvisorInterface cadvisor.Interface
	Cloud cloudprovider.Interface
	ContainerManager cm.ContainerManager
	DockerOptions *DockerOptions
	EventClient v1core.EventsGetter
	HeartbeatClient clientset.Interface
	OnHeartbeatFailure func()
	KubeClient clientset.Interface
	Mounter mount.Interface
	HostUtil hostutil.HostUtils
	OOMAdjuster *oom.OOMAdjuster
	OSInterface kubecontainer.OSInterface
	PodConfig *config.PodConfig
	Recorder record.EventRecorder
	Subpather subpath.Interface
	VolumePlugins []volume.VolumePlugin
	DynamicPluginProber volume.DynamicPluginProber
	TLSOptions *server.TLSOptions
	KubeletConfigController *kubeletconfig.Controller
	RemoteRuntimeService internalapi.RuntimeService
	RemoteImageService internalapi.ImageManagerService
	dockerLegacyService legacy.DockerLegacyService
	// remove it after cadvisor.UsingLegacyCadvisorStats is dropped.
	useLegacyCadvisorStats bool
}

// DockerOptions contains docker-specific configuration. Importantly, since it
// lives outside of `dockershim`, it should not depend on the `docker/docker`
// client library.
type DockerOptions struct {
	DockerEndpoint string
	RuntimeRequestTimeout time.Duration
	ImagePullProgressDeadline time.Duration
}

// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
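//
// The returned PodConfig merges every configured pod source (static-pod files,
// static-pod URL, apiserver) into a single update channel. A rough consumption
// sketch (illustrative only):
//
//	cfg, err := makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
//	if err != nil {
//		return err
//	}
//	for update := range cfg.Updates() {
//		// update.Source names the origin; update.Op and update.Pods
//		// describe the change to apply.
//	}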
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
	}
	return cfg, nil
}

// PreInitRuntimeService will init runtime service before RunKubelet.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	crOptions *config.ContainerRuntimeOptions,
	containerRuntime string,
	runtimeCgroups string,
	remoteRuntimeEndpoint string,
	remoteImageEndpoint string,
	nonMasqueradeCIDR string) error {
	if remoteRuntimeEndpoint != "" {
		// remoteImageEndpoint is the same as remoteRuntimeEndpoint if not explicitly specified
		if remoteImageEndpoint == "" {
			remoteImageEndpoint = remoteRuntimeEndpoint
		}
	}

	switch containerRuntime {
	case kubetypes.DockerContainerRuntime:
		klog.InfoS("Using dockershim is deprecated, please consider using a full-fledged CRI implementation")
		if err := runDockershim(
			kubeCfg,
			kubeDeps,
			crOptions,
			runtimeCgroups,
			remoteRuntimeEndpoint,
			remoteImageEndpoint,
			nonMasqueradeCIDR,
		); err != nil {
			return err
		}
	case kubetypes.RemoteContainerRuntime:
		// No-op.
		break
	default:
		return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime)
	}

	var err error
	if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
		return err
	}
	if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration); err != nil {
		return err
	}

	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint)

	return nil
}

// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
	crOptions *config.ContainerRuntimeOptions,
	containerRuntime string,
	hostname string,
	hostnameOverridden bool,
	nodeName types.NodeName,
	nodeIPs []net.IP,
	providerID string,
	cloudProvider string,
	certDirectory string,
	rootDirectory string,
	imageCredentialProviderConfigFile string,
	imageCredentialProviderBinDir string,
	registerNode bool,
	registerWithTaints []api.Taint,
	allowedUnsafeSysctls []string,
	experimentalMounterPath string,
	kernelMemcgNotification bool,
	experimentalCheckNodeCapabilitiesBeforeMount bool,
	experimentalNodeAllocatableIgnoreEvictionThreshold bool,
	minimumGCAge metav1.Duration,
	maxPerPodContainerCount int32,
	maxContainerCount int32,
	masterServiceNamespace string,
	registerSchedulable bool,
	keepTerminatedPodVolumes bool,
	nodeLabels map[string]string,
	seccompProfileRoot string,
	nodeStatusMaxImages int32,
	seccompDefault bool,
) (*Kubelet, error) {
	if rootDirectory == "" {
		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
	}
	if kubeCfg.SyncFrequency.Duration <= 0 {
		return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
	}

	if kubeCfg.MakeIPTablesUtilChains {
		if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
			return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
			return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
		}
		if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
			return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
		}
	}

	if utilfeature.DefaultFeatureGate.Enabled(features.DisableCloudProviders) && cloudprovider.IsDeprecatedInternal(cloudProvider) {
		cloudprovider.DisableWarningForProvider(cloudProvider)
		return nil, fmt.Errorf("cloud provider %q was specified, but built-in cloud providers are disabled. Please set --cloud-provider=external and migrate to an external cloud provider", cloudProvider)
	}

	var nodeHasSynced cache.InformerSynced
	var nodeLister corelisters.NodeLister

	// If kubeClient == nil, we are running in standalone mode (i.e. no API servers)
	// If not nil, we are running as part of a cluster and should sync w/API
	if kubeDeps.KubeClient != nil {
		kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
			options.FieldSelector = fields.Set{metav1.ObjectNameField: string(nodeName)}.String()
		}))
		nodeLister = kubeInformers.Core().V1().Nodes().Lister()
		nodeHasSynced = func() bool {
			return kubeInformers.Core().V1().Nodes().Informer().HasSynced()
		}
		kubeInformers.Start(wait.NeverStop)
		klog.InfoS("Attempting to sync node with API server")
	} else {
		// we don't have a client to sync!
		nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
		nodeLister = corelisters.NewNodeLister(nodeIndexer)
		nodeHasSynced = func() bool { return true }
		klog.InfoS("Kubelet is running in standalone mode, will skip API server sync")
	}

	if kubeDeps.PodConfig == nil {
		var err error
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
		if err != nil {
			return nil, err
		}
	}

	containerGCPolicy := kubecontainer.GCPolicy{
		MinAge: minimumGCAge.Duration,
		MaxPerPodContainer: int(maxPerPodContainerCount),
		MaxContainers: int(maxContainerCount),
	}

	daemonEndpoints := &v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
	}

	imageGCPolicy := images.ImageGCPolicy{
		MinAge: kubeCfg.ImageMinimumGCAge.Duration,
		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
		LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
	}

	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
	if experimentalNodeAllocatableIgnoreEvictionThreshold {
		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
		enforceNodeAllocatable = []string{}
	}
	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
	if err != nil {
		return nil, err
	}
	evictionConfig := eviction.Config{
		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
		Thresholds: thresholds,
		KernelMemcgNotification: kernelMemcgNotification,
		PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
	}

	var serviceLister corelisters.ServiceLister
	var serviceHasSynced cache.InformerSynced
	if kubeDeps.KubeClient != nil {
		kubeInformers := informers.NewSharedInformerFactory(kubeDeps.KubeClient, 0)
		serviceLister = kubeInformers.Core().V1().Services().Lister()
		serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced
		kubeInformers.Start(wait.NeverStop)
	} else {
		serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
		serviceLister = corelisters.NewServiceLister(serviceIndexer)
		serviceHasSynced = func() bool { return true }
	}

	// construct a node reference used for events
	nodeRef := &v1.ObjectReference{
		Kind: "Node",
		Name: string(nodeName),
		UID: types.UID(nodeName),
		Namespace: "",
	}

	oomWatcher, err := oomwatcher.NewWatcher(kubeDeps.Recorder)
	if err != nil {
		if libcontaineruserns.RunningInUserNS() {
			if utilfeature.DefaultFeatureGate.Enabled(features.KubeletInUserNamespace) {
				// oomwatcher.NewWatcher returns "open /dev/kmsg: operation not permitted" error,
				// when running in a user namespace with sysctl value `kernel.dmesg_restrict=1`.
495 klog.V(2).InfoS("Failed to create an oomWatcher (running in UserNS, ignoring)", "err", err) 496 oomWatcher = nil 497 } else { 498 klog.ErrorS(err, "Failed to create an oomWatcher (running in UserNS, Hint: enable KubeletInUserNamespace feature flag to ignore the error)") 499 return nil, err 500 } 501 } else { 502 return nil, err 503 } 504 } 505 506 clusterDNS := make([]net.IP, 0, len(kubeCfg.ClusterDNS)) 507 for _, ipEntry := range kubeCfg.ClusterDNS { 508 ip := net.ParseIP(ipEntry) 509 if ip == nil { 510 klog.InfoS("Invalid clusterDNS IP", "IP", ipEntry) 511 } else { 512 clusterDNS = append(clusterDNS, ip) 513 } 514 } 515 httpClient := &http.Client{} 516 517 klet := &Kubelet{ 518 hostname: hostname, 519 hostnameOverridden: hostnameOverridden, 520 nodeName: nodeName, 521 kubeClient: kubeDeps.KubeClient, 522 heartbeatClient: kubeDeps.HeartbeatClient, 523 onRepeatedHeartbeatFailure: kubeDeps.OnHeartbeatFailure, 524 rootDirectory: rootDirectory, 525 resyncInterval: kubeCfg.SyncFrequency.Duration, 526 sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources), 527 registerNode: registerNode, 528 registerWithTaints: registerWithTaints, 529 registerSchedulable: registerSchedulable, 530 dnsConfigurer: dns.NewConfigurer(kubeDeps.Recorder, nodeRef, nodeIPs, clusterDNS, kubeCfg.ClusterDomain, kubeCfg.ResolverConfig), 531 serviceLister: serviceLister, 532 serviceHasSynced: serviceHasSynced, 533 nodeLister: nodeLister, 534 nodeHasSynced: nodeHasSynced, 535 masterServiceNamespace: masterServiceNamespace, 536 streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, 537 recorder: kubeDeps.Recorder, 538 cadvisor: kubeDeps.CAdvisorInterface, 539 cloud: kubeDeps.Cloud, 540 externalCloudProvider: cloudprovider.IsExternal(cloudProvider), 541 providerID: providerID, 542 nodeRef: nodeRef, 543 nodeLabels: nodeLabels, 544 nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, 545 nodeStatusReportFrequency: kubeCfg.NodeStatusReportFrequency.Duration, 546 os: kubeDeps.OSInterface, 547 oomWatcher: oomWatcher, 548 cgroupsPerQOS: kubeCfg.CgroupsPerQOS, 549 cgroupRoot: kubeCfg.CgroupRoot, 550 mounter: kubeDeps.Mounter, 551 hostutil: kubeDeps.HostUtil, 552 subpather: kubeDeps.Subpather, 553 maxPods: int(kubeCfg.MaxPods), 554 podsPerCore: int(kubeCfg.PodsPerCore), 555 syncLoopMonitor: atomic.Value{}, 556 daemonEndpoints: daemonEndpoints, 557 containerManager: kubeDeps.ContainerManager, 558 containerRuntimeName: containerRuntime, 559 nodeIPs: nodeIPs, 560 nodeIPValidator: validateNodeIP, 561 clock: clock.RealClock{}, 562 enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach, 563 makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains, 564 iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit), 565 iptablesDropBit: int(kubeCfg.IPTablesDropBit), 566 experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate), 567 keepTerminatedPodVolumes: keepTerminatedPodVolumes, 568 nodeStatusMaxImages: nodeStatusMaxImages, 569 lastContainerStartedTime: newTimeCache(), 570 } 571 572 if klet.cloud != nil { 573 klet.cloudResourceSyncManager = cloudresource.NewSyncManager(klet.cloud, nodeName, klet.nodeStatusUpdateFrequency) 574 } 575 576 var secretManager secret.Manager 577 var configMapManager configmap.Manager 578 switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy { 579 case kubeletconfiginternal.WatchChangeDetectionStrategy: 580 secretManager = 
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient, klet.resyncInterval)
	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
		secretManager = secret.NewCachingSecretManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		configMapManager = configmap.NewCachingConfigMapManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
	case kubeletconfiginternal.GetChangeDetectionStrategy:
		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
	default:
		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
	}

	klet.secretManager = secretManager
	klet.configMapManager = configMapManager

	if klet.experimentalHostUserNamespaceDefaulting {
		klog.InfoS("Experimental host user namespace defaulting is enabled")
	}

	machineInfo, err := klet.cadvisor.MachineInfo()
	if err != nil {
		return nil, err
	}
	// Avoid having the metrics collector export this as a timestamped metric.
	// See PR #95210 and #97006 for more details.
	machineInfo.Timestamp = time.Time{}
	klet.setCachedMachineInfo(machineInfo)

	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	klet.livenessManager = proberesults.NewManager()
	klet.readinessManager = proberesults.NewManager()
	klet.startupManager = proberesults.NewManager()
	klet.podCache = kubecontainer.NewCache()

	// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
	mirrorPodClient := kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
	klet.podManager = kubepod.NewBasicPodManager(mirrorPodClient, secretManager, configMapManager)

	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)

	klet.dockerLegacyService = kubeDeps.dockerLegacyService
	klet.runtimeService = kubeDeps.RemoteRuntimeService

	if kubeDeps.KubeClient != nil {
		klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient)
	}

	if containerRuntime == kubetypes.RemoteContainerRuntime {
		// setup containerLogManager for CRI container runtime
		containerLogManager, err := logs.NewContainerLogManager(
			klet.runtimeService,
			kubeDeps.OSInterface,
			kubeCfg.ContainerLogMaxSize,
			int(kubeCfg.ContainerLogMaxFiles),
		)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize container log manager: %v", err)
		}
		klet.containerLogManager = containerLogManager
	} else {
		klet.containerLogManager = logs.NewStubContainerLogManager()
	}

	klet.reasonCache = NewReasonCache()
	klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
	klet.podWorkers = newPodWorkers(
		klet.syncPod,
		klet.syncTerminatingPod,
		klet.syncTerminatedPod,

		kubeDeps.Recorder,
		klet.workQueue,
		klet.resyncInterval,
		backOffPeriod,
		klet.podCache,
	)

	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
		kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
		klet.livenessManager,
		klet.readinessManager,
		klet.startupManager,
		seccompProfileRoot,
		machineInfo,
		klet.podWorkers,
		kubeDeps.OSInterface,
		klet,
		httpClient,
		imageBackOff,
		kubeCfg.SerializeImagePulls,
		float32(kubeCfg.RegistryPullQPS),
		int(kubeCfg.RegistryBurst),
		imageCredentialProviderConfigFile,
		imageCredentialProviderBinDir,
		kubeCfg.CPUCFSQuota,
		kubeCfg.CPUCFSQuotaPeriod,
		kubeDeps.RemoteRuntimeService,
		kubeDeps.RemoteImageService,
		kubeDeps.ContainerManager.InternalContainerLifecycle(),
		kubeDeps.dockerLegacyService,
		klet.containerLogManager,
		klet.runtimeClassManager,
		seccompDefault,
		kubeCfg.MemorySwap.SwapBehavior,
		kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
		*kubeCfg.MemoryThrottlingFactor,
	)
	if err != nil {
		return nil, err
	}
	klet.containerRuntime = runtime
	klet.streamingRuntime = runtime
	klet.runner = runtime

	runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
	if err != nil {
		return nil, err
	}
	klet.runtimeCache = runtimeCache

	// common provider to get host file system usage associated with a pod managed by kubelet
	hostStatsProvider := stats.NewHostStatsProvider(kubecontainer.RealOS{}, func(podUID types.UID) (string, bool) {
		return getEtcHostsPath(klet.getPodDir(podUID)), klet.containerRuntime.SupportsSingleFileMapping()
	})
	if kubeDeps.useLegacyCadvisorStats {
		klet.StatsProvider = stats.NewCadvisorStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			klet.containerRuntime,
			klet.statusManager,
			hostStatsProvider)
	} else {
		klet.StatsProvider = stats.NewCRIStatsProvider(
			klet.cadvisor,
			klet.resourceAnalyzer,
			klet.podManager,
			klet.runtimeCache,
			kubeDeps.RemoteRuntimeService,
			kubeDeps.RemoteImageService,
			hostStatsProvider,
			utilfeature.DefaultFeatureGate.Enabled(features.DisableAcceleratorUsageMetrics))
	}

	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
	if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
		klog.ErrorS(err, "Pod CIDR update failed")
	}

	// setup containerGC
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
	if err != nil {
		return nil, err
	}
	klet.containerGC = containerGC
	klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

	// setup imageManager
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize image manager: %v", err)
	}
	klet.imageManager = imageManager

	if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
		klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
		}
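		// Note: crypto/tls calls GetCertificate on every TLS handshake, so a
		// certificate rotated by serverCertificateManager is picked up by new
		// connections without restarting the kubelet.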
		kubeDeps.TLSOptions.Config.GetCertificate = func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
			cert := klet.serverCertificateManager.Current()
			if cert == nil {
				return nil, fmt.Errorf("no serving certificate available for the kubelet")
			}
			return cert, nil
		}
	}

	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.readinessManager,
		klet.startupManager,
		klet.runner,
		kubeDeps.Recorder)

	tokenManager := token.NewManager(kubeDeps.KubeClient)

	// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
	// which affects node ready status. This function must be called before Kubelet is initialized so that the Node
	// ReadyState is accurate with the storage state.
	klet.volumePluginMgr, err =
		NewInitializedVolumePluginMgr(klet, secretManager, configMapManager, tokenManager, kubeDeps.VolumePlugins, kubeDeps.DynamicPluginProber)
	if err != nil {
		return nil, err
	}
	klet.pluginManager = pluginmanager.NewPluginManager(
		klet.getPluginsRegistrationDir(), /* sockDir */
		kubeDeps.Recorder,
	)

	// If the experimentalMounterPath flag is set, we do not want to
	// check node capabilities since the mount path is not the default
	if len(experimentalMounterPath) != 0 {
		experimentalCheckNodeCapabilitiesBeforeMount = false
		// Replace the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNS
		// so that service names can be resolved
		klet.dnsConfigurer.SetupDNSinContainerizedMounter(experimentalMounterPath)
	}

	// setup volumeManager
	klet.volumeManager = volumemanager.NewVolumeManager(
		kubeCfg.EnableControllerAttachDetach,
		nodeName,
		klet.podManager,
		klet.podWorkers,
		klet.kubeClient,
		klet.volumePluginMgr,
		klet.containerRuntime,
		kubeDeps.Mounter,
		kubeDeps.HostUtil,
		klet.getPodsDir(),
		kubeDeps.Recorder,
		experimentalCheckNodeCapabilitiesBeforeMount,
		keepTerminatedPodVolumes,
		volumepathhandler.NewBlockVolumePathHandler())

	klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

	// setup eviction manager
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	klet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)

	// Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
	// Hence, we concatenate those two lists.
	safeAndUnsafeSysctls := append(sysctlwhitelist.SafeSysctlWhitelist(), allowedUnsafeSysctls...)
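	// For example, if --allowed-unsafe-sysctls=net.core.somaxconn is passed to
	// the kubelet, the whitelist below accepts both the built-in safe sysctls
	// (such as kernel.shm_rmid_forced) and net.core.somaxconn.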
	sysctlsWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls)
	if err != nil {
		return nil, err
	}
	klet.admitHandlers.AddPodAdmitHandler(sysctlsWhitelist)

	// enable active deadline handler
	activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
	if err != nil {
		return nil, err
	}
	klet.AddPodSyncLoopHandler(activeDeadlineHandler)
	klet.AddPodSyncHandler(activeDeadlineHandler)

	klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())

	criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	// apply functional Options
	for _, opt := range kubeDeps.Options {
		opt(klet)
	}

	if sysruntime.GOOS == "linux" {
		// AppArmor is a Linux kernel security module and it does not support other operating systems.
		klet.appArmorValidator = apparmor.NewValidator(containerRuntime)
		klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
	}
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewNoNewPrivsAdmitHandler(klet.containerRuntime))
	klet.softAdmitHandlers.AddPodAdmitHandler(lifecycle.NewProcMountAdmitHandler(klet.containerRuntime))

	leaseDuration := time.Duration(kubeCfg.NodeLeaseDurationSeconds) * time.Second
	renewInterval := time.Duration(float64(leaseDuration) * nodeLeaseRenewIntervalFraction)
	klet.nodeLeaseController = lease.NewController(
		klet.clock,
		klet.heartbeatClient,
		string(klet.nodeName),
		kubeCfg.NodeLeaseDurationSeconds,
		klet.onRepeatedHeartbeatFailure,
		renewInterval,
		v1.NamespaceNodeLease,
		util.SetNodeOwnerFunc(klet.heartbeatClient, string(klet.nodeName)))

	// setup node shutdown manager
	shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.syncNodeStatus, kubeCfg.ShutdownGracePeriod.Duration, kubeCfg.ShutdownGracePeriodCriticalPods.Duration)

	klet.shutdownManager = shutdownManager
	klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)

	// Finally, put the most recent version of the config on the Kubelet, so
	// people can see how it was configured.
	klet.kubeletConfiguration = *kubeCfg

	// Generating the status funcs should be the last thing we do,
	// since this relies on the rest of the Kubelet having been constructed.
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	return klet, nil
}

type serviceLister interface {
	List(labels.Selector) ([]*v1.Service, error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
	kubeletConfiguration kubeletconfiginternal.KubeletConfiguration

	// hostname is the hostname the kubelet detected or was given via flag/config
	hostname string
	// hostnameOverridden indicates the hostname was overridden via flag/config
	hostnameOverridden bool

	nodeName types.NodeName
	runtimeCache kubecontainer.RuntimeCache
	kubeClient clientset.Interface
	heartbeatClient clientset.Interface
	rootDirectory string

	lastObservedNodeAddressesMux sync.RWMutex
	lastObservedNodeAddresses []v1.NodeAddress

	// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
	onRepeatedHeartbeatFailure func()

	// podWorkers handle syncing Pods in response to events.
	podWorkers PodWorkers

	// resyncInterval is the interval between periodic full reconciliations of
	// pods on this node.
	resyncInterval time.Duration

	// sourcesReady records the sources seen by the kubelet; it is thread-safe.
	sourcesReady config.SourcesReady

	// podManager is a facade that abstracts away the various sources of pods
	// this Kubelet services.
	podManager kubepod.Manager

	// Needed to observe and respond to situations that could impact node stability
	evictionManager eviction.Manager

	// Optional, defaults to /logs/ from /var/log
	logServer http.Handler
	// Optional, defaults to simple Docker implementation
	runner kubecontainer.CommandRunner

	// cAdvisor used for container information.
	cadvisor cadvisor.Interface

	// Set to true to have the node register itself with the apiserver.
	registerNode bool
	// List of taints to add to a node object when the kubelet registers itself.
	registerWithTaints []api.Taint
	// Set to true to have the node register itself as schedulable.
	registerSchedulable bool
	// for internal bookkeeping; access only from within registerWithApiserver
	registrationCompleted bool

	// dnsConfigurer is used for setting up DNS resolver configuration when launching pods.
	dnsConfigurer *dns.Configurer

	// masterServiceNamespace is the namespace that the master service is exposed in.
	masterServiceNamespace string
	// serviceLister knows how to list services
	serviceLister serviceLister
	// serviceHasSynced indicates whether services have been sync'd at least once.
	// Check this before trusting a response from the lister.
	serviceHasSynced cache.InformerSynced
	// nodeLister knows how to list nodes
	nodeLister corelisters.NodeLister
	// nodeHasSynced indicates whether nodes have been sync'd at least once.
	// Check this before trusting a response from the node lister.
	nodeHasSynced cache.InformerSynced
	// a list of node labels to register
	nodeLabels map[string]string

	// Last timestamp when runtime responded on ping.
	// Mutex is used to protect this value.
	runtimeState *runtimeState

	// Volume plugins.
	volumePluginMgr *volume.VolumePluginMgr

	// Handles container probing.
	probeManager prober.Manager
	// Manages container health check results.
	livenessManager proberesults.Manager
	readinessManager proberesults.Manager
	startupManager proberesults.Manager

	// How long to keep idle streaming command execution/port forwarding
	// connections open before terminating them
	streamingConnectionIdleTimeout time.Duration

	// The EventRecorder to use
	recorder record.EventRecorder

	// Policy for handling garbage collection of dead containers.
	containerGC kubecontainer.GC

	// Manager for image garbage collection.
	imageManager images.ImageGCManager

	// Manager for container logs.
	containerLogManager logs.ContainerLogManager

	// Secret manager.
	secretManager secret.Manager

	// ConfigMap manager.
	configMapManager configmap.Manager

	// Cached MachineInfo returned by cadvisor.
	machineInfoLock sync.RWMutex
	machineInfo *cadvisorapi.MachineInfo

	// Handles certificate rotations.
	serverCertificateManager certificate.Manager

	// Syncs pod statuses with the apiserver; also used as a cache of statuses.
	statusManager status.Manager

	// VolumeManager runs a set of asynchronous loops that figure out which
	// volumes need to be attached/mounted/unmounted/detached based on the pods
	// scheduled on this node and makes it so.
	volumeManager volumemanager.VolumeManager

	// Cloud provider interface.
	cloud cloudprovider.Interface
	// Handles requests to cloud provider with timeout
	cloudResourceSyncManager cloudresource.SyncManager

	// Indicates that the node initialization happens in an external cloud controller
	externalCloudProvider bool
	// Reference to this node.
	nodeRef *v1.ObjectReference

	// The name of the container runtime
	containerRuntimeName string

	// Container runtime.
	containerRuntime kubecontainer.Runtime

	// Streaming runtime handles container streaming.
	streamingRuntime kubecontainer.StreamingRuntime

	// Container runtime service (needed by container runtime Start()).
	runtimeService internalapi.RuntimeService

	// reasonCache caches the failure reason of the last creation of all containers, which is
	// used for generating ContainerStatus.
	reasonCache *ReasonCache

	// nodeStatusUpdateFrequency specifies how often kubelet computes node status. If node lease
	// feature is not enabled, it is also the frequency that kubelet posts node status to master.
	// In that case, be cautious when changing the constant, it must work with nodeMonitorGracePeriod
	// in nodecontroller. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means number of retries allowed for kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
	//    status. Kubelet may fail to update node status reliably if the value is too small,
	//    as it takes time to gather all necessary node information.
	nodeStatusUpdateFrequency time.Duration

	// nodeStatusReportFrequency is the frequency that kubelet posts node
	// status to master. It is only used when the node lease feature is enabled.
	nodeStatusReportFrequency time.Duration

	// lastStatusReportTime is the time when node status was last reported.
	lastStatusReportTime time.Time

	// lastContainerStartedTime is the time of the last ContainerStarted event observed per pod
	lastContainerStartedTime *timeCache

	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
	// This lock is used by the Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
	syncNodeStatusMux sync.Mutex

	// updatePodCIDRMux is a lock on updating pod CIDR, because this path is not thread-safe.
	// This lock is used by the Kubelet.updatePodCIDR function and shouldn't be used anywhere else.
	updatePodCIDRMux sync.Mutex

	// updateRuntimeMux is a lock on updating runtime, because this path is not thread-safe.
	// This lock is used by the Kubelet.updateRuntimeUp function and shouldn't be used anywhere else.
	updateRuntimeMux sync.Mutex

	// nodeLeaseController claims and renews the node lease for this Kubelet
	nodeLeaseController lease.Controller

	// Generates pod events.
	pleg pleg.PodLifecycleEventGenerator

	// Store kubecontainer.PodStatus for all pods.
	podCache kubecontainer.Cache

	// os is a facade for various syscalls that need to be mocked during testing.
	os kubecontainer.OSInterface

	// Watcher of out of memory events.
	oomWatcher oomwatcher.Watcher

	// Monitor resource usage
	resourceAnalyzer serverstats.ResourceAnalyzer

	// Whether or not we should have the QOS cgroup hierarchy for resource management
	cgroupsPerQOS bool

	// If non-empty, pass this to the container runtime as the root cgroup.
	cgroupRoot string

	// Mounter to use for volumes.
	mounter mount.Interface

	// hostutil to interact with filesystems
	hostutil hostutil.HostUtils

	// subpather to execute subpath actions
	subpather subpath.Interface

	// Manager of non-Runtime containers.
	containerManager cm.ContainerManager

	// Maximum Number of Pods which can be run by this Kubelet
	maxPods int

	// Monitor Kubelet's sync loop
	syncLoopMonitor atomic.Value

	// Container restart Backoff
	backOff *flowcontrol.Backoff

	// Information about the ports which are opened by daemons on Node running this Kubelet server.
	daemonEndpoints *v1.NodeDaemonEndpoints

	// A queue used to trigger pod workers.
	workQueue queue.WorkQueue

	// oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
	oneTimeInitializer sync.Once

	// If set, use this IP address or addresses for the node
	nodeIPs []net.IP

	// use this function to validate the kubelet nodeIP
	nodeIPValidator func(net.IP) error

	// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
	providerID string

	// clock is an interface that provides time related functionality in a way that makes it
	// easy to test the code.
	clock clock.Clock

	// handlers called during the tryUpdateNodeStatus cycle
	setNodeStatusFuncs []func(*v1.Node) error

	lastNodeUnschedulableLock sync.Mutex
	// maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
	lastNodeUnschedulable bool

	// the list of handlers to call during pod admission.
	admitHandlers lifecycle.PodAdmitHandlers

	// softAdmitHandlers are applied to the pod after it is admitted by the Kubelet, but before it is
	// run. A pod rejected by a softAdmitHandler will be left in a Pending state indefinitely. If a
	// rejected pod should not be recreated, or the scheduler is not aware of the rejection rule, the
	// admission rule should be applied by a softAdmitHandler.
	softAdmitHandlers lifecycle.PodAdmitHandlers

	// the list of handlers to call during pod sync loop.
	lifecycle.PodSyncLoopHandlers

	// the list of handlers to call during pod sync.
	lifecycle.PodSyncHandlers

	// the number of allowed pods per core
	podsPerCore int

	// enableControllerAttachDetach indicates the Attach/Detach controller
	// should manage attachment/detachment of volumes scheduled to this node,
	// and disable kubelet from executing any attach/detach operations
	enableControllerAttachDetach bool

	// trigger deleting containers in a pod
	containerDeletor *podContainerDeletor

	// config iptables util rules
	makeIPTablesUtilChains bool

	// The bit of the fwmark space to mark packets for SNAT.
	iptablesMasqueradeBit int

	// The bit of the fwmark space to mark packets for dropping.
	iptablesDropBit int

	// The AppArmor validator for checking whether AppArmor is supported.
	appArmorValidator apparmor.Validator

	// experimentalHostUserNamespaceDefaulting sets userns=true when users request host namespaces (pid, ipc, net),
	// are using non-namespaced capabilities (mknod, sys_time, sys_module), the pod contains a privileged container,
	// or using host path volumes.
	// This should only be enabled when the container runtime is performing user remapping AND if the
	// experimental behavior is desired.
	experimentalHostUserNamespaceDefaulting bool

	// dockerLegacyService contains some legacy methods for backward compatibility.
	// It should be set only when docker is using a non-json-file logging driver.
	dockerLegacyService legacy.DockerLegacyService

	// StatsProvider provides the node and the container stats.
	StatsProvider *stats.Provider

	// This flag, if set, instructs the kubelet to keep volumes from terminated pods mounted to the node.
	// This can be useful for debugging volume related issues.
	keepTerminatedPodVolumes bool // DEPRECATED

	// pluginmanager runs a set of asynchronous loops that figure out which
	// plugins need to be registered/unregistered based on this node and makes it so.
	pluginManager pluginmanager.PluginManager

	// This flag sets a maximum number of images to report in the node status.
	nodeStatusMaxImages int32

	// Handles RuntimeClass objects for the Kubelet.
	runtimeClassManager *runtimeclass.Manager

	// Handles node shutdown events for the Node.
	shutdownManager *nodeshutdown.Manager
}

// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodStats() ([]statsapi.PodStats, error) {
	return kl.StatsProvider.ListPodStats()
}

// ListPodCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodCPUAndMemoryStats() ([]statsapi.PodStats, error) {
	return kl.StatsProvider.ListPodCPUAndMemoryStats()
}

// ListPodStatsAndUpdateCPUNanoCoreUsage is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ListPodStatsAndUpdateCPUNanoCoreUsage() ([]statsapi.PodStats, error) {
	return kl.StatsProvider.ListPodStatsAndUpdateCPUNanoCoreUsage()
}

// ImageFsStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) ImageFsStats() (*statsapi.FsStats, error) {
	return kl.StatsProvider.ImageFsStats()
}

// GetCgroupStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
	return kl.StatsProvider.GetCgroupStats(cgroupName, updateStats)
}

// GetCgroupCPUAndMemoryStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
	return kl.StatsProvider.GetCgroupCPUAndMemoryStats(cgroupName, updateStats)
}

// RootFsStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) RootFsStats() (*statsapi.FsStats, error) {
	return kl.StatsProvider.RootFsStats()
}

// GetContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
	return kl.StatsProvider.GetContainerInfo(podFullName, uid, containerName, req)
}

// GetRawContainerInfo is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) GetRawContainerInfo(containerName string, req *cadvisorapi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorapi.ContainerInfo, error) {
	return kl.StatsProvider.GetRawContainerInfo(containerName, req, subcontainers)
}

// RlimitStats is delegated to StatsProvider, which implements stats.Provider interface
func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
	return kl.StatsProvider.RlimitStats()
}

// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
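//
// With the default root directory of /var/lib/kubelet this yields, roughly:
//
//	/var/lib/kubelet/pods
//	/var/lib/kubelet/plugins
//	/var/lib/kubelet/plugins_registry
//	/var/lib/kubelet/pod-resources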
func (kl *Kubelet) setupDataDirs() error {
	kl.rootDirectory = path.Clean(kl.rootDirectory)
	pluginRegistrationDir := kl.getPluginsRegistrationDir()
	pluginsDir := kl.getPluginsDir()
	if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
		return fmt.Errorf("error creating root directory: %v", err)
	}
	if err := kl.hostutil.MakeRShared(kl.getRootDir()); err != nil {
		return fmt.Errorf("error configuring root directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
		return fmt.Errorf("error creating pods directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPluginsRegistrationDir(), 0750); err != nil {
		return fmt.Errorf("error creating plugins registry directory: %v", err)
	}
	if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
		return fmt.Errorf("error creating podresources directory: %v", err)
	}
	if selinux.SELinuxEnabled() {
		err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
		if err != nil {
			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugin registration dir", "path", pluginRegistrationDir, "err", err)
		}
		err = selinux.SetFileLabel(pluginsDir, config.KubeletPluginsDirSELinuxLabel)
		if err != nil {
			klog.InfoS("Unprivileged containerized plugins might not work, could not set selinux context on plugins dir", "path", pluginsDir, "err", err)
		}
	}
	return nil
}

// StartGarbageCollection starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
	loggedContainerGCFailure := false
	go wait.Until(func() {
		if err := kl.containerGC.GarbageCollect(); err != nil {
			klog.ErrorS(err, "Container garbage collection failed")
			kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ContainerGCFailed, err.Error())
			loggedContainerGCFailure = true
		} else {
			var vLevel klog.Level = 4
			if loggedContainerGCFailure {
				vLevel = 1
				loggedContainerGCFailure = false
			}

			klog.V(vLevel).InfoS("Container garbage collection succeeded")
		}
	}, ContainerGCPeriod, wait.NeverStop)

	// when the high threshold is set to 100, stub the image GC manager
	if kl.kubeletConfiguration.ImageGCHighThresholdPercent == 100 {
		klog.V(2).InfoS("ImageGCHighThresholdPercent is set to 100; disabling image GC")
		return
	}

	prevImageGCFailed := false
	go wait.Until(func() {
		if err := kl.imageManager.GarbageCollect(); err != nil {
			if prevImageGCFailed {
				klog.ErrorS(err, "Image garbage collection failed multiple times in a row")
				// Only create an event for repeated failures
				kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.ImageGCFailed, err.Error())
			} else {
				klog.ErrorS(err, "Image garbage collection failed once. Stats initialization may not have completed yet")
			}
Stats initialization may not have completed yet") 1344 } 1345 prevImageGCFailed = true 1346 } else { 1347 var vLevel klog.Level = 4 1348 if prevImageGCFailed { 1349 vLevel = 1 1350 prevImageGCFailed = false 1351 } 1352 1353 klog.V(vLevel).InfoS("Image garbage collection succeeded") 1354 } 1355 }, ImageGCPeriod, wait.NeverStop) 1356} 1357 1358// initializeModules will initialize internal modules that do not require the container runtime to be up. 1359// Note that the modules here must not depend on modules that are not initialized here. 1360func (kl *Kubelet) initializeModules() error { 1361 // Prometheus metrics. 1362 metrics.Register( 1363 collectors.NewVolumeStatsCollector(kl), 1364 collectors.NewLogMetricsCollector(kl.StatsProvider.ListPodStats), 1365 ) 1366 metrics.SetNodeName(kl.nodeName) 1367 servermetrics.Register() 1368 1369 // Setup filesystem directories. 1370 if err := kl.setupDataDirs(); err != nil { 1371 return err 1372 } 1373 1374 // If the container logs directory does not exist, create it. 1375 if _, err := os.Stat(ContainerLogsDir); err != nil { 1376 if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil { 1377 return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err) 1378 } 1379 } 1380 1381 // Start the image manager. 1382 kl.imageManager.Start() 1383 1384 // Start the certificate manager if it was enabled. 1385 if kl.serverCertificateManager != nil { 1386 kl.serverCertificateManager.Start() 1387 } 1388 1389 // Start out of memory watcher. 1390 if kl.oomWatcher != nil { 1391 if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { 1392 return fmt.Errorf("failed to start OOM watcher: %w", err) 1393 } 1394 } 1395 1396 // Start resource analyzer 1397 kl.resourceAnalyzer.Start() 1398 1399 return nil 1400} 1401 1402// initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up. 1403func (kl *Kubelet) initializeRuntimeDependentModules() { 1404 if err := kl.cadvisor.Start(); err != nil { 1405 // Fail kubelet and rely on the babysitter to retry starting kubelet. 1406 klog.ErrorS(err, "Failed to start cAdvisor") 1407 os.Exit(1) 1408 } 1409 1410 // trigger on-demand stats collection once so that we have capacity information for ephemeral storage. 1411 // ignore any errors, since if stats collection is not successful, the container manager will fail to start below. 1412 kl.StatsProvider.GetCgroupStats("/", true) 1413 // Start container manager. 1414 node, err := kl.getNodeAnyWay() 1415 if err != nil { 1416 // Fail kubelet and rely on the babysitter to retry starting kubelet. 1417 klog.ErrorS(err, "Kubelet failed to get node info") 1418 os.Exit(1) 1419 } 1420 // containerManager must start after cAdvisor because it needs filesystem capacity information 1421 if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil { 1422 // Fail kubelet and rely on the babysitter to retry starting kubelet. 1423 klog.ErrorS(err, "Failed to start ContainerManager") 1424 os.Exit(1) 1425 } 1426 // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs 1427 kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod) 1428 1429 // container log manager must start after container runtime is up to retrieve information from container runtime 1430 // and inform container to reopen log file after log rotation. 
1431 kl.containerLogManager.Start() 1432 // Adding Registration Callback function for CSI Driver 1433 kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) 1434 // Adding Registration Callback function for Device Manager 1435 kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) 1436 // Start the plugin manager 1437 klog.V(4).InfoS("Starting plugin manager") 1438 go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) 1439 1440 err = kl.shutdownManager.Start() 1441 if err != nil { 1442 // The shutdown manager is not critical for kubelet, so log failure, but don't block Kubelet startup if there was a failure starting it. 1443 klog.ErrorS(err, "Failed to start node shutdown manager") 1444 } 1445} 1446 1447// Run starts the kubelet reacting to config updates 1448func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { 1449 if kl.logServer == nil { 1450 kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) 1451 } 1452 if kl.kubeClient == nil { 1453 klog.InfoS("No API server defined - no node status update will be sent") 1454 } 1455 1456 // Start the cloud provider sync manager 1457 if kl.cloudResourceSyncManager != nil { 1458 go kl.cloudResourceSyncManager.Run(wait.NeverStop) 1459 } 1460 1461 if err := kl.initializeModules(); err != nil { 1462 kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) 1463 klog.ErrorS(err, "Failed to initialize internal modules") 1464 os.Exit(1) 1465 } 1466 1467 // Start volume manager 1468 go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) 1469 1470 if kl.kubeClient != nil { 1471 // Start syncing node status immediately, this may set up things the runtime needs to run. 1472 go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) 1473 go kl.fastStatusUpdateOnce() 1474 1475 // start syncing lease 1476 go kl.nodeLeaseController.Run(wait.NeverStop) 1477 } 1478 go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) 1479 1480 // Set up iptables util rules 1481 if kl.makeIPTablesUtilChains { 1482 kl.initNetworkUtil() 1483 } 1484 1485 // Start component sync loops. 1486 kl.statusManager.Start() 1487 1488 // Start syncing RuntimeClasses if enabled. 1489 if kl.runtimeClassManager != nil { 1490 kl.runtimeClassManager.Start(wait.NeverStop) 1491 } 1492 1493 // Start the pod lifecycle event generator. 1494 kl.pleg.Start() 1495 kl.syncLoop(updates, kl) 1496} 1497 1498// syncPod is the transaction script for the sync of a single pod (setting up) 1499// a pod. The reverse (teardown) is handled in syncTerminatingPod and 1500// syncTerminatedPod. If syncPod exits without error, then the pod runtime 1501// state is in sync with the desired configuration state (pod is running). 1502// If syncPod exits with a transient error, the next invocation of syncPod 1503// is expected to make progress towards reaching the runtime state. 
1504// 1505// Arguments: 1506// 1507// o - the SyncPodOptions for this invocation 1508// 1509// The workflow is: 1510// * If the pod is being created, record pod worker start latency 1511// * Call generateAPIPodStatus to prepare an v1.PodStatus for the pod 1512// * If the pod is being seen as running for the first time, record pod 1513// start latency 1514// * Update the status of the pod in the status manager 1515// * Kill the pod if it should not be running due to soft admission 1516// * Create a mirror pod if the pod is a static pod, and does not 1517// already have a mirror pod 1518// * Create the data directories for the pod if they do not exist 1519// * Wait for volumes to attach/mount 1520// * Fetch the pull secrets for the pod 1521// * Call the container runtime's SyncPod callback 1522// * Update the traffic shaping for the pod's ingress and egress limits 1523// 1524// If any step of this workflow errors, the error is returned, and is repeated 1525// on the next syncPod call. 1526// 1527// This operation writes all events that are dispatched in order to provide 1528// the most accurate information possible about an error situation to aid debugging. 1529// Callers should not throw an event if this operation returns an error. 1530func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error { 1531 klog.V(4).InfoS("syncPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) 1532 defer klog.V(4).InfoS("syncPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) 1533 1534 // Latency measurements for the main workflow are relative to the 1535 // first time the pod was seen by the API server. 1536 var firstSeenTime time.Time 1537 if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok { 1538 firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get() 1539 } 1540 1541 // Record pod worker start latency if being created 1542 // TODO: make pod workers record their own latencies 1543 if updateType == kubetypes.SyncPodCreate { 1544 if !firstSeenTime.IsZero() { 1545 // This is the first time we are syncing the pod. Record the latency 1546 // since kubelet first saw the pod if firstSeenTime is set. 1547 metrics.PodWorkerStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime)) 1548 } else { 1549 klog.V(3).InfoS("First seen time not recorded for pod", 1550 "podUID", pod.UID, 1551 "pod", klog.KObj(pod)) 1552 } 1553 } 1554 1555 // Generate final API pod status with pod and status manager status 1556 apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) 1557 // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576) 1558 // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and 1559 // set pod IP to hostIP directly in runtime.GetPodStatus 1560 podStatus.IPs = make([]string, 0, len(apiPodStatus.PodIPs)) 1561 for _, ipInfo := range apiPodStatus.PodIPs { 1562 podStatus.IPs = append(podStatus.IPs, ipInfo.IP) 1563 } 1564 1565 if len(podStatus.IPs) == 0 && len(apiPodStatus.PodIP) > 0 { 1566 podStatus.IPs = []string{apiPodStatus.PodIP} 1567 } 1568 1569 // If the pod should not be running, we request the pod's containers be stopped. This is not the same 1570 // as termination (we want to stop the pod, but potentially restart it later if soft admission allows 1571 // it later). 
Set the status and phase appropriately 1572 runnable := kl.canRunPod(pod) 1573 if !runnable.Admit { 1574 // Pod is not runnable; and update the Pod and Container statuses to why. 1575 if apiPodStatus.Phase != v1.PodFailed && apiPodStatus.Phase != v1.PodSucceeded { 1576 apiPodStatus.Phase = v1.PodPending 1577 } 1578 apiPodStatus.Reason = runnable.Reason 1579 apiPodStatus.Message = runnable.Message 1580 // Waiting containers are not creating. 1581 const waitingReason = "Blocked" 1582 for _, cs := range apiPodStatus.InitContainerStatuses { 1583 if cs.State.Waiting != nil { 1584 cs.State.Waiting.Reason = waitingReason 1585 } 1586 } 1587 for _, cs := range apiPodStatus.ContainerStatuses { 1588 if cs.State.Waiting != nil { 1589 cs.State.Waiting.Reason = waitingReason 1590 } 1591 } 1592 } 1593 1594 // Record the time it takes for the pod to become running. 1595 existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID) 1596 if !ok || existingStatus.Phase == v1.PodPending && apiPodStatus.Phase == v1.PodRunning && 1597 !firstSeenTime.IsZero() { 1598 metrics.PodStartDuration.Observe(metrics.SinceInSeconds(firstSeenTime)) 1599 } 1600 1601 kl.statusManager.SetPodStatus(pod, apiPodStatus) 1602 1603 // Pods that are not runnable must be stopped - return a typed error to the pod worker 1604 if !runnable.Admit { 1605 klog.V(2).InfoS("Pod is not runnable and must have running containers stopped", "pod", klog.KObj(pod), "podUID", pod.UID, "message", runnable.Message) 1606 var syncErr error 1607 p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) 1608 if err := kl.killPod(pod, p, nil); err != nil { 1609 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) 1610 syncErr = fmt.Errorf("error killing pod: %v", err) 1611 utilruntime.HandleError(syncErr) 1612 } else { 1613 // There was no error killing the pod, but the pod cannot be run. 1614 // Return an error to signal that the sync loop should back off. 1615 syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) 1616 } 1617 return syncErr 1618 } 1619 1620 // If the network plugin is not ready, only start the pod if it uses the host network 1621 if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { 1622 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.NetworkNotReady, "%s: %v", NetworkNotReadyErrorMsg, err) 1623 return fmt.Errorf("%s: %v", NetworkNotReadyErrorMsg, err) 1624 } 1625 1626 // Create Cgroups for the pod and apply resource parameters 1627 // to them if cgroups-per-qos flag is enabled. 1628 pcm := kl.containerManager.NewPodContainerManager() 1629 // If pod has already been terminated then we need not create 1630 // or update the pod's cgroup 1631 // TODO: once context cancellation is added this check can be removed 1632 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { 1633 // When the kubelet is restarted with the cgroups-per-qos 1634 // flag enabled, all the pod's running containers 1635 // should be killed intermittently and brought back up 1636 // under the qos cgroup hierarchy. 
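	// In short: the containers below are killed only when the pod-level cgroup does not
	// exist and the pod already has running containers (i.e. this is not its first sync);
	// a cgroup is then (re)created further down, except for run-once (RestartPolicyNever)
	// pods that were just killed.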
1637 // Check if this is the pod's first sync 1638 firstSync := true 1639 for _, containerStatus := range apiPodStatus.ContainerStatuses { 1640 if containerStatus.State.Running != nil { 1641 firstSync = false 1642 break 1643 } 1644 } 1645 // Don't kill containers in pod if pod's cgroups already 1646 // exist or the pod is running for the first time 1647 podKilled := false 1648 if !pcm.Exists(pod) && !firstSync { 1649 p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) 1650 if err := kl.killPod(pod, p, nil); err == nil { 1651 podKilled = true 1652 } else { 1653 klog.ErrorS(err, "KillPod failed", "pod", klog.KObj(pod), "podStatus", podStatus) 1654 } 1655 } 1656 // Create and Update pod's Cgroups 1657 // Don't create cgroups for run once pod if it was killed above 1658 // The current policy is not to restart the run once pods when 1659 // the kubelet is restarted with the new flag as run once pods are 1660 // expected to run only once and if the kubelet is restarted then 1661 // they are not expected to run again. 1662 // We don't create and apply updates to cgroup if it's a run-once pod and was killed above 1663 if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) { 1664 if !pcm.Exists(pod) { 1665 if err := kl.containerManager.UpdateQOSCgroups(); err != nil { 1666 klog.V(2).InfoS("Failed to update QoS cgroups while syncing pod", "pod", klog.KObj(pod), "err", err) 1667 } 1668 if err := pcm.EnsureExists(pod); err != nil { 1669 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToCreatePodContainer, "unable to ensure pod container exists: %v", err) 1670 return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err) 1671 } 1672 } 1673 } 1674 } 1675 1676 // Create Mirror Pod for Static Pod if it doesn't already exist 1677 if kubetypes.IsStaticPod(pod) { 1678 deleted := false 1679 if mirrorPod != nil { 1680 if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { 1681 // The mirror pod is semantically different from the static pod. Remove 1682 // it. The mirror pod will get recreated later.
1683 klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID) 1684 podFullName := kubecontainer.GetPodFullName(pod) 1685 var err error 1686 deleted, err = kl.podManager.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID) 1687 if deleted { 1688 klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod)) 1689 } else if err != nil { 1690 klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod)) 1691 } 1692 } 1693 } 1694 if mirrorPod == nil || deleted { 1695 node, err := kl.GetNode() 1696 if err != nil || node.DeletionTimestamp != nil { 1697 klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName))) 1698 } else { 1699 klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod)) 1700 if err := kl.podManager.CreateMirrorPod(pod); err != nil { 1701 klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod)) 1702 } 1703 } 1704 } 1705 } 1706 1707 // Make data directories for the pod 1708 if err := kl.makePodDataDirs(pod); err != nil { 1709 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToMakePodDataDirectories, "error making pod data directories: %v", err) 1710 klog.ErrorS(err, "Unable to make pod data directories for pod", "pod", klog.KObj(pod)) 1711 return err 1712 } 1713 1714 // Volume manager will not mount volumes for terminating pods 1715 // TODO: once context cancellation is added this check can be removed 1716 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { 1717 // Wait for volumes to attach/mount 1718 if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { 1719 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedMountVolume, "Unable to attach or mount volumes: %v", err) 1720 klog.ErrorS(err, "Unable to attach or mount volumes for pod; skipping pod", "pod", klog.KObj(pod)) 1721 return err 1722 } 1723 } 1724 1725 // Fetch the pull secrets for the pod 1726 pullSecrets := kl.getPullSecretsForPod(pod) 1727 1728 // Call the container runtime's SyncPod callback 1729 result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff) 1730 kl.reasonCache.Update(pod.UID, result) 1731 if err := result.Error(); err != nil { 1732 // Do not return error if the only failures were pods in backoff 1733 for _, r := range result.SyncResults { 1734 if r.Error != kubecontainer.ErrCrashLoopBackOff && r.Error != images.ErrImagePullBackOff { 1735 // Do not record an event here, as we keep all event logging for sync pod failures 1736 // local to container runtime so we get better errors 1737 return err 1738 } 1739 } 1740 1741 return nil 1742 } 1743 1744 return nil 1745} 1746 1747// syncTerminatingPod is expected to terminate all running containers in a pod. Once this method 1748// returns without error, the pod's local state can be safely cleaned up. If runningPod is passed, 1749// we perform no status updates. 
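// A non-nil runningPod means the pod is known only to the container runtime (an orphan
// without a config source), so only its containers are killed and no status is written
// back to the status manager.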
1750func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, runningPod *kubecontainer.Pod, gracePeriod *int64, podStatusFn func(*v1.PodStatus)) error { 1751 klog.V(4).InfoS("syncTerminatingPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) 1752 defer klog.V(4).InfoS("syncTerminatingPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) 1753 1754 // when we receive a runtime only pod (runningPod != nil) we don't need to update the status 1755 // manager or refresh the status of the cache, because a successful killPod will ensure we do 1756 // not get invoked again 1757 if runningPod != nil { 1758 // we kill the pod with the specified grace period since this is a termination 1759 if gracePeriod != nil { 1760 klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) 1761 } else { 1762 klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) 1763 } 1764 if err := kl.killPod(pod, *runningPod, gracePeriod); err != nil { 1765 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) 1766 // there was an error killing the pod, so we return that error directly 1767 utilruntime.HandleError(err) 1768 return err 1769 } 1770 klog.V(4).InfoS("Pod termination stopped all running orphan containers", "pod", klog.KObj(pod), "podUID", pod.UID) 1771 return nil 1772 } 1773 1774 apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) 1775 if podStatusFn != nil { 1776 podStatusFn(&apiPodStatus) 1777 } 1778 kl.statusManager.SetPodStatus(pod, apiPodStatus) 1779 1780 if gracePeriod != nil { 1781 klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) 1782 } else { 1783 klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) 1784 } 1785 p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) 1786 if err := kl.killPod(pod, p, gracePeriod); err != nil { 1787 kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) 1788 // there was an error killing the pod, so we return that error directly 1789 utilruntime.HandleError(err) 1790 return err 1791 } 1792 1793 // Guard against consistency issues in KillPod implementations by checking that there are no 1794 // running containers. This method is invoked infrequently so this is effectively free and can 1795 // catch race conditions introduced by callers updating pod status out of order. 
1796 // TODO: have KillPod return the terminal status of stopped containers and write that into the 1797 // cache immediately 1798 podStatus, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace) 1799 if err != nil { 1800 klog.ErrorS(err, "Unable to read pod status prior to final pod termination", "pod", klog.KObj(pod), "podUID", pod.UID) 1801 return err 1802 } 1803 var runningContainers []string 1804 var containers []string 1805 for _, s := range podStatus.ContainerStatuses { 1806 if s.State == kubecontainer.ContainerStateRunning { 1807 runningContainers = append(runningContainers, s.ID.String()) 1808 } 1809 containers = append(containers, fmt.Sprintf("(%s state=%s exitCode=%d finishedAt=%s)", s.Name, s.State, s.ExitCode, s.FinishedAt.UTC().Format(time.RFC3339Nano))) 1810 } 1811 if klog.V(4).Enabled() { 1812 sort.Strings(containers) 1813 klog.InfoS("Post-termination container state", "pod", klog.KObj(pod), "podUID", pod.UID, "containers", strings.Join(containers, " ")) 1814 } 1815 if len(runningContainers) > 0 { 1816 return fmt.Errorf("detected running containers after a successful KillPod, CRI violation: %v", runningContainers) 1817 } 1818 1819 // we have successfully stopped all containers, the pod is terminating, our status is "done" 1820 klog.V(4).InfoS("Pod termination stopped all running containers", "pod", klog.KObj(pod), "podUID", pod.UID) 1821 1822 return nil 1823} 1824 1825// syncTerminatedPod cleans up a pod that has terminated (has no running containers). 1826// The invocations in this call are expected to tear down what PodResourcesAreReclaimed checks (which 1827// gates pod deletion). When this method exits the pod is expected to be ready for cleanup. 1828// TODO: make this method take a context and exit early 1829func (kl *Kubelet) syncTerminatedPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) error { 1830 klog.V(4).InfoS("syncTerminatedPod enter", "pod", klog.KObj(pod), "podUID", pod.UID) 1831 defer klog.V(4).InfoS("syncTerminatedPod exit", "pod", klog.KObj(pod), "podUID", pod.UID) 1832 1833 // generate the final status of the pod 1834 // TODO: should we simply fold this into TerminatePod? that would give a single pod update 1835 apiPodStatus := kl.generateAPIPodStatus(pod, podStatus) 1836 kl.statusManager.SetPodStatus(pod, apiPodStatus) 1837 1838 // volumes are unmounted after the pod worker reports ShouldPodRuntimeBeRemoved (which is satisfied 1839 // before syncTerminatedPod is invoked) 1840 if err := kl.volumeManager.WaitForUnmount(pod); err != nil { 1841 return err 1842 } 1843 klog.V(4).InfoS("Pod termination unmounted volumes", "pod", klog.KObj(pod), "podUID", pod.UID) 1844 1845 // Note: we leave pod containers to be reclaimed in the background since dockershim requires the 1846 // container for retrieving logs and we want to make sure logs are available until the pod is 1847 // physically deleted. 1848 1849 // remove any cgroups in the hierarchy for pods that are no longer running. 
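	// This tears down the pod-level cgroup that syncPod created via pcm.EnsureExists,
	// and only applies when the kubelet runs with cgroups-per-qos enabled.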
1850 if kl.cgroupsPerQOS { 1851 pcm := kl.containerManager.NewPodContainerManager() 1852 name, _ := pcm.GetPodContainerName(pod) 1853 if err := pcm.Destroy(name); err != nil { 1854 return err 1855 } 1856 klog.V(4).InfoS("Pod termination removed cgroups", "pod", klog.KObj(pod), "podUID", pod.UID) 1857 } 1858 1859 // mark the final pod status 1860 kl.statusManager.TerminatePod(pod) 1861 klog.V(4).InfoS("Pod is terminated and will need no more status updates", "pod", klog.KObj(pod), "podUID", pod.UID) 1862 1863 return nil 1864} 1865 1866// Get pods which should be resynchronized. Currently, the following pods should be resynchronized: 1867// * pods whose work is ready. 1868// * pods for which an internal module requests a sync. 1869func (kl *Kubelet) getPodsToSync() []*v1.Pod { 1870 allPods := kl.podManager.GetPods() 1871 podUIDs := kl.workQueue.GetWork() 1872 podUIDSet := sets.NewString() 1873 for _, podUID := range podUIDs { 1874 podUIDSet.Insert(string(podUID)) 1875 } 1876 var podsToSync []*v1.Pod 1877 for _, pod := range allPods { 1878 if podUIDSet.Has(string(pod.UID)) { 1879 // The work of the pod is ready 1880 podsToSync = append(podsToSync, pod) 1881 continue 1882 } 1883 for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers { 1884 if podSyncLoopHandler.ShouldSync(pod) { 1885 podsToSync = append(podsToSync, pod) 1886 break 1887 } 1888 } 1889 } 1890 return podsToSync 1891} 1892 1893// deletePod deletes the pod from the internal state of the kubelet by: 1894// 1. stopping the associated pod worker asynchronously 1895// 2. signaling the pod worker to kill the pod (SyncPodKill) 1896// 1897// deletePod returns an error if the pod is nil or not all 1898// sources are ready yet. 1899func (kl *Kubelet) deletePod(pod *v1.Pod) error { 1900 if pod == nil { 1901 return fmt.Errorf("deletePod does not allow nil pod") 1902 } 1903 if !kl.sourcesReady.AllReady() { 1904 // If the sources aren't ready, skip deletion, as we may accidentally delete pods 1905 // for sources that haven't reported yet. 1906 return fmt.Errorf("skipping delete because sources aren't ready yet") 1907 } 1908 klog.V(3).InfoS("Pod has been deleted and must be killed", "pod", klog.KObj(pod), "podUID", pod.UID) 1909 kl.podWorkers.UpdatePod(UpdatePodOptions{ 1910 Pod: pod, 1911 UpdateType: kubetypes.SyncPodKill, 1912 }) 1913 // We leave the volume/directory cleanup to the periodic cleanup routine. 1914 return nil 1915} 1916 1917// rejectPod records an event about the pod with the given reason and message, 1918// and updates the pod to the failed phase in the status manager. 1919func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { 1920 kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message) 1921 kl.statusManager.SetPodStatus(pod, v1.PodStatus{ 1922 Phase: v1.PodFailed, 1923 Reason: reason, 1924 Message: "Pod " + message}) 1925} 1926 1927// canAdmitPod determines if a pod can be admitted, and gives a reason if it 1928// cannot. "pod" is the new pod, while "pods" are all admitted pods. 1929// The function returns a boolean value indicating whether the pod 1930// can be admitted, a brief single-word reason and a message explaining why 1931// the pod cannot be admitted. 1932func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) { 1933 // the kubelet will invoke each pod admit handler in sequence 1934 // if any handler rejects, the pod is rejected.
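	// A rejection surfaces to the caller as, for example (values purely illustrative):
	//   ok=false, reason="OutOfcpu", message="Node didn't have enough resource: cpu"
	// and HandlePodAdditions then fails the pod via rejectPod with that reason and message.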
1935 // TODO: move out of disk check into a pod admitter 1936 // TODO: out of resource eviction should have a pod admitter call-out 1937 attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods} 1938 for _, podAdmitHandler := range kl.admitHandlers { 1939 if result := podAdmitHandler.Admit(attrs); !result.Admit { 1940 return false, result.Reason, result.Message 1941 } 1942 } 1943 1944 return true, "", "" 1945} 1946 1947func (kl *Kubelet) canRunPod(pod *v1.Pod) lifecycle.PodAdmitResult { 1948 attrs := &lifecycle.PodAdmitAttributes{Pod: pod} 1949 // Get "OtherPods". Rejected pods are failed, so only include admitted pods that are alive. 1950 attrs.OtherPods = kl.GetActivePods() 1951 1952 for _, handler := range kl.softAdmitHandlers { 1953 if result := handler.Admit(attrs); !result.Admit { 1954 return result 1955 } 1956 } 1957 1958 return lifecycle.PodAdmitResult{Admit: true} 1959} 1960 1961// syncLoop is the main loop for processing changes. It watches for changes from 1962// three channels (file, apiserver, and http) and creates a union of them. For 1963// any new change seen, it runs a sync against the desired state and the running 1964// state. If no changes are seen to the configuration, it synchronizes the last 1965// known desired state every sync-frequency seconds. Never returns. 1966func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { 1967 klog.InfoS("Starting kubelet main sync loop") 1968 // The syncTicker wakes up kubelet to check if there are any pod workers 1969 // that need to be sync'd. A one-second period is sufficient because the 1970 // sync interval is defaulted to 10s. 1971 syncTicker := time.NewTicker(time.Second) 1972 defer syncTicker.Stop() 1973 housekeepingTicker := time.NewTicker(housekeepingPeriod) 1974 defer housekeepingTicker.Stop() 1975 plegCh := kl.pleg.Watch() 1976 const ( 1977 base = 100 * time.Millisecond 1978 max = 5 * time.Second 1979 factor = 2 1980 ) 1981 duration := base 1982 // Responsible for checking limits in resolv.conf 1983 // The limits do not have anything to do with individual pods 1984 // Since this is called in syncLoop, we don't need to call it anywhere else 1985 if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" { 1986 kl.dnsConfigurer.CheckLimitsForResolvConf() 1987 } 1988 1989 for { 1990 if err := kl.runtimeState.runtimeErrors(); err != nil { 1991 klog.ErrorS(err, "Skipping pod synchronization") 1992 // exponential backoff 1993 time.Sleep(duration) 1994 duration = time.Duration(math.Min(float64(max), factor*float64(duration))) 1995 continue 1996 } 1997 // reset backoff if we have a success 1998 duration = base 1999 2000 kl.syncLoopMonitor.Store(kl.clock.Now()) 2001 if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { 2002 break 2003 } 2004 kl.syncLoopMonitor.Store(kl.clock.Now()) 2005 } 2006} 2007 2008// syncLoopIteration reads from various channels and dispatches pods to the 2009// given handler. 2010// 2011// Arguments: 2012// 1. configCh: a channel to read config events from 2013// 2. handler: the SyncHandler to dispatch pods to 2014// 3. syncCh: a channel to read periodic sync events from 2015// 4. housekeepingCh: a channel to read housekeeping events from 2016// 5. plegCh: a channel to read PLEG updates from 2017// 2018// Events are also read from the liveness, readiness, and startup managers' update channels. 2019// 2020// The workflow is to read from one of the channels, handle that event, and 2021// update the timestamp in the sync loop monitor.
2022// 2023// Here is an appropriate place to note that despite the syntactical 2024// similarity to the switch statement, the case statements in a select are 2025// evaluated in a pseudorandom order if there are multiple channels ready to 2026// read from when the select is evaluated. In other words, case statements 2027// are evaluated in random order, and you can not assume that the case 2028// statements evaluate in order if multiple channels have events. 2029// 2030// With that in mind, in truly no particular order, the different channels 2031// are handled as follows: 2032// 2033// * configCh: dispatch the pods for the config change to the appropriate 2034// handler callback for the event type 2035// * plegCh: update the runtime cache; sync pod 2036// * syncCh: sync all pods waiting for sync 2037// * housekeepingCh: trigger cleanup of pods 2038// * health manager: sync pods that have failed or in which one or more 2039// containers have failed health checks 2040func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, 2041 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { 2042 select { 2043 case u, open := <-configCh: 2044 // Update from a config source; dispatch it to the right handler 2045 // callback. 2046 if !open { 2047 klog.ErrorS(nil, "Update channel is closed, exiting the sync loop") 2048 return false 2049 } 2050 2051 switch u.Op { 2052 case kubetypes.ADD: 2053 klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", format.Pods(u.Pods)) 2054 // After restarting, kubelet will get all existing pods through 2055 // ADD as if they are new pods. These pods will then go through the 2056 // admission process and *may* be rejected. This can be resolved 2057 // once we have checkpointing. 2058 handler.HandlePodAdditions(u.Pods) 2059 case kubetypes.UPDATE: 2060 klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", format.Pods(u.Pods)) 2061 handler.HandlePodUpdates(u.Pods) 2062 case kubetypes.REMOVE: 2063 klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", format.Pods(u.Pods)) 2064 handler.HandlePodRemoves(u.Pods) 2065 case kubetypes.RECONCILE: 2066 klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", format.Pods(u.Pods)) 2067 handler.HandlePodReconcile(u.Pods) 2068 case kubetypes.DELETE: 2069 klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", format.Pods(u.Pods)) 2070 // DELETE is treated as a UPDATE because of graceful deletion. 2071 handler.HandlePodUpdates(u.Pods) 2072 case kubetypes.SET: 2073 // TODO: Do we want to support this? 2074 klog.ErrorS(nil, "Kubelet does not support snapshot update") 2075 default: 2076 klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op) 2077 } 2078 2079 kl.sourcesReady.AddSource(u.Source) 2080 2081 case e := <-plegCh: 2082 if e.Type == pleg.ContainerStarted { 2083 // record the most recent time we observed a container start for this pod. 2084 // this lets us selectively invalidate the runtimeCache when processing a delete for this pod 2085 // to make sure we don't miss handling graceful termination for containers we reported as having started. 2086 kl.lastContainerStartedTime.Add(e.ID, time.Now()) 2087 } 2088 if isSyncPodWorthy(e) { 2089 // PLEG event for a pod; sync it. 
2090 if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { 2091 klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e) 2092 handler.HandlePodSyncs([]*v1.Pod{pod}) 2093 } else { 2094 // If the pod no longer exists, ignore the event. 2095 klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e) 2096 } 2097 } 2098 2099 if e.Type == pleg.ContainerDied { 2100 if containerID, ok := e.Data.(string); ok { 2101 kl.cleanUpContainersInPod(e.ID, containerID) 2102 } 2103 } 2104 case <-syncCh: 2105 // Sync pods waiting for sync 2106 podsToSync := kl.getPodsToSync() 2107 if len(podsToSync) == 0 { 2108 break 2109 } 2110 klog.V(4).InfoS("SyncLoop (SYNC) pods", "total", len(podsToSync), "pods", format.Pods(podsToSync)) 2111 handler.HandlePodSyncs(podsToSync) 2112 case update := <-kl.livenessManager.Updates(): 2113 if update.Result == proberesults.Failure { 2114 handleProbeSync(kl, update, handler, "liveness", "unhealthy") 2115 } 2116 case update := <-kl.readinessManager.Updates(): 2117 ready := update.Result == proberesults.Success 2118 kl.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready) 2119 2120 status := "" 2121 if ready { 2122 status = "ready" 2123 } 2124 handleProbeSync(kl, update, handler, "readiness", status) 2125 case update := <-kl.startupManager.Updates(): 2126 started := update.Result == proberesults.Success 2127 kl.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started) 2128 2129 status := "unhealthy" 2130 if started { 2131 status = "started" 2132 } 2133 handleProbeSync(kl, update, handler, "startup", status) 2134 case <-housekeepingCh: 2135 if !kl.sourcesReady.AllReady() { 2136 // If the sources aren't ready or volume manager has not yet synced the states, 2137 // skip housekeeping, as we may accidentally delete pods from unready sources. 2138 klog.V(4).InfoS("SyncLoop (housekeeping, skipped): sources aren't ready yet") 2139 } else { 2140 start := time.Now() 2141 klog.V(4).InfoS("SyncLoop (housekeeping)") 2142 if err := handler.HandlePodCleanups(); err != nil { 2143 klog.ErrorS(err, "Failed cleaning pods") 2144 } 2145 duration := time.Since(start) 2146 if duration > housekeepingWarningDuration { 2147 klog.ErrorS(fmt.Errorf("housekeeping took too long"), "Housekeeping took longer than 15s", "seconds", duration.Seconds()) 2148 } 2149 klog.V(4).InfoS("SyncLoop (housekeeping) end") 2150 } 2151 } 2152 return true 2153} 2154 2155func handleProbeSync(kl *Kubelet, update proberesults.Update, handler SyncHandler, probe, status string) { 2156 // We should not use the pod from manager, because it is never updated after initialization. 2157 pod, ok := kl.podManager.GetPodByUID(update.PodUID) 2158 if !ok { 2159 // If the pod no longer exists, ignore the update. 2160 klog.V(4).InfoS("SyncLoop (probe): ignore irrelevant update", "probe", probe, "status", status, "update", update) 2161 return 2162 } 2163 klog.V(1).InfoS("SyncLoop (probe)", "probe", probe, "status", status, "pod", klog.KObj(pod)) 2164 handler.HandlePodSyncs([]*v1.Pod{pod}) 2165} 2166 2167// dispatchWork starts the asynchronous sync of the pod in a pod worker. 2168// If the pod has completed termination, dispatchWork will perform no action. 2169func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { 2170 // Run the sync in an async worker. 
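	// UpdatePod only enqueues the request; the pod worker later invokes syncPod,
	// syncTerminatingPod or syncTerminatedPod as appropriate, so this call does not
	// block on the sync itself.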
2171 kl.podWorkers.UpdatePod(UpdatePodOptions{ 2172 Pod: pod, 2173 MirrorPod: mirrorPod, 2174 UpdateType: syncType, 2175 StartTime: start, 2176 }) 2177 // Note the number of containers for new pods. 2178 if syncType == kubetypes.SyncPodCreate { 2179 metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) 2180 } 2181} 2182 2183// TODO: handle mirror pods in a separate component (issue #17251) 2184func (kl *Kubelet) handleMirrorPod(mirrorPod *v1.Pod, start time.Time) { 2185 // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the 2186 // corresponding static pod. Send update to the pod worker if the static 2187 // pod exists. 2188 if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok { 2189 kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) 2190 } 2191} 2192 2193// HandlePodAdditions is the callback in SyncHandler for pods being added from 2194// a config source. 2195func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { 2196 start := kl.clock.Now() 2197 sort.Sort(sliceutils.PodsByCreationTime(pods)) 2198 for _, pod := range pods { 2199 existingPods := kl.podManager.GetPods() 2200 // Always add the pod to the pod manager. Kubelet relies on the pod 2201 // manager as the source of truth for the desired state. If a pod does 2202 // not exist in the pod manager, it means that it has been deleted in 2203 // the apiserver and no action (other than cleanup) is required. 2204 kl.podManager.AddPod(pod) 2205 2206 if kubetypes.IsMirrorPod(pod) { 2207 kl.handleMirrorPod(pod, start) 2208 continue 2209 } 2210 2211 // Only go through the admission process if the pod is not requested 2212 // for termination by another part of the kubelet. If the pod is already 2213 // using resources (previously admitted), the pod worker is going to be 2214 // shutting it down. If the pod hasn't started yet, we know that when 2215 // the pod worker is invoked it will also avoid setting up the pod, so 2216 // we simply avoid doing any work. 2217 if !kl.podWorkers.IsPodTerminationRequested(pod.UID) { 2218 // We failed pods that we rejected, so activePods include all admitted 2219 // pods that are alive. 2220 activePods := kl.filterOutTerminatedPods(existingPods) 2221 2222 // Check if we can admit the pod; if not, reject it. 2223 if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { 2224 kl.rejectPod(pod, reason, message) 2225 continue 2226 } 2227 } 2228 mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) 2229 kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) 2230 kl.probeManager.AddPod(pod) 2231 } 2232} 2233 2234// HandlePodUpdates is the callback in the SyncHandler interface for pods 2235// being updated from a config source. 2236func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { 2237 start := kl.clock.Now() 2238 for _, pod := range pods { 2239 kl.podManager.UpdatePod(pod) 2240 if kubetypes.IsMirrorPod(pod) { 2241 kl.handleMirrorPod(pod, start) 2242 continue 2243 } 2244 mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) 2245 kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start) 2246 } 2247} 2248 2249// HandlePodRemoves is the callback in the SyncHandler interface for pods 2250// being removed from a config source. 
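// Note that a REMOVE from a config source is different from a graceful API deletion,
// which arrives as a DELETE and is handled as an update (see syncLoopIteration).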
2251func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { 2252 start := kl.clock.Now() 2253 for _, pod := range pods { 2254 kl.podManager.DeletePod(pod) 2255 if kubetypes.IsMirrorPod(pod) { 2256 kl.handleMirrorPod(pod, start) 2257 continue 2258 } 2259 // Deletion is allowed to fail because the periodic cleanup routine 2260 // will trigger deletion again. 2261 if err := kl.deletePod(pod); err != nil { 2262 klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err) 2263 } 2264 kl.probeManager.RemovePod(pod) 2265 } 2266} 2267 2268// HandlePodReconcile is the callback in the SyncHandler interface for pods 2269// that should be reconciled. 2270func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { 2271 start := kl.clock.Now() 2272 for _, pod := range pods { 2273 // Update the pod in the pod manager; the status manager will reconcile 2274 // periodically against the pod manager. 2275 kl.podManager.UpdatePod(pod) 2276 2277 // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation. 2278 if status.NeedToReconcilePodReadiness(pod) { 2279 mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) 2280 kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) 2281 } 2282 2283 // After an evicted pod is synced, all dead containers in the pod can be removed. 2284 if eviction.PodIsEvicted(pod.Status) { 2285 if podStatus, err := kl.podCache.Get(pod.UID); err == nil { 2286 kl.containerDeletor.deleteContainersInPod("", podStatus, true) 2287 } 2288 } 2289 } 2290} 2291 2292// HandlePodSyncs is the callback in the SyncHandler interface for pods 2293// that should be dispatched to pod workers for sync. 2294func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) { 2295 start := kl.clock.Now() 2296 for _, pod := range pods { 2297 mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) 2298 kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) 2299 } 2300} 2301 2302// LatestLoopEntryTime returns the last time in the sync loop monitor. 2303func (kl *Kubelet) LatestLoopEntryTime() time.Time { 2304 val := kl.syncLoopMonitor.Load() 2305 if val == nil { 2306 return time.Time{} 2307 } 2308 return val.(time.Time) 2309} 2310 2311// updateRuntimeUp calls the container runtime status callback, initializing 2312// the runtime dependent modules when the container runtime first comes up, 2313// and logs an error if the status check fails. If the status check is OK, 2314// it updates the container runtime uptime in the kubelet runtimeState. 2315func (kl *Kubelet) updateRuntimeUp() { 2316 kl.updateRuntimeMux.Lock() 2317 defer kl.updateRuntimeMux.Unlock() 2318 2319 s, err := kl.containerRuntime.Status() 2320 if err != nil { 2321 klog.ErrorS(err, "Container runtime sanity check failed") 2322 return 2323 } 2324 if s == nil { 2325 klog.ErrorS(nil, "Container runtime status is nil") 2326 return 2327 } 2328 // Periodically log the whole runtime status for debugging. 2329 klog.V(4).InfoS("Container runtime status", "status", s) 2330 networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady) 2331 if networkReady == nil || !networkReady.Status { 2332 klog.ErrorS(nil, "Container runtime network not ready", "networkReady", networkReady) 2333 kl.runtimeState.setNetworkState(fmt.Errorf("container runtime network not ready: %v", networkReady)) 2334 } else { 2335 // Set nil if the container runtime network is ready. 2336 kl.runtimeState.setNetworkState(nil) 2337 } 2338 // information in RuntimeReady condition will be propagated to NodeReady condition.
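	// The first time the runtime reports ready, initializeRuntimeDependentModules runs
	// exactly once via kl.oneTimeInitializer (a sync.Once) a few lines below.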
2339 runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady) 2340 // If RuntimeReady is not set or is false, report an error. 2341 if runtimeReady == nil || !runtimeReady.Status { 2342 klog.ErrorS(nil, "Container runtime not ready", "runtimeReady", runtimeReady) 2343 kl.runtimeState.setRuntimeState(fmt.Errorf("container runtime not ready: %v", runtimeReady)) 2344 return 2345 } 2346 kl.runtimeState.setRuntimeState(nil) 2347 kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) 2348 kl.runtimeState.setRuntimeSync(kl.clock.Now()) 2349} 2350 2351// GetConfiguration returns the KubeletConfiguration used to configure the kubelet. 2352func (kl *Kubelet) GetConfiguration() kubeletconfiginternal.KubeletConfiguration { 2353 return kl.kubeletConfiguration 2354} 2355 2356// BirthCry sends an event that the kubelet has started up. 2357func (kl *Kubelet) BirthCry() { 2358 // Make an event that kubelet restarted. 2359 kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.") 2360} 2361 2362// ResyncInterval returns the interval used for periodic syncs. 2363func (kl *Kubelet) ResyncInterval() time.Duration { 2364 return kl.resyncInterval 2365} 2366 2367// ListenAndServe runs the kubelet HTTP server. 2368func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions, 2369 auth server.AuthInterface) { 2370 server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth) 2371} 2372 2373// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. 2374func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { 2375 server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port) 2376} 2377 2378// ListenAndServePodResources runs the kubelet podresources grpc service 2379func (kl *Kubelet) ListenAndServePodResources() { 2380 socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket) 2381 if err != nil { 2382 klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err) 2383 return 2384 } 2385 server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager) 2386} 2387 2388// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. 2389func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) { 2390 if podStatus, err := kl.podCache.Get(podID); err == nil { 2391 // When an evicted or deleted pod has already synced, all containers can be removed. 2392 removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID) 2393 kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll) 2394 } 2395} 2396 2397// fastStatusUpdateOnce starts a loop that checks the internal node indexer cache for when a CIDR 2398// is applied and tries to update pod CIDR immediately. After pod CIDR is updated it fires off 2399// a runtime update and a node status update. Function returns after one successful node status update. 2400// Function is executed only during Kubelet start which improves latency to ready node by updating 2401// pod CIDR, runtime status and node statuses ASAP. 
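// It polls the node object every 100ms and returns once a pod CIDR has been applied
// and the resulting runtime and node status updates have been triggered.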
2402func (kl *Kubelet) fastStatusUpdateOnce() { 2403 for { 2404 time.Sleep(100 * time.Millisecond) 2405 node, err := kl.GetNode() 2406 if err != nil { 2407 klog.ErrorS(err, "Error getting node") 2408 continue 2409 } 2410 if len(node.Spec.PodCIDRs) != 0 { 2411 podCIDRs := strings.Join(node.Spec.PodCIDRs, ",") 2412 if _, err := kl.updatePodCIDR(podCIDRs); err != nil { 2413 klog.ErrorS(err, "Pod CIDR update failed", "CIDR", podCIDRs) 2414 continue 2415 } 2416 kl.updateRuntimeUp() 2417 kl.syncNodeStatus() 2418 return 2419 } 2420 } 2421} 2422 2423// isSyncPodWorthy filters out events that are not worthy of pod syncing 2424func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { 2425 // ContainerRemoved doesn't affect pod state 2426 return event.Type != pleg.ContainerRemoved 2427} 2428 2429// Gets the streaming server configuration to use with in-process CRI shims. 2430func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config { 2431 config := &streaming.Config{ 2432 StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, 2433 StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, 2434 SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols, 2435 SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, 2436 } 2437 config.Addr = net.JoinHostPort("localhost", "0") 2438 return config 2439} 2440