1// +build linux 2 3/* 4Copyright 2015 The Kubernetes Authors. 5 6Licensed under the Apache License, Version 2.0 (the "License"); 7you may not use this file except in compliance with the License. 8You may obtain a copy of the License at 9 10 http://www.apache.org/licenses/LICENSE-2.0 11 12Unless required by applicable law or agreed to in writing, software 13distributed under the License is distributed on an "AS IS" BASIS, 14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15See the License for the specific language governing permissions and 16limitations under the License. 17*/ 18 19package cm 20 21import ( 22 "bytes" 23 "fmt" 24 "io/ioutil" 25 "os" 26 "path" 27 "strconv" 28 "strings" 29 "sync" 30 "time" 31 32 "github.com/opencontainers/runc/libcontainer/cgroups" 33 cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs" 34 cgroupfs2 "github.com/opencontainers/runc/libcontainer/cgroups/fs2" 35 "github.com/opencontainers/runc/libcontainer/configs" 36 "k8s.io/klog/v2" 37 "k8s.io/mount-utils" 38 utilio "k8s.io/utils/io" 39 utilpath "k8s.io/utils/path" 40 41 libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns" 42 v1 "k8s.io/api/core/v1" 43 "k8s.io/apimachinery/pkg/api/resource" 44 utilerrors "k8s.io/apimachinery/pkg/util/errors" 45 "k8s.io/apimachinery/pkg/util/sets" 46 utilversion "k8s.io/apimachinery/pkg/util/version" 47 "k8s.io/apimachinery/pkg/util/wait" 48 utilfeature "k8s.io/apiserver/pkg/util/feature" 49 "k8s.io/client-go/tools/record" 50 internalapi "k8s.io/cri-api/pkg/apis" 51 podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" 52 kubefeatures "k8s.io/kubernetes/pkg/features" 53 "k8s.io/kubernetes/pkg/kubelet/cadvisor" 54 "k8s.io/kubernetes/pkg/kubelet/cm/admission" 55 "k8s.io/kubernetes/pkg/kubelet/cm/containermap" 56 "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" 57 "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" 58 "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" 59 memorymanagerstate 
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state" 60 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" 61 cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" 62 "k8s.io/kubernetes/pkg/kubelet/config" 63 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" 64 "k8s.io/kubernetes/pkg/kubelet/lifecycle" 65 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" 66 "k8s.io/kubernetes/pkg/kubelet/qos" 67 "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit" 68 "k8s.io/kubernetes/pkg/kubelet/status" 69 schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" 70 "k8s.io/kubernetes/pkg/util/oom" 71 "k8s.io/kubernetes/pkg/util/procfs" 72 utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" 73) 74 75const ( 76 dockerProcessName = "dockerd" 77 // dockerd option --pidfile can specify path to use for daemon PID file, pid file path is default "/var/run/docker.pid" 78 dockerPidFile = "/var/run/docker.pid" 79 containerdProcessName = "containerd" 80 maxPidFileLength = 1 << 10 // 1KB 81) 82 83var ( 84 // The docker version in which containerd was introduced. 85 containerdAPIVersion = utilversion.MustParseGeneric("1.23") 86) 87 88// A non-user container tracked by the Kubelet. 89type systemContainer struct { 90 // Absolute name of the container. 91 name string 92 93 // CPU limit in millicores. 94 cpuMillicores int64 95 96 // Function that ensures the state of the container. 97 // m is the cgroup manager for the specified container. 98 ensureStateFunc func(m cgroups.Manager) error 99 100 // Manager for the cgroups of the external container. 
101 manager cgroups.Manager 102} 103 104func newSystemCgroups(containerName string) (*systemContainer, error) { 105 manager, err := createManager(containerName) 106 if err != nil { 107 return nil, err 108 } 109 return &systemContainer{ 110 name: containerName, 111 manager: manager, 112 }, nil 113} 114 115type containerManagerImpl struct { 116 sync.RWMutex 117 cadvisorInterface cadvisor.Interface 118 mountUtil mount.Interface 119 NodeConfig 120 status Status 121 // External containers being managed. 122 systemContainers []*systemContainer 123 // Tasks that are run periodically 124 periodicTasks []func() 125 // Holds all the mounted cgroup subsystems 126 subsystems *CgroupSubsystems 127 nodeInfo *v1.Node 128 // Interface for cgroup management 129 cgroupManager CgroupManager 130 // Capacity of this node. 131 capacity v1.ResourceList 132 // Capacity of this node, including internal resources. 133 internalCapacity v1.ResourceList 134 // Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under. 135 // This path include a top level container for enforcing Node Allocatable. 136 cgroupRoot CgroupName 137 // Event recorder interface. 138 recorder record.EventRecorder 139 // Interface for QoS cgroup management 140 qosContainerManager QOSContainerManager 141 // Interface for exporting and allocating devices reported by device plugins. 142 deviceManager devicemanager.Manager 143 // Interface for CPU affinity management. 144 cpuManager cpumanager.Manager 145 // Interface for memory affinity management. 146 memoryManager memorymanager.Manager 147 // Interface for Topology resource co-ordination 148 topologyManager topologymanager.Manager 149} 150 151type features struct { 152 cpuHardcapping bool 153} 154 155var _ ContainerManager = &containerManagerImpl{} 156 157// checks if the required cgroups subsystems are mounted. 158// As of now, only 'cpu' and 'memory' are required. 159// cpu quota is a soft requirement. 
160func validateSystemRequirements(mountUtil mount.Interface) (features, error) { 161 const ( 162 cgroupMountType = "cgroup" 163 localErr = "system validation failed" 164 ) 165 var ( 166 cpuMountPoint string 167 f features 168 ) 169 mountPoints, err := mountUtil.List() 170 if err != nil { 171 return f, fmt.Errorf("%s - %v", localErr, err) 172 } 173 174 if cgroups.IsCgroup2UnifiedMode() { 175 f.cpuHardcapping = true 176 return f, nil 177 } 178 179 expectedCgroups := sets.NewString("cpu", "cpuacct", "cpuset", "memory") 180 for _, mountPoint := range mountPoints { 181 if mountPoint.Type == cgroupMountType { 182 for _, opt := range mountPoint.Opts { 183 if expectedCgroups.Has(opt) { 184 expectedCgroups.Delete(opt) 185 } 186 if opt == "cpu" { 187 cpuMountPoint = mountPoint.Path 188 } 189 } 190 } 191 } 192 193 if expectedCgroups.Len() > 0 { 194 return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, expectedCgroups.List()) 195 } 196 197 // Check if cpu quota is available. 198 // CPU cgroup is required and so it expected to be mounted at this point. 199 periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us")) 200 if err != nil { 201 klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_period_us is available") 202 } 203 quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us")) 204 if err != nil { 205 klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_quota_us is available") 206 } 207 if quotaExists && periodExists { 208 f.cpuHardcapping = true 209 } 210 return f, nil 211} 212 213// TODO(vmarmol): Add limits to the system containers. 214// Takes the absolute name of the specified containers. 215// Empty container name disables use of the specified container. 
216func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { 217 subsystems, err := GetCgroupSubsystems() 218 if err != nil { 219 return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err) 220 } 221 222 if failSwapOn { 223 // Check whether swap is enabled. The Kubelet does not support running with swap enabled. 224 swapFile := "/proc/swaps" 225 swapData, err := ioutil.ReadFile(swapFile) 226 if err != nil { 227 if os.IsNotExist(err) { 228 klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile) 229 } else { 230 return nil, err 231 } 232 } else { 233 swapData = bytes.TrimSpace(swapData) // extra trailing \n 234 swapLines := strings.Split(string(swapData), "\n") 235 236 // If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should 237 // error out unless --fail-swap-on is set to false. 238 if len(swapLines) > 1 { 239 return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines) 240 } 241 } 242 } 243 244 var internalCapacity = v1.ResourceList{} 245 // It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because 246 // machine info is computed and cached once as part of cAdvisor object creation. 
247 // But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts 248 machineInfo, err := cadvisorInterface.MachineInfo() 249 if err != nil { 250 return nil, err 251 } 252 capacity := cadvisor.CapacityFromMachineInfo(machineInfo) 253 for k, v := range capacity { 254 internalCapacity[k] = v 255 } 256 pidlimits, err := pidlimit.Stats() 257 if err == nil && pidlimits != nil && pidlimits.MaxPID != nil { 258 internalCapacity[pidlimit.PIDs] = *resource.NewQuantity( 259 int64(*pidlimits.MaxPID), 260 resource.DecimalSI) 261 } 262 263 // Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName 264 cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot) 265 cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver) 266 // Check if Cgroup-root actually exists on the node 267 if nodeConfig.CgroupsPerQOS { 268 // this does default to / when enabled, but this tests against regressions. 269 if nodeConfig.CgroupRoot == "" { 270 return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root") 271 } 272 273 // we need to check that the cgroup root actually exists for each subsystem 274 // of note, we always use the cgroupfs driver when performing this check since 275 // the input is provided in that format. 276 // this is important because we do not want any name conversion to occur. 277 if !cgroupManager.Exists(cgroupRoot) { 278 return nil, fmt.Errorf("invalid configuration: cgroup-root %q doesn't exist", cgroupRoot) 279 } 280 klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot) 281 // Include the top level cgroup for enforcing node allocatable into cgroup-root. 282 // This way, all sub modules can avoid having to understand the concept of node allocatable. 
283 cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName) 284 } 285 klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig) 286 287 qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager) 288 if err != nil { 289 return nil, err 290 } 291 292 cm := &containerManagerImpl{ 293 cadvisorInterface: cadvisorInterface, 294 mountUtil: mountUtil, 295 NodeConfig: nodeConfig, 296 subsystems: subsystems, 297 cgroupManager: cgroupManager, 298 capacity: capacity, 299 internalCapacity: internalCapacity, 300 cgroupRoot: cgroupRoot, 301 recorder: recorder, 302 qosContainerManager: qosContainerManager, 303 } 304 305 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { 306 cm.topologyManager, err = topologymanager.NewManager( 307 machineInfo.Topology, 308 nodeConfig.ExperimentalTopologyManagerPolicy, 309 nodeConfig.ExperimentalTopologyManagerScope, 310 ) 311 312 if err != nil { 313 return nil, err 314 } 315 316 } else { 317 cm.topologyManager = topologymanager.NewFakeManager() 318 } 319 320 klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled) 321 if devicePluginEnabled { 322 cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager) 323 cm.topologyManager.AddHintProvider(cm.deviceManager) 324 } else { 325 cm.deviceManager, err = devicemanager.NewManagerStub() 326 } 327 if err != nil { 328 return nil, err 329 } 330 331 // Initialize CPU manager 332 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { 333 cm.cpuManager, err = cpumanager.NewManager( 334 nodeConfig.ExperimentalCPUManagerPolicy, 335 nodeConfig.ExperimentalCPUManagerPolicyOptions, 336 nodeConfig.ExperimentalCPUManagerReconcilePeriod, 337 machineInfo, 338 nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs, 339 cm.GetNodeAllocatableReservation(), 340 nodeConfig.KubeletRootDir, 341 cm.topologyManager, 342 ) 343 if 
err != nil { 344 klog.ErrorS(err, "Failed to initialize cpu manager") 345 return nil, err 346 } 347 cm.topologyManager.AddHintProvider(cm.cpuManager) 348 } 349 350 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { 351 cm.memoryManager, err = memorymanager.NewManager( 352 nodeConfig.ExperimentalMemoryManagerPolicy, 353 machineInfo, 354 cm.GetNodeAllocatableReservation(), 355 nodeConfig.ExperimentalMemoryManagerReservedMemory, 356 nodeConfig.KubeletRootDir, 357 cm.topologyManager, 358 ) 359 if err != nil { 360 klog.ErrorS(err, "Failed to initialize memory manager") 361 return nil, err 362 } 363 cm.topologyManager.AddHintProvider(cm.memoryManager) 364 } 365 366 return cm, nil 367} 368 369// NewPodContainerManager is a factory method returns a PodContainerManager object 370// If qosCgroups are enabled then it returns the general pod container manager 371// otherwise it returns a no-op manager which essentially does nothing 372func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { 373 if cm.NodeConfig.CgroupsPerQOS { 374 return &podContainerManagerImpl{ 375 qosContainersInfo: cm.GetQOSContainersInfo(), 376 subsystems: cm.subsystems, 377 cgroupManager: cm.cgroupManager, 378 podPidsLimit: cm.ExperimentalPodPidsLimit, 379 enforceCPULimits: cm.EnforceCPULimits, 380 cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond), 381 } 382 } 383 return &podContainerManagerNoop{ 384 cgroupRoot: cm.cgroupRoot, 385 } 386} 387 388func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { 389 return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager} 390} 391 392// Create a cgroup container manager. 
393func createManager(containerName string) (cgroups.Manager, error) { 394 cg := &configs.Cgroup{ 395 Parent: "/", 396 Name: containerName, 397 Resources: &configs.Resources{ 398 SkipDevices: true, 399 }, 400 } 401 402 if cgroups.IsCgroup2UnifiedMode() { 403 return cgroupfs2.NewManager(cg, "", false) 404 405 } 406 return cgroupfs.NewManager(cg, nil, false), nil 407} 408 409type KernelTunableBehavior string 410 411const ( 412 KernelTunableWarn KernelTunableBehavior = "warn" 413 KernelTunableError KernelTunableBehavior = "error" 414 KernelTunableModify KernelTunableBehavior = "modify" 415) 416 417// setupKernelTunables validates kernel tunable flags are set as expected 418// depending upon the specified option, it will either warn, error, or modify the kernel tunable flags 419func setupKernelTunables(option KernelTunableBehavior) error { 420 desiredState := map[string]int{ 421 utilsysctl.VMOvercommitMemory: utilsysctl.VMOvercommitMemoryAlways, 422 utilsysctl.VMPanicOnOOM: utilsysctl.VMPanicOnOOMInvokeOOMKiller, 423 utilsysctl.KernelPanic: utilsysctl.KernelPanicRebootTimeout, 424 utilsysctl.KernelPanicOnOops: utilsysctl.KernelPanicOnOopsAlways, 425 utilsysctl.RootMaxKeys: utilsysctl.RootMaxKeysSetting, 426 utilsysctl.RootMaxBytes: utilsysctl.RootMaxBytesSetting, 427 } 428 429 sysctl := utilsysctl.New() 430 431 errList := []error{} 432 for flag, expectedValue := range desiredState { 433 val, err := sysctl.GetSysctl(flag) 434 if err != nil { 435 errList = append(errList, err) 436 continue 437 } 438 if val == expectedValue { 439 continue 440 } 441 442 switch option { 443 case KernelTunableError: 444 errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val)) 445 case KernelTunableWarn: 446 klog.V(2).InfoS("Invalid kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val) 447 case KernelTunableModify: 448 klog.V(2).InfoS("Updating kernel flag", "flag", flag, "expectedValue", 
expectedValue, "actualValue", val) 449 err = sysctl.SetSysctl(flag, expectedValue) 450 if err != nil { 451 if libcontaineruserns.RunningInUserNS() { 452 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletInUserNamespace) { 453 klog.V(2).InfoS("Updating kernel flag failed (running in UserNS, ignoring)", "flag", flag, "err", err) 454 continue 455 } 456 klog.ErrorS(err, "Updating kernel flag failed (Hint: enable KubeletInUserNamespace feature flag to ignore the error)", "flag", flag) 457 } 458 errList = append(errList, err) 459 } 460 } 461 } 462 return utilerrors.NewAggregate(errList) 463} 464 465func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error { 466 f, err := validateSystemRequirements(cm.mountUtil) 467 if err != nil { 468 return err 469 } 470 if !f.cpuHardcapping { 471 cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported") 472 } 473 b := KernelTunableModify 474 if cm.GetNodeConfig().ProtectKernelDefaults { 475 b = KernelTunableError 476 } 477 if err := setupKernelTunables(b); err != nil { 478 return err 479 } 480 481 // Setup top level qos containers only if CgroupsPerQOS flag is specified as true 482 if cm.NodeConfig.CgroupsPerQOS { 483 if err := cm.createNodeAllocatableCgroups(); err != nil { 484 return err 485 } 486 err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods) 487 if err != nil { 488 return fmt.Errorf("failed to initialize top level QOS containers: %v", err) 489 } 490 } 491 492 // Enforce Node Allocatable (if required) 493 if err := cm.enforceNodeAllocatableCgroups(); err != nil { 494 return err 495 } 496 497 systemContainers := []*systemContainer{} 498 if cm.ContainerRuntime == "docker" { 499 // With the docker-CRI integration, dockershim manages the cgroups 500 // and oom score for the docker processes. 501 // Check the cgroup for docker periodically, so kubelet can serve stats for the docker runtime. 
502 // TODO(KEP#866): remove special processing for CRI "docker" enablement 503 cm.periodicTasks = append(cm.periodicTasks, func() { 504 klog.V(4).InfoS("Adding periodic tasks for docker CRI integration") 505 cont, err := getContainerNameForProcess(dockerProcessName, dockerPidFile) 506 if err != nil { 507 klog.ErrorS(err, "Failed to get container name for process") 508 return 509 } 510 klog.V(2).InfoS("Discovered runtime cgroup name", "cgroupName", cont) 511 cm.Lock() 512 defer cm.Unlock() 513 cm.RuntimeCgroupsName = cont 514 }) 515 } 516 517 if cm.SystemCgroupsName != "" { 518 if cm.SystemCgroupsName == "/" { 519 return fmt.Errorf("system container cannot be root (\"/\")") 520 } 521 cont, err := newSystemCgroups(cm.SystemCgroupsName) 522 if err != nil { 523 return err 524 } 525 cont.ensureStateFunc = func(manager cgroups.Manager) error { 526 return ensureSystemCgroups("/", manager) 527 } 528 systemContainers = append(systemContainers, cont) 529 } 530 531 if cm.KubeletCgroupsName != "" { 532 cont, err := newSystemCgroups(cm.KubeletCgroupsName) 533 if err != nil { 534 return err 535 } 536 537 cont.ensureStateFunc = func(_ cgroups.Manager) error { 538 return ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, cont.manager) 539 } 540 systemContainers = append(systemContainers, cont) 541 } else { 542 cm.periodicTasks = append(cm.periodicTasks, func() { 543 if err := ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, nil); err != nil { 544 klog.ErrorS(err, "Failed to ensure process in container with oom score") 545 return 546 } 547 cont, err := getContainer(os.Getpid()) 548 if err != nil { 549 klog.ErrorS(err, "Failed to find cgroups of kubelet") 550 return 551 } 552 cm.Lock() 553 defer cm.Unlock() 554 555 cm.KubeletCgroupsName = cont 556 }) 557 } 558 559 cm.systemContainers = systemContainers 560 return nil 561} 562 563func getContainerNameForProcess(name, pidFile string) (string, error) { 564 pids, err := 
getPidsForProcess(name, pidFile) 565 if err != nil { 566 return "", fmt.Errorf("failed to detect process id for %q - %v", name, err) 567 } 568 if len(pids) == 0 { 569 return "", nil 570 } 571 cont, err := getContainer(pids[0]) 572 if err != nil { 573 return "", err 574 } 575 return cont, nil 576} 577 578func (cm *containerManagerImpl) GetNodeConfig() NodeConfig { 579 cm.RLock() 580 defer cm.RUnlock() 581 return cm.NodeConfig 582} 583 584// GetPodCgroupRoot returns the literal cgroupfs value for the cgroup containing all pods. 585func (cm *containerManagerImpl) GetPodCgroupRoot() string { 586 return cm.cgroupManager.Name(cm.cgroupRoot) 587} 588 589func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems { 590 return cm.subsystems 591} 592 593func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo { 594 return cm.qosContainerManager.GetQOSContainersInfo() 595} 596 597func (cm *containerManagerImpl) UpdateQOSCgroups() error { 598 return cm.qosContainerManager.UpdateCgroups() 599} 600 601func (cm *containerManagerImpl) Status() Status { 602 cm.RLock() 603 defer cm.RUnlock() 604 return cm.status 605} 606 607func (cm *containerManagerImpl) Start(node *v1.Node, 608 activePods ActivePodsFunc, 609 sourcesReady config.SourcesReady, 610 podStatusProvider status.PodStatusProvider, 611 runtimeService internalapi.RuntimeService) error { 612 613 // Initialize CPU manager 614 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { 615 containerMap, err := buildContainerMapFromRuntime(runtimeService) 616 if err != nil { 617 return fmt.Errorf("failed to build map of initial containers from runtime: %v", err) 618 } 619 err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) 620 if err != nil { 621 return fmt.Errorf("start cpu manager error: %v", err) 622 } 623 } 624 625 // Initialize memory manager 626 if 
utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) { 627 containerMap, err := buildContainerMapFromRuntime(runtimeService) 628 if err != nil { 629 return fmt.Errorf("failed to build map of initial containers from runtime: %v", err) 630 } 631 err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap) 632 if err != nil { 633 return fmt.Errorf("start memory manager error: %v", err) 634 } 635 } 636 637 // cache the node Info including resource capacity and 638 // allocatable of the node 639 cm.nodeInfo = node 640 641 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) { 642 rootfs, err := cm.cadvisorInterface.RootFsInfo() 643 if err != nil { 644 return fmt.Errorf("failed to get rootfs info: %v", err) 645 } 646 for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) { 647 cm.capacity[rName] = rCap 648 } 649 } 650 651 // Ensure that node allocatable configuration is valid. 652 if err := cm.validateNodeAllocatable(); err != nil { 653 return err 654 } 655 656 // Setup the node 657 if err := cm.setupNode(activePods); err != nil { 658 return err 659 } 660 661 // Don't run a background thread if there are no ensureStateFuncs. 662 hasEnsureStateFuncs := false 663 for _, cont := range cm.systemContainers { 664 if cont.ensureStateFunc != nil { 665 hasEnsureStateFuncs = true 666 break 667 } 668 } 669 if hasEnsureStateFuncs { 670 // Run ensure state functions every minute. 
671 go wait.Until(func() { 672 for _, cont := range cm.systemContainers { 673 if cont.ensureStateFunc != nil { 674 if err := cont.ensureStateFunc(cont.manager); err != nil { 675 klog.InfoS("Failed to ensure state", "containerName", cont.name, "err", err) 676 } 677 } 678 } 679 }, time.Minute, wait.NeverStop) 680 681 } 682 683 if len(cm.periodicTasks) > 0 { 684 go wait.Until(func() { 685 for _, task := range cm.periodicTasks { 686 if task != nil { 687 task() 688 } 689 } 690 }, 5*time.Minute, wait.NeverStop) 691 } 692 693 // Starts device manager. 694 if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { 695 return err 696 } 697 698 return nil 699} 700 701func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { 702 return cm.deviceManager.GetWatcherHandler() 703} 704 705// TODO: move the GetResources logic to PodContainerManager. 706func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { 707 opts := &kubecontainer.RunContainerOptions{} 708 // Allocate should already be called during predicateAdmitHandler.Admit(), 709 // just try to fetch device runtime information from cached state here 710 devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container) 711 if err != nil { 712 return nil, err 713 } else if devOpts == nil { 714 return opts, nil 715 } 716 opts.Devices = append(opts.Devices, devOpts.Devices...) 717 opts.Mounts = append(opts.Mounts, devOpts.Mounts...) 718 opts.Envs = append(opts.Envs, devOpts.Envs...) 719 opts.Annotations = append(opts.Annotations, devOpts.Annotations...) 
720 return opts, nil 721} 722 723func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { 724 return cm.deviceManager.UpdatePluginResources(node, attrs) 725} 726 727func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler { 728 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) { 729 return cm.topologyManager 730 } 731 // TODO: we need to think about a better way to do this. This will work for 732 // now so long as we have only the cpuManager and deviceManager relying on 733 // allocations here. However, going forward it is not generalized enough to 734 // work as we add more and more hint providers that the TopologyManager 735 // needs to call Allocate() on (that may not be directly intstantiated 736 // inside this component). 737 return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager} 738} 739 740type resourceAllocator struct { 741 cpuManager cpumanager.Manager 742 memoryManager memorymanager.Manager 743 deviceManager devicemanager.Manager 744} 745 746func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { 747 pod := attrs.Pod 748 749 for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { 750 err := m.deviceManager.Allocate(pod, &container) 751 if err != nil { 752 return admission.GetPodAdmitResult(err) 753 } 754 755 if m.cpuManager != nil { 756 err = m.cpuManager.Allocate(pod, &container) 757 if err != nil { 758 return admission.GetPodAdmitResult(err) 759 } 760 } 761 762 if m.memoryManager != nil { 763 err = m.memoryManager.Allocate(pod, &container) 764 if err != nil { 765 return admission.GetPodAdmitResult(err) 766 } 767 } 768 } 769 770 return admission.GetPodAdmitResult(nil) 771} 772 773func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { 774 cpuLimit := int64(0) 775 776 // Sum up resources of all external containers. 
777 for _, cont := range cm.systemContainers { 778 cpuLimit += cont.cpuMillicores 779 } 780 781 return v1.ResourceList{ 782 v1.ResourceCPU: *resource.NewMilliQuantity( 783 cpuLimit, 784 resource.DecimalSI), 785 } 786} 787 788func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) (containermap.ContainerMap, error) { 789 podSandboxMap := make(map[string]string) 790 podSandboxList, _ := runtimeService.ListPodSandbox(nil) 791 for _, p := range podSandboxList { 792 podSandboxMap[p.Id] = p.Metadata.Uid 793 } 794 795 containerMap := containermap.NewContainerMap() 796 containerList, _ := runtimeService.ListContainers(nil) 797 for _, c := range containerList { 798 if _, exists := podSandboxMap[c.PodSandboxId]; !exists { 799 return nil, fmt.Errorf("no PodsandBox found with Id '%s'", c.PodSandboxId) 800 } 801 containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id) 802 } 803 804 return containerMap, nil 805} 806 807func isProcessRunningInHost(pid int) (bool, error) { 808 // Get init pid namespace. 
809 initPidNs, err := os.Readlink("/proc/1/ns/pid") 810 if err != nil { 811 return false, fmt.Errorf("failed to find pid namespace of init process") 812 } 813 klog.V(10).InfoS("Found init PID namespace", "namespace", initPidNs) 814 processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid)) 815 if err != nil { 816 return false, fmt.Errorf("failed to find pid namespace of process %q", pid) 817 } 818 klog.V(10).InfoS("Process info", "pid", pid, "namespace", processPidNs) 819 return initPidNs == processPidNs, nil 820} 821 822func getPidFromPidFile(pidFile string) (int, error) { 823 file, err := os.Open(pidFile) 824 if err != nil { 825 return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err) 826 } 827 defer file.Close() 828 829 data, err := utilio.ReadAtMost(file, maxPidFileLength) 830 if err != nil { 831 return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err) 832 } 833 834 pid, err := strconv.Atoi(string(data)) 835 if err != nil { 836 return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err) 837 } 838 839 return pid, nil 840} 841 842func getPidsForProcess(name, pidFile string) ([]int, error) { 843 if len(pidFile) == 0 { 844 return procfs.PidOf(name) 845 } 846 847 pid, err := getPidFromPidFile(pidFile) 848 if err == nil { 849 return []int{pid}, nil 850 } 851 852 // Try to lookup pid by process name 853 pids, err2 := procfs.PidOf(name) 854 if err2 == nil { 855 return pids, nil 856 } 857 858 // Return error from getPidFromPidFile since that should have worked 859 // and is the real source of the problem. 860 klog.V(4).InfoS("Unable to get pid from file", "path", pidFile, "err", err) 861 return []int{}, err 862} 863 864// Ensures that the Docker daemon is in the desired container. 865// Temporarily export the function to be used by dockershim. 866// TODO(yujuhong): Move this function to dockershim once kubelet migrates to 867// dockershim as the default. 
868func EnsureDockerInContainer(dockerAPIVersion *utilversion.Version, oomScoreAdj int, manager cgroups.Manager) error { 869 type process struct{ name, file string } 870 dockerProcs := []process{{dockerProcessName, dockerPidFile}} 871 if dockerAPIVersion.AtLeast(containerdAPIVersion) { 872 // By default containerd is started separately, so there is no pid file. 873 containerdPidFile := "" 874 dockerProcs = append(dockerProcs, process{containerdProcessName, containerdPidFile}) 875 } 876 var errs []error 877 for _, proc := range dockerProcs { 878 pids, err := getPidsForProcess(proc.name, proc.file) 879 if err != nil { 880 errs = append(errs, fmt.Errorf("failed to get pids for %q: %v", proc.name, err)) 881 continue 882 } 883 884 // Move if the pid is not already in the desired container. 885 for _, pid := range pids { 886 if err := ensureProcessInContainerWithOOMScore(pid, oomScoreAdj, manager); err != nil { 887 errs = append(errs, fmt.Errorf("errors moving %q pid: %v", proc.name, err)) 888 } 889 } 890 } 891 return utilerrors.NewAggregate(errs) 892} 893 894func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error { 895 if runningInHost, err := isProcessRunningInHost(pid); err != nil { 896 // Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context. 897 return err 898 } else if !runningInHost { 899 // Process is running inside a container. Don't touch that. 
900 klog.V(2).InfoS("PID is not running in the host namespace", "pid", pid) 901 return nil 902 } 903 904 var errs []error 905 if manager != nil { 906 cont, err := getContainer(pid) 907 if err != nil { 908 errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err)) 909 } 910 911 name := "" 912 cgroups, err := manager.GetCgroups() 913 if err != nil { 914 errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err)) 915 } else { 916 name = cgroups.Name 917 } 918 919 if cont != name { 920 err = manager.Apply(pid) 921 if err != nil { 922 errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err)) 923 } 924 } 925 } 926 927 // Also apply oom-score-adj to processes 928 oomAdjuster := oom.NewOOMAdjuster() 929 klog.V(5).InfoS("Attempting to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid) 930 if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil { 931 klog.V(3).InfoS("Failed to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid, "err", err) 932 errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err)) 933 } 934 return utilerrors.NewAggregate(errs) 935} 936 937// getContainer returns the cgroup associated with the specified pid. 938// It enforces a unified hierarchy for memory and cpu cgroups. 939// On systemd environments, it uses the name=systemd cgroup for the specified pid. 
940func getContainer(pid int) (string, error) { 941 cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid)) 942 if err != nil { 943 return "", err 944 } 945 946 if cgroups.IsCgroup2UnifiedMode() { 947 c, found := cgs[""] 948 if !found { 949 return "", cgroups.NewNotFoundError("unified") 950 } 951 return c, nil 952 } 953 954 cpu, found := cgs["cpu"] 955 if !found { 956 return "", cgroups.NewNotFoundError("cpu") 957 } 958 memory, found := cgs["memory"] 959 if !found { 960 return "", cgroups.NewNotFoundError("memory") 961 } 962 963 // since we use this container for accounting, we need to ensure its a unified hierarchy. 964 if cpu != memory { 965 return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory) 966 } 967 968 // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls) 969 // cpu and memory accounting is off by default, users may choose to enable it per unit or globally. 970 // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true). 971 // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true 972 // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal. 973 // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory 974 // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers. 975 // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet. 976 // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally). 
977 if systemd, found := cgs["name=systemd"]; found { 978 if systemd != cpu { 979 klog.InfoS("CPUAccounting not enabled for process", "pid", pid) 980 } 981 if systemd != memory { 982 klog.InfoS("MemoryAccounting not enabled for process", "pid", pid) 983 } 984 return systemd, nil 985 } 986 987 return cpu, nil 988} 989 990// Ensures the system container is created and all non-kernel threads and process 1 991// without a container are moved to it. 992// 993// The reason of leaving kernel threads at root cgroup is that we don't want to tie the 994// execution of these threads with to-be defined /system quota and create priority inversions. 995// 996func ensureSystemCgroups(rootCgroupPath string, manager cgroups.Manager) error { 997 // Move non-kernel PIDs to the system container. 998 // Only keep errors on latest attempt. 999 var finalErr error 1000 for i := 0; i <= 10; i++ { 1001 allPids, err := cmutil.GetPids(rootCgroupPath) 1002 if err != nil { 1003 finalErr = fmt.Errorf("failed to list PIDs for root: %v", err) 1004 continue 1005 } 1006 1007 // Remove kernel pids and other protected PIDs (pid 1, PIDs already in system & kubelet containers) 1008 pids := make([]int, 0, len(allPids)) 1009 for _, pid := range allPids { 1010 if pid == 1 || isKernelPid(pid) { 1011 continue 1012 } 1013 1014 pids = append(pids, pid) 1015 } 1016 1017 // Check if we have moved all the non-kernel PIDs. 1018 if len(pids) == 0 { 1019 return nil 1020 } 1021 1022 klog.V(3).InfoS("Moving non-kernel processes", "pids", pids) 1023 for _, pid := range pids { 1024 err := manager.Apply(pid) 1025 if err != nil { 1026 name := "" 1027 cgroups, err := manager.GetCgroups() 1028 if err == nil { 1029 name = cgroups.Name 1030 } 1031 1032 finalErr = fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, name, err) 1033 } 1034 } 1035 1036 } 1037 1038 return finalErr 1039} 1040 1041// Determines whether the specified PID is a kernel PID. 
func isKernelPid(pid int) bool {
	// Kernel threads have no associated executable, so reading the exe link
	// fails with a "not exist" error for them. Any other outcome, including
	// success or a different error, is treated as a user-space process.
	if _, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid)); err != nil {
		return os.IsNotExist(err)
	}
	return false
}

// GetCapacity returns the capacity stored on the container manager.
func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
	capacity := cm.capacity
	return capacity
}

// GetDevicePluginResourceCapacity returns the three values reported by the
// device manager's GetCapacity, unchanged.
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}

// GetDevices returns the devices the device manager reports for the given pod
// container, converted to the podresources API representation.
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
	instances := cm.deviceManager.GetDevices(podUID, containerName)
	return containerDevicesFromResourceDeviceInstances(instances)
}

// GetAllocatableDevices returns the device manager's allocatable devices,
// converted to the podresources API representation.
func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
	instances := cm.deviceManager.GetAllocatableDevices()
	return containerDevicesFromResourceDeviceInstances(instances)
}

// GetCPUs returns the CPU IDs the CPU manager reports for the given pod
// container, or an empty (non-nil) slice when no CPU manager is configured.
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
	if cm.cpuManager == nil {
		return []int64{}
	}
	return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64()
}

// GetAllocatableCPUs returns the CPU manager's allocatable CPU IDs, or an empty
// (non-nil) slice when no CPU manager is configured.
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
	if cm.cpuManager == nil {
		return []int64{}
	}
	return cm.cpuManager.GetAllocatableCPUs().ToSliceNoSortInt64()
}

// GetMemory returns the memory blocks the memory manager reports for the given
// pod container, or an empty (non-nil) slice when no memory manager is configured.
func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
	if cm.memoryManager != nil {
		blocks := cm.memoryManager.GetMemory(podUID, containerName)
		return containerMemoryFromBlock(blocks)
	}
	return []*podresourcesapi.ContainerMemory{}
}

// GetAllocatableMemory returns the memory manager's allocatable memory blocks,
// or an empty (non-nil) slice when no memory manager is configured.
func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
	if cm.memoryManager != nil {
		blocks := cm.memoryManager.GetAllocatableMemory()
		return containerMemoryFromBlock(blocks)
	}
	return []*podresourcesapi.ContainerMemory{}
}

// ShouldResetExtendedResourceCapacity forwards the question to the device manager.
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
	return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

// UpdateAllocatedDevices forwards to the device manager's UpdateAllocatedDevices.
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
	cm.deviceManager.UpdateAllocatedDevices()
}

// containerMemoryFromBlock converts memory manager blocks into podresources
// ContainerMemory entries, one per block, in input order. Each entry's Topology
// lists the block's NUMA affinity as NUMANode IDs (an empty, non-nil list when
// the block has no affinity). A nil slice is returned when blocks is empty.
func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
	var result []*podresourcesapi.ContainerMemory
	for i := range blocks {
		block := &blocks[i]

		nodes := []*podresourcesapi.NUMANode{}
		for _, id := range block.NUMAAffinity {
			nodes = append(nodes, &podresourcesapi.NUMANode{ID: int64(id)})
		}

		result = append(result, &podresourcesapi.ContainerMemory{
			MemoryType: string(block.Type),
			Size_:      block.Size,
			Topology:   &podresourcesapi.TopologyInfo{Nodes: nodes},
		})
	}
	return result
}