// +build linux

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cm

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"os"
	"path"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/opencontainers/runc/libcontainer/cgroups"
	cgroupfs "github.com/opencontainers/runc/libcontainer/cgroups/fs"
	cgroupfs2 "github.com/opencontainers/runc/libcontainer/cgroups/fs2"
	"github.com/opencontainers/runc/libcontainer/configs"
	"k8s.io/klog/v2"
	"k8s.io/mount-utils"
	utilio "k8s.io/utils/io"
	utilpath "k8s.io/utils/path"

	libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	utilversion "k8s.io/apimachinery/pkg/util/version"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/tools/record"
	internalapi "k8s.io/cri-api/pkg/apis"
	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
	kubefeatures "k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
	"k8s.io/kubernetes/pkg/kubelet/cm/admission"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
	memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
	"k8s.io/kubernetes/pkg/kubelet/config"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
	"k8s.io/kubernetes/pkg/kubelet/qos"
	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
	"k8s.io/kubernetes/pkg/kubelet/status"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/util/oom"
	"k8s.io/kubernetes/pkg/util/procfs"
	utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)

const (
	dockerProcessName = "dockerd"
	// The dockerd --pidfile option can specify the path of the daemon PID file; the default is "/var/run/docker.pid".
	dockerPidFile         = "/var/run/docker.pid"
	containerdProcessName = "containerd"
	maxPidFileLength      = 1 << 10 // 1KB
)

var (
	// The docker version in which containerd was introduced.
	containerdAPIVersion = utilversion.MustParseGeneric("1.23")
)

// A non-user container tracked by the Kubelet.
type systemContainer struct {
	// Absolute name of the container.
	name string

	// CPU limit in millicores.
	cpuMillicores int64

	// Function that ensures the state of the container.
	// m is the cgroup manager for the specified container.
	ensureStateFunc func(m cgroups.Manager) error

	// Manager for the cgroups of the external container.
	manager cgroups.Manager
}

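// newSystemCgroups builds a systemContainer, wiring up a cgroup manager for the
// given absolute container name.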
func newSystemCgroups(containerName string) (*systemContainer, error) {
	manager, err := createManager(containerName)
	if err != nil {
		return nil, err
	}
	return &systemContainer{
		name:    containerName,
		manager: manager,
	}, nil
}

type containerManagerImpl struct {
	sync.RWMutex
	cadvisorInterface cadvisor.Interface
	mountUtil         mount.Interface
	NodeConfig
	status Status
	// External containers being managed.
	systemContainers []*systemContainer
	// Tasks that are run periodically
	periodicTasks []func()
	// Holds all the mounted cgroup subsystems
	subsystems *CgroupSubsystems
	nodeInfo   *v1.Node
	// Interface for cgroup management
	cgroupManager CgroupManager
	// Capacity of this node.
	capacity v1.ResourceList
	// Capacity of this node, including internal resources.
	internalCapacity v1.ResourceList
	// Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
	// This path includes a top level container for enforcing Node Allocatable.
	cgroupRoot CgroupName
	// Event recorder interface.
	recorder record.EventRecorder
	// Interface for QoS cgroup management
	qosContainerManager QOSContainerManager
	// Interface for exporting and allocating devices reported by device plugins.
	deviceManager devicemanager.Manager
	// Interface for CPU affinity management.
	cpuManager cpumanager.Manager
	// Interface for memory affinity management.
	memoryManager memorymanager.Manager
	// Interface for Topology resource coordination
	topologyManager topologymanager.Manager
}

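// features holds the optional system capabilities detected by
// validateSystemRequirements (currently only CPU hardcapping support).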
type features struct {
	cpuHardcapping bool
}

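// Compile-time assertion that containerManagerImpl implements ContainerManager.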
var _ ContainerManager = &containerManagerImpl{}

// checks if the required cgroups subsystems are mounted.
// As of now, only 'cpu' and 'memory' are required.
// cpu quota is a soft requirement.
func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
	const (
		cgroupMountType = "cgroup"
		localErr        = "system validation failed"
	)
	var (
		cpuMountPoint string
		f             features
	)
	mountPoints, err := mountUtil.List()
	if err != nil {
		return f, fmt.Errorf("%s - %v", localErr, err)
	}

	if cgroups.IsCgroup2UnifiedMode() {
		f.cpuHardcapping = true
		return f, nil
	}

	expectedCgroups := sets.NewString("cpu", "cpuacct", "cpuset", "memory")
	for _, mountPoint := range mountPoints {
		if mountPoint.Type == cgroupMountType {
			for _, opt := range mountPoint.Opts {
				if expectedCgroups.Has(opt) {
					expectedCgroups.Delete(opt)
				}
				if opt == "cpu" {
					cpuMountPoint = mountPoint.Path
				}
			}
		}
	}

	if expectedCgroups.Len() > 0 {
		return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, expectedCgroups.List())
	}

	// Check if cpu quota is available.
	// The CPU cgroup is required and so it is expected to be mounted at this point.
	periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us"))
	if err != nil {
		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_period_us is available")
	}
	quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us"))
	if err != nil {
		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_quota_us is available")
	}
	if quotaExists && periodExists {
		f.cpuHardcapping = true
	}
	return f, nil
}

// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
	subsystems, err := GetCgroupSubsystems()
	if err != nil {
		return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
	}

	if failSwapOn {
		// Check whether swap is enabled. The Kubelet does not support running with swap enabled.
		swapFile := "/proc/swaps"
		swapData, err := ioutil.ReadFile(swapFile)
		if err != nil {
			if os.IsNotExist(err) {
				klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
			} else {
				return nil, err
			}
		} else {
			swapData = bytes.TrimSpace(swapData) // extra trailing \n
			swapLines := strings.Split(string(swapData), "\n")

			// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
			// error out unless --fail-swap-on is set to false.
			if len(swapLines) > 1 {
				return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
			}
		}
	}

	var internalCapacity = v1.ResourceList{}
	// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
	// machine info is computed and cached once as part of cAdvisor object creation.
	// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager start.
	machineInfo, err := cadvisorInterface.MachineInfo()
	if err != nil {
		return nil, err
	}
	capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
	for k, v := range capacity {
		internalCapacity[k] = v
	}
	pidlimits, err := pidlimit.Stats()
	if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
		internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
			int64(*pidlimits.MaxPID),
			resource.DecimalSI)
	}

	// Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
	cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
	// Check if Cgroup-root actually exists on the node
	if nodeConfig.CgroupsPerQOS {
		// this does default to / when enabled, but this tests against regressions.
		if nodeConfig.CgroupRoot == "" {
			return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
		}

		// we need to check that the cgroup root actually exists for each subsystem
		// of note, we always use the cgroupfs driver when performing this check since
		// the input is provided in that format.
		// this is important because we do not want any name conversion to occur.
		if !cgroupManager.Exists(cgroupRoot) {
			return nil, fmt.Errorf("invalid configuration: cgroup-root %q doesn't exist", cgroupRoot)
		}
		klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
		// Include the top level cgroup for enforcing node allocatable into cgroup-root.
		// This way, all sub modules can avoid having to understand the concept of node allocatable.
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)

	qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
	if err != nil {
		return nil, err
	}

	cm := &containerManagerImpl{
		cadvisorInterface:   cadvisorInterface,
		mountUtil:           mountUtil,
		NodeConfig:          nodeConfig,
		subsystems:          subsystems,
		cgroupManager:       cgroupManager,
		capacity:            capacity,
		internalCapacity:    internalCapacity,
		cgroupRoot:          cgroupRoot,
		recorder:            recorder,
		qosContainerManager: qosContainerManager,
	}

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		cm.topologyManager, err = topologymanager.NewManager(
			machineInfo.Topology,
			nodeConfig.ExperimentalTopologyManagerPolicy,
			nodeConfig.ExperimentalTopologyManagerScope,
		)

		if err != nil {
			return nil, err
		}

	} else {
		cm.topologyManager = topologymanager.NewFakeManager()
	}

	klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled)
	if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
		cm.topologyManager.AddHintProvider(cm.deviceManager)
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}
	if err != nil {
		return nil, err
	}

	// Initialize CPU manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		cm.cpuManager, err = cpumanager.NewManager(
			nodeConfig.ExperimentalCPUManagerPolicy,
			nodeConfig.ExperimentalCPUManagerPolicyOptions,
			nodeConfig.ExperimentalCPUManagerReconcilePeriod,
			machineInfo,
			nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize cpu manager")
			return nil, err
		}
		cm.topologyManager.AddHintProvider(cm.cpuManager)
	}

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		cm.memoryManager, err = memorymanager.NewManager(
			nodeConfig.ExperimentalMemoryManagerPolicy,
			machineInfo,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.ExperimentalMemoryManagerReservedMemory,
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize memory manager")
			return nil, err
		}
		cm.topologyManager.AddHintProvider(cm.memoryManager)
	}

	return cm, nil
}

// NewPodContainerManager is a factory method that returns a PodContainerManager object.
// If qosCgroups are enabled it returns the general pod container manager;
// otherwise it returns a no-op manager that essentially does nothing.
func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
	if cm.NodeConfig.CgroupsPerQOS {
		return &podContainerManagerImpl{
			qosContainersInfo: cm.GetQOSContainersInfo(),
			subsystems:        cm.subsystems,
			cgroupManager:     cm.cgroupManager,
			podPidsLimit:      cm.ExperimentalPodPidsLimit,
			enforceCPULimits:  cm.EnforceCPULimits,
			cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond),
		}
	}
	return &podContainerManagerNoop{
		cgroupRoot: cm.cgroupRoot,
	}
}

func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
}

// Create a cgroup container manager.
func createManager(containerName string) (cgroups.Manager, error) {
	cg := &configs.Cgroup{
		Parent: "/",
		Name:   containerName,
		Resources: &configs.Resources{
			SkipDevices: true,
		},
	}

	if cgroups.IsCgroup2UnifiedMode() {
		return cgroupfs2.NewManager(cg, "", false)

	}
	return cgroupfs.NewManager(cg, nil, false), nil
}

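// KernelTunableBehavior selects how setupKernelTunables reacts when a kernel
// tunable differs from the desired value: warn, error out, or modify it.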
type KernelTunableBehavior string

const (
	KernelTunableWarn   KernelTunableBehavior = "warn"
	KernelTunableError  KernelTunableBehavior = "error"
	KernelTunableModify KernelTunableBehavior = "modify"
)

// setupKernelTunables validates that kernel tunable flags are set as expected.
// Depending upon the specified option, it will either warn, error, or modify the kernel tunable flags.
func setupKernelTunables(option KernelTunableBehavior) error {
	desiredState := map[string]int{
		utilsysctl.VMOvercommitMemory: utilsysctl.VMOvercommitMemoryAlways,
		utilsysctl.VMPanicOnOOM:       utilsysctl.VMPanicOnOOMInvokeOOMKiller,
		utilsysctl.KernelPanic:        utilsysctl.KernelPanicRebootTimeout,
		utilsysctl.KernelPanicOnOops:  utilsysctl.KernelPanicOnOopsAlways,
		utilsysctl.RootMaxKeys:        utilsysctl.RootMaxKeysSetting,
		utilsysctl.RootMaxBytes:       utilsysctl.RootMaxBytesSetting,
	}

	sysctl := utilsysctl.New()

	errList := []error{}
	for flag, expectedValue := range desiredState {
		val, err := sysctl.GetSysctl(flag)
		if err != nil {
			errList = append(errList, err)
			continue
		}
		if val == expectedValue {
			continue
		}

		switch option {
		case KernelTunableError:
			errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
		case KernelTunableWarn:
			klog.V(2).InfoS("Invalid kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
		case KernelTunableModify:
			klog.V(2).InfoS("Updating kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
			err = sysctl.SetSysctl(flag, expectedValue)
			if err != nil {
				if libcontaineruserns.RunningInUserNS() {
					if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletInUserNamespace) {
						klog.V(2).InfoS("Updating kernel flag failed (running in UserNS, ignoring)", "flag", flag, "err", err)
						continue
					}
					klog.ErrorS(err, "Updating kernel flag failed (Hint: enable KubeletInUserNamespace feature flag to ignore the error)", "flag", flag)
				}
				errList = append(errList, err)
			}
		}
	}
	return utilerrors.NewAggregate(errList)
}

func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
	f, err := validateSystemRequirements(cm.mountUtil)
	if err != nil {
		return err
	}
	if !f.cpuHardcapping {
		cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported")
	}
	b := KernelTunableModify
	if cm.GetNodeConfig().ProtectKernelDefaults {
		b = KernelTunableError
	}
	if err := setupKernelTunables(b); err != nil {
		return err
	}

	// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
	if cm.NodeConfig.CgroupsPerQOS {
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}
		err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}

	// Enforce Node Allocatable (if required)
	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
		return err
	}

	systemContainers := []*systemContainer{}
	if cm.ContainerRuntime == "docker" {
		// With the docker-CRI integration, dockershim manages the cgroups
		// and oom score for the docker processes.
		// Check the cgroup for docker periodically, so kubelet can serve stats for the docker runtime.
		// TODO(KEP#866): remove special processing for CRI "docker" enablement
		cm.periodicTasks = append(cm.periodicTasks, func() {
			klog.V(4).InfoS("Adding periodic tasks for docker CRI integration")
			cont, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
			if err != nil {
				klog.ErrorS(err, "Failed to get container name for process")
				return
			}
			klog.V(2).InfoS("Discovered runtime cgroup name", "cgroupName", cont)
			cm.Lock()
			defer cm.Unlock()
			cm.RuntimeCgroupsName = cont
		})
	}

	if cm.SystemCgroupsName != "" {
		if cm.SystemCgroupsName == "/" {
			return fmt.Errorf("system container cannot be root (\"/\")")
		}
		cont, err := newSystemCgroups(cm.SystemCgroupsName)
		if err != nil {
			return err
		}
		cont.ensureStateFunc = func(manager cgroups.Manager) error {
			return ensureSystemCgroups("/", manager)
		}
		systemContainers = append(systemContainers, cont)
	}

	if cm.KubeletCgroupsName != "" {
		cont, err := newSystemCgroups(cm.KubeletCgroupsName)
		if err != nil {
			return err
		}

		cont.ensureStateFunc = func(_ cgroups.Manager) error {
			return ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, cont.manager)
		}
		systemContainers = append(systemContainers, cont)
	} else {
		cm.periodicTasks = append(cm.periodicTasks, func() {
			if err := ensureProcessInContainerWithOOMScore(os.Getpid(), qos.KubeletOOMScoreAdj, nil); err != nil {
				klog.ErrorS(err, "Failed to ensure process in container with oom score")
				return
			}
			cont, err := getContainer(os.Getpid())
			if err != nil {
				klog.ErrorS(err, "Failed to find cgroups of kubelet")
				return
			}
			cm.Lock()
			defer cm.Unlock()

			cm.KubeletCgroupsName = cont
		})
	}

	cm.systemContainers = systemContainers
	return nil
}

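// getContainerNameForProcess returns the cgroup of the first PID found for the
// named process, or an empty string if no PID could be located.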
func getContainerNameForProcess(name, pidFile string) (string, error) {
	pids, err := getPidsForProcess(name, pidFile)
	if err != nil {
		return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
	}
	if len(pids) == 0 {
		return "", nil
	}
	cont, err := getContainer(pids[0])
	if err != nil {
		return "", err
	}
	return cont, nil
}

func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
	cm.RLock()
	defer cm.RUnlock()
	return cm.NodeConfig
}

// GetPodCgroupRoot returns the literal cgroupfs value for the cgroup containing all pods.
func (cm *containerManagerImpl) GetPodCgroupRoot() string {
	return cm.cgroupManager.Name(cm.cgroupRoot)
}

func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
	return cm.subsystems
}

func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
	return cm.qosContainerManager.GetQOSContainersInfo()
}

func (cm *containerManagerImpl) UpdateQOSCgroups() error {
	return cm.qosContainerManager.UpdateCgroups()
}

func (cm *containerManagerImpl) Status() Status {
	cm.RLock()
	defer cm.RUnlock()
	return cm.status
}

func (cm *containerManagerImpl) Start(node *v1.Node,
	activePods ActivePodsFunc,
	sourcesReady config.SourcesReady,
	podStatusProvider status.PodStatusProvider,
	runtimeService internalapi.RuntimeService) error {

	// Initialize CPU manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		containerMap, err := buildContainerMapFromRuntime(runtimeService)
		if err != nil {
			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
		}
		err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
		if err != nil {
			return fmt.Errorf("start cpu manager error: %v", err)
		}
	}

	// Initialize memory manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		containerMap, err := buildContainerMapFromRuntime(runtimeService)
		if err != nil {
			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
		}
		err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
		if err != nil {
			return fmt.Errorf("start memory manager error: %v", err)
		}
	}

	// cache the node Info including resource capacity and
	// allocatable of the node
	cm.nodeInfo = node

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
		rootfs, err := cm.cadvisorInterface.RootFsInfo()
		if err != nil {
			return fmt.Errorf("failed to get rootfs info: %v", err)
		}
		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
			cm.capacity[rName] = rCap
		}
	}

	// Ensure that node allocatable configuration is valid.
	if err := cm.validateNodeAllocatable(); err != nil {
		return err
	}

	// Setup the node
	if err := cm.setupNode(activePods); err != nil {
		return err
	}

	// Don't run a background thread if there are no ensureStateFuncs.
	hasEnsureStateFuncs := false
	for _, cont := range cm.systemContainers {
		if cont.ensureStateFunc != nil {
			hasEnsureStateFuncs = true
			break
		}
	}
	if hasEnsureStateFuncs {
		// Run ensure state functions every minute.
		go wait.Until(func() {
			for _, cont := range cm.systemContainers {
				if cont.ensureStateFunc != nil {
					if err := cont.ensureStateFunc(cont.manager); err != nil {
						klog.InfoS("Failed to ensure state", "containerName", cont.name, "err", err)
					}
				}
			}
		}, time.Minute, wait.NeverStop)

	}

	if len(cm.periodicTasks) > 0 {
		go wait.Until(func() {
			for _, task := range cm.periodicTasks {
				if task != nil {
					task()
				}
			}
		}, 5*time.Minute, wait.NeverStop)
	}

	// Starts device manager.
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
		return err
	}

	return nil
}

func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
	return cm.deviceManager.GetWatcherHandler()
}

// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
	opts := &kubecontainer.RunContainerOptions{}
	// Allocate should already be called during predicateAdmitHandler.Admit(),
	// just try to fetch device runtime information from cached state here
	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
	if err != nil {
		return nil, err
	} else if devOpts == nil {
		return opts, nil
	}
	opts.Devices = append(opts.Devices, devOpts.Devices...)
	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
	opts.Envs = append(opts.Envs, devOpts.Envs...)
	opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
	return opts, nil
}

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.UpdatePluginResources(node, attrs)
}

func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		return cm.topologyManager
	}
	// TODO: we need to think about a better way to do this. This will work for
	// now so long as we have only the cpuManager and deviceManager relying on
	// allocations here. However, going forward it is not generalized enough to
	// work as we add more and more hint providers that the TopologyManager
	// needs to call Allocate() on (that may not be directly instantiated
	// inside this component).
	return &resourceAllocator{cm.cpuManager, cm.memoryManager, cm.deviceManager}
}

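// resourceAllocator invokes the device, CPU, and memory managers to allocate
// resources for each container of a pod at admission time.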
type resourceAllocator struct {
	cpuManager    cpumanager.Manager
	memoryManager memorymanager.Manager
	deviceManager devicemanager.Manager
}

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
	pod := attrs.Pod

	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
		err := m.deviceManager.Allocate(pod, &container)
		if err != nil {
			return admission.GetPodAdmitResult(err)
		}

		if m.cpuManager != nil {
			err = m.cpuManager.Allocate(pod, &container)
			if err != nil {
				return admission.GetPodAdmitResult(err)
			}
		}

		if m.memoryManager != nil {
			err = m.memoryManager.Allocate(pod, &container)
			if err != nil {
				return admission.GetPodAdmitResult(err)
			}
		}
	}

	return admission.GetPodAdmitResult(nil)
}

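// SystemCgroupsLimit sums the CPU limits (in millicores) of the tracked external
// system containers and reports them as a resource list.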
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
	cpuLimit := int64(0)

	// Sum up resources of all external containers.
	for _, cont := range cm.systemContainers {
		cpuLimit += cont.cpuMillicores
	}

	return v1.ResourceList{
		v1.ResourceCPU: *resource.NewMilliQuantity(
			cpuLimit,
			resource.DecimalSI),
	}
}

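// buildContainerMapFromRuntime maps pod UIDs and container names to container IDs
// using the sandboxes and containers currently known to the CRI runtime.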
func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) (containermap.ContainerMap, error) {
	podSandboxMap := make(map[string]string)
	podSandboxList, _ := runtimeService.ListPodSandbox(nil)
	for _, p := range podSandboxList {
		podSandboxMap[p.Id] = p.Metadata.Uid
	}

	containerMap := containermap.NewContainerMap()
	containerList, _ := runtimeService.ListContainers(nil)
	for _, c := range containerList {
		if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
			return nil, fmt.Errorf("no PodSandbox found with Id '%s'", c.PodSandboxId)
		}
		containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
	}

	return containerMap, nil
}

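// isProcessRunningInHost reports whether the given PID shares the PID namespace
// of the init process (PID 1), i.e. is not running inside a container.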
func isProcessRunningInHost(pid int) (bool, error) {
	// Get init pid namespace.
	initPidNs, err := os.Readlink("/proc/1/ns/pid")
	if err != nil {
		return false, fmt.Errorf("failed to find pid namespace of init process")
	}
	klog.V(10).InfoS("Found init PID namespace", "namespace", initPidNs)
	processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
	if err != nil {
		return false, fmt.Errorf("failed to find pid namespace of process %q", pid)
	}
	klog.V(10).InfoS("Process info", "pid", pid, "namespace", processPidNs)
	return initPidNs == processPidNs, nil
}

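// getPidFromPidFile reads at most maxPidFileLength bytes from the given pid file
// and parses the contents as a PID.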
func getPidFromPidFile(pidFile string) (int, error) {
	file, err := os.Open(pidFile)
	if err != nil {
		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
	}
	defer file.Close()

	data, err := utilio.ReadAtMost(file, maxPidFileLength)
	if err != nil {
		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
	}

	pid, err := strconv.Atoi(string(data))
	if err != nil {
		return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
	}

	return pid, nil
}

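// getPidsForProcess resolves the PIDs of a process, preferring the pid file when
// one is provided and falling back to a lookup by process name.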
func getPidsForProcess(name, pidFile string) ([]int, error) {
	if len(pidFile) == 0 {
		return procfs.PidOf(name)
	}

	pid, err := getPidFromPidFile(pidFile)
	if err == nil {
		return []int{pid}, nil
	}

	// Try to lookup pid by process name
	pids, err2 := procfs.PidOf(name)
	if err2 == nil {
		return pids, nil
	}

	// Return error from getPidFromPidFile since that should have worked
	// and is the real source of the problem.
	klog.V(4).InfoS("Unable to get pid from file", "path", pidFile, "err", err)
	return []int{}, err
}

// Ensures that the Docker daemon is in the desired container.
// Temporarily export the function to be used by dockershim.
// TODO(yujuhong): Move this function to dockershim once kubelet migrates to
// dockershim as the default.
func EnsureDockerInContainer(dockerAPIVersion *utilversion.Version, oomScoreAdj int, manager cgroups.Manager) error {
	type process struct{ name, file string }
	dockerProcs := []process{{dockerProcessName, dockerPidFile}}
	if dockerAPIVersion.AtLeast(containerdAPIVersion) {
		// By default containerd is started separately, so there is no pid file.
		containerdPidFile := ""
		dockerProcs = append(dockerProcs, process{containerdProcessName, containerdPidFile})
	}
	var errs []error
	for _, proc := range dockerProcs {
		pids, err := getPidsForProcess(proc.name, proc.file)
		if err != nil {
			errs = append(errs, fmt.Errorf("failed to get pids for %q: %v", proc.name, err))
			continue
		}

		// Move if the pid is not already in the desired container.
		for _, pid := range pids {
			if err := ensureProcessInContainerWithOOMScore(pid, oomScoreAdj, manager); err != nil {
				errs = append(errs, fmt.Errorf("errors moving %q pid: %v", proc.name, err))
			}
		}
	}
	return utilerrors.NewAggregate(errs)
}

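// ensureProcessInContainerWithOOMScore moves a host process into the cgroup owned
// by manager (when one is provided) and applies the given oom_score_adj to it.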
func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error {
	if runningInHost, err := isProcessRunningInHost(pid); err != nil {
		// Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context.
		return err
	} else if !runningInHost {
		// Process is running inside a container. Don't touch that.
		klog.V(2).InfoS("PID is not running in the host namespace", "pid", pid)
		return nil
	}

	var errs []error
	if manager != nil {
		cont, err := getContainer(pid)
		if err != nil {
			errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err))
		}

		name := ""
		cgroups, err := manager.GetCgroups()
		if err != nil {
			errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err))
		} else {
			name = cgroups.Name
		}

		if cont != name {
			err = manager.Apply(pid)
			if err != nil {
				errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err))
			}
		}
	}

	// Also apply oom-score-adj to processes
	oomAdjuster := oom.NewOOMAdjuster()
	klog.V(5).InfoS("Attempting to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid)
	if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil {
		klog.V(3).InfoS("Failed to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid, "err", err)
		errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err))
	}
	return utilerrors.NewAggregate(errs)
}

// getContainer returns the cgroup associated with the specified pid.
// It enforces a unified hierarchy for memory and cpu cgroups.
// On systemd environments, it uses the name=systemd cgroup for the specified pid.
func getContainer(pid int) (string, error) {
	cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
	if err != nil {
		return "", err
	}

	if cgroups.IsCgroup2UnifiedMode() {
		c, found := cgs[""]
		if !found {
			return "", cgroups.NewNotFoundError("unified")
		}
		return c, nil
	}

	cpu, found := cgs["cpu"]
	if !found {
		return "", cgroups.NewNotFoundError("cpu")
	}
	memory, found := cgs["memory"]
	if !found {
		return "", cgroups.NewNotFoundError("memory")
	}

	// since we use this container for accounting, we need to ensure it's a unified hierarchy.
	if cpu != memory {
		return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified.  cpu: %s, memory: %s", cpu, memory)
	}

	// on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
	// cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
	// users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
	// users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
	// we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
	// for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
	// cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
	// as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
	// in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
	if systemd, found := cgs["name=systemd"]; found {
		if systemd != cpu {
			klog.InfoS("CPUAccounting not enabled for process", "pid", pid)
		}
		if systemd != memory {
			klog.InfoS("MemoryAccounting not enabled for process", "pid", pid)
		}
		return systemd, nil
	}

	return cpu, nil
}

// Ensures the system container is created and all non-kernel threads and process 1
// without a container are moved to it.
//
// The reason for leaving kernel threads in the root cgroup is that we don't want to tie the
// execution of these threads to any to-be-defined /system quota and create priority inversions.
//
func ensureSystemCgroups(rootCgroupPath string, manager cgroups.Manager) error {
	// Move non-kernel PIDs to the system container.
	// Only keep errors on latest attempt.
	var finalErr error
	for i := 0; i <= 10; i++ {
		allPids, err := cmutil.GetPids(rootCgroupPath)
		if err != nil {
			finalErr = fmt.Errorf("failed to list PIDs for root: %v", err)
			continue
		}

		// Remove kernel pids and other protected PIDs (pid 1, PIDs already in system & kubelet containers)
		pids := make([]int, 0, len(allPids))
		for _, pid := range allPids {
			if pid == 1 || isKernelPid(pid) {
				continue
			}

			pids = append(pids, pid)
		}

		// Check if we have moved all the non-kernel PIDs.
		if len(pids) == 0 {
			return nil
		}

		klog.V(3).InfoS("Moving non-kernel processes", "pids", pids)
		for _, pid := range pids {
			err := manager.Apply(pid)
			if err != nil {
				name := ""
				cgroups, err := manager.GetCgroups()
				if err == nil {
					name = cgroups.Name
				}

				finalErr = fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, name, err)
			}
		}

	}

	return finalErr
}

// Determines whether the specified PID is a kernel PID.
func isKernelPid(pid int) bool {
	// Kernel threads have no associated executable.
	_, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
	return err != nil && os.IsNotExist(err)
}

func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
	return cm.capacity
}

func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return cm.deviceManager.GetCapacity()
}

func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
}

func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
}

func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
	if cm.cpuManager != nil {
		return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64()
	}
	return []int64{}
}

func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
	if cm.cpuManager != nil {
		return cm.cpuManager.GetAllocatableCPUs().ToSliceNoSortInt64()
	}
	return []int64{}
}

func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
	if cm.memoryManager == nil {
		return []*podresourcesapi.ContainerMemory{}
	}

	return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
}

func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
	if cm.memoryManager == nil {
		return []*podresourcesapi.ContainerMemory{}
	}

	return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
}

func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
	return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

func (cm *containerManagerImpl) UpdateAllocatedDevices() {
	cm.deviceManager.UpdateAllocatedDevices()
}

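// containerMemoryFromBlock converts memory manager state blocks into the pod
// resources API representation, including NUMA topology information.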
func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
	var containerMemories []*podresourcesapi.ContainerMemory

	for _, b := range blocks {
		containerMemory := podresourcesapi.ContainerMemory{
			MemoryType: string(b.Type),
			Size_:      b.Size,
			Topology: &podresourcesapi.TopologyInfo{
				Nodes: []*podresourcesapi.NUMANode{},
			},
		}

		for _, numaNodeID := range b.NUMAAffinity {
			containerMemory.Topology.Nodes = append(containerMemory.Topology.Nodes, &podresourcesapi.NUMANode{ID: int64(numaNodeID)})
		}

		containerMemories = append(containerMemories, &containerMemory)
	}

	return containerMemories
}