/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

var (
	cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod may stay in the cache before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}

// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
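// Keeping the list ordered from most to least recently updated lets
// UpdateSnapshot stop walking it as soon as it reaches an entry whose
// generation is not newer than the snapshot's.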
type nodeInfoListItem struct {
	info *framework.NodeInfo
	next *nodeInfoListItem
	prev *nodeInfoListItem
}

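// schedulerCache implements the Cache interface. All of its bookkeeping is
// guarded by the mu mutex below; helpers documented as assuming the lock
// (addPod, removePod, moveNodeInfoToHead, ...) must only be called with it held.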
type schedulerCache struct {
	stop   <-chan struct{}
	ttl    time.Duration
	period time.Duration

	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex
	// a set of assumed pod keys.
	// The key can further be used to get an entry in podStates.
	assumedPods sets.String
	// a map from pod key to podState.
	podStates map[string]*podState
	nodes     map[string]*nodeInfoListItem
	// headNode points to the most recently updated NodeInfo in "nodes". It is the
	// head of the linked list.
	headNode *nodeInfoListItem
	nodeTree *nodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}

type podState struct {
	pod *v1.Pod
	// Used by assumedPod to determine expiration.
	deadline *time.Time
	// Used to block the cache from expiring an assumed pod while its binding is still in progress.
	bindingFinished bool
}

type imageState struct {
	// Size of the image
	size int64
	// A set of node names for nodes having this image present
	nodes sets.String
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
	return &framework.ImageStateSummary{
		Size:     state.size,
		NumNodes: len(state.nodes),
	}
}

func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
	return &schedulerCache{
		ttl:    ttl,
		period: period,
		stop:   stop,

		nodes:       make(map[string]*nodeInfoListItem),
		nodeTree:    newNodeTree(nil),
		assumedPods: make(sets.String),
		podStates:   make(map[string]*podState),
		imageStates: make(map[string]*imageState),
	}
}

// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
	return &nodeInfoListItem{
		info: ni,
	}
}

// moveNodeInfoToHead moves a NodeInfo to the head of "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}
	// if the node info list item is already at the head, we are done.
	if ni == cache.headNode {
		return
	}

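	// Detach the item from its current position by re-linking its neighbors
	// (either of which may be nil).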
	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
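	// Splice the detached item in at the head of the list.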
	ni.next = cache.headNode
	ni.prev = nil
	cache.headNode = ni
}

// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}

	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// if the removed item was at the head, we must update the head.
	if ni == cache.headNode {
		cache.headNode = ni.next
	}
	delete(cache.nodes, name)
}

// Dump produces a dump of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateSnapshot
// function.
// This method is expensive, and should only be used in non-critical paths.
func (cache *schedulerCache) Dump() *Dump {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	nodes := make(map[string]*framework.NodeInfo, len(cache.nodes))
	for k, v := range cache.nodes {
		nodes[k] = v.info.Clone()
	}

	return &Dump{
		Nodes:       nodes,
		AssumedPods: cache.assumedPods.Union(nil),
	}
}

// UpdateSnapshot takes a snapshot of the cached NodeInfo map. This is called at
// the beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeinfo.Node() is guaranteed to be non-nil for all the nodes in the snapshot.
// This function tracks the generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
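//
// A rough sketch of the intended call pattern (the loop is illustrative, and
// NewEmptySnapshot is assumed to be the snapshot constructor from this package):
//
//	snapshot := NewEmptySnapshot()
//	for {
//		if err := schedCache.UpdateSnapshot(snapshot); err != nil {
//			// log the inconsistency; the snapshot lists are rebuilt for the next cycle
//		}
//		// run one scheduling cycle against the refreshed snapshot
//	}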
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.generation

	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
	// or removed from the cache.
	updateAllLists := false
	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with affinity to NOT having pods with affinity or the other
	// way around.
	updateNodesHavePodsWithAffinity := false
	// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with required anti-affinity to NOT having pods with required
	// anti-affinity or the other way around.
	updateNodesHavePodsWithRequiredAntiAffinity := false

	// Start from the head of the NodeInfo doubly linked list and update snapshot
	// of NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.Generation <= snapshotGeneration {
			// All the remaining nodes were updated before the existing snapshot was taken. We are done.
			break
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				updateAllLists = true
				existing = &framework.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			}
			clone := node.info.Clone()
			// We track nodes that have pods with affinity; here we check whether this node changed
			// its status from having pods with affinity to NOT having pods with affinity or the
			// other way around.
			if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
				updateNodesHavePodsWithAffinity = true
			}
			if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
				updateNodesHavePodsWithRequiredAntiAffinity = true
			}
			// We need to preserve the original pointer of the NodeInfo struct since it
			// is used in the NodeInfoList, which we may not update.
			*existing = *clone
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.Generation
	}

	// Compare the number of nodes in the snapshot with the number in the node tree.
	// Deleted nodes get removed from the tree, but they might remain in the nodes map
	// if they still have non-deleted Pods.
	if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
		updateAllLists = true
	}

	if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
	}

	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		klog.Error(errMsg)
		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
		// error to surface the problem; the error will likely cause a failure of the current scheduling cycle.
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf("%s", errMsg)
	}

	return nil
}

func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
		// Take a snapshot of the node order in the tree.
		snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
		nodesList, err := cache.nodeTree.list()
		if err != nil {
			klog.Error(err)
		}
		for _, nodeName := range nodesList {
			if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
				if len(nodeInfo.PodsWithAffinity) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
				}
				if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
					snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
				}
			} else {
				klog.Errorf("node %q exists in nodeTree but not in NodeInfoMap; this should not happen", nodeName)
			}
		}
	} else {
		for _, nodeInfo := range snapshot.nodeInfoList {
			if len(nodeInfo.PodsWithAffinity) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
			}
			if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
				snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
			}
		}
	}
}

// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
	toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
	for name := range snapshot.nodeInfoMap {
		if toDelete <= 0 {
			break
		}
		if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
			delete(snapshot.nodeInfoMap, name)
			toDelete--
		}
	}
}

// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
func (cache *schedulerCache) NodeCount() int {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	return len(cache.nodes)
}

// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *schedulerCache) PodCount() (int, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	// Count the pods on every cached node, including nodes that have been
	// deleted but still hold non-deleted pods.
	count := 0
	for _, n := range cache.nodes {
		count += len(n.info.Pods)
	}
	return count, nil
}

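// AssumePod optimistically records the pod as running on pod.Spec.NodeName
// before the binding result is confirmed. A typical, illustrative lifecycle is:
//
//	AssumePod -> binding succeeds -> FinishBinding -> AddPod (when the pod's Add event arrives)
//	AssumePod -> binding fails    -> ForgetPod
//
// If the Add event never arrives, the assumed pod expires ttl after
// FinishBinding and is removed by the background cleanup goroutine.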
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}

	cache.addPod(pod)
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps
	cache.assumedPods.Insert(key)
	return nil
}

func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests deterministic by injecting "now" as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
	currState, ok := cache.podStates[key]
	if ok && cache.assumedPods.Has(key) {
		dl := now.Add(cache.ttl)
		currState.bindingFinished = true
		currState.deadline = &dl
	}
	return nil
}

func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
	}

	switch {
	// Only an assumed pod can be forgotten.
	case ok && cache.assumedPods.Has(key):
		err := cache.removePod(pod)
		if err != nil {
			return err
		}
		delete(cache.assumedPods, key)
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
	}
	return nil
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[pod.Spec.NodeName] = n
	}
	n.info.AddPod(pod)
	cache.moveNodeInfoToHead(pod.Spec.NodeName)
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
	cache.addPod(newPod)
	return nil
}

// Assumes that lock is already acquired.
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left on the node, it also cleans up the
// node entry from the cache.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
		return nil
	}
	if err := n.info.RemovePod(pod); err != nil {
		return err
	}
	if len(n.info.Pods) == 0 && n.info.Node() == nil {
		cache.removeNodeInfoFromList(pod.Spec.NodeName)
	} else {
		cache.moveNodeInfoToHead(pod.Spec.NodeName)
	}
	return nil
}

func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to be on.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
			// Clean this up.
			if err = cache.removePod(currState.pod); err != nil {
				klog.Errorf("removing pod error: %v", err)
			}
			cache.addPod(pod)
		}
		delete(cache.assumedPods, key)
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		// The pod was expired. We should add it back.
		cache.addPod(pod)
		ps := &podState{
			pod: pod,
		}
		cache.podStates[key] = ps
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}

func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := framework.GetPodKey(oldPod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have an Update/Remove event. It needs an Add event
	// before an Update event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
			klog.Errorf("Pod %v updated on a different node than previously added to.", key)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		if err := cache.updatePod(oldPod, newPod); err != nil {
			return err
		}
		currState.pod = newPod
	default:
		return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
	}
	return nil
}

func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have a Delete/Remove event. It needs an Add event
	// before a Remove event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			klog.Errorf("Pod %v was assumed to be on %v but got added to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		err := cache.removePod(currState.pod)
		if err != nil {
			return err
		}
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
	}
	return nil
}

func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return false, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	return cache.assumedPods.Has(key), nil
}

// GetPod might return a pod whose node has already been deleted from
// the main cache. This is useful to properly process pod update events.
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return nil, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	podState, ok := cache.podStates[key]
	if !ok {
		return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
	}

	return podState.pod, nil
}

func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[node.Name] = n
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(node.Name)

	cache.nodeTree.addNode(node)
	cache.addNodeImageStates(node, n.info)
	n.info.SetNode(node)
	return n.info.Clone()
}

func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[newNode.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[newNode.Name] = n
		cache.nodeTree.addNode(newNode)
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(newNode.Name)

	cache.nodeTree.updateNode(oldNode, newNode)
	cache.addNodeImageStates(newNode, n.info)
	n.info.SetNode(newNode)
	return n.info.Clone()
}

// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, as the node tree is
// the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is skipped from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	n.info.RemoveNode()
	// We remove NodeInfo for this node only if there aren't any pods on this node.
	// We can't do it unconditionally, because notifications about pods are delivered
	// in a different watch, and thus can potentially be observed later, even though
	// they happened before node removal.
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		cache.moveNodeInfoToHead(node.Name)
	}
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}
	cache.removeNodeImageStates(node)
	return nil
}

// addNodeImageStates adds states of the images on the given node to the given nodeInfo and updates the
// imageStates in the scheduler cache. This function assumes the lock to the scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
	newSum := make(map[string]*framework.ImageStateSummary)

	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			// update the entry in imageStates
			state, ok := cache.imageStates[name]
			if !ok {
				state = &imageState{
					size:  image.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[name] = state
			} else {
				state.nodes.Insert(node.Name)
			}
			// create the imageStateSummary for this image
			if _, ok := newSum[name]; !ok {
				newSum[name] = cache.createImageStateSummary(state)
			}
		}
	}
	nodeInfo.ImageStates = newSum
}

// removeNodeImageStates removes the given node from the image entries that reference it
// in the imageStates cache. If an image is no longer present on any node after the
// removal, its entry is deleted from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
	if node == nil {
		return
	}

	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			state, ok := cache.imageStates[name]
			if ok {
				state.nodes.Delete(node.Name)
				if len(state.nodes) == 0 {
					// Remove the unused image to make sure the length of
					// imageStates represents the total number of different
					// images on all nodes.
					delete(cache.imageStates, name)
				}
			}
		}
	}
}

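// run starts the background goroutine that cleans up expired assumed pods
// every cache.period (cleanAssumedPeriod when constructed via New) until the
// stop channel is closed.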
func (cache *schedulerCache) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists to make tests deterministic by taking the current time as an input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	defer cache.updateMetrics()

	// The size of assumedPods should be small.
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		if !ps.bindingFinished {
			klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			continue
		}
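		// ps.deadline is set together with bindingFinished in finishBinding,
		// so it is non-nil once we get here.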
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}

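// expirePod removes an expired assumed pod from the cache. It assumes the
// caller already holds the cache lock.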
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}

// updateMetrics updates cache size metric values for pods, assumed pods, and nodes.
func (cache *schedulerCache) updateMetrics() {
	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
	metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
	metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
}