/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
)

var (
	cleanAssumedPeriod = 1 * time.Second
)

// New returns a Cache implementation.
// It automatically starts a goroutine that manages expiration of assumed pods.
// "ttl" is how long an assumed pod stays in the cache before it expires.
// "stop" is the channel that, when closed, stops the background goroutine.
func New(ttl time.Duration, stop <-chan struct{}) Cache {
	cache := newSchedulerCache(ttl, cleanAssumedPeriod, stop)
	cache.run()
	return cache
}
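// Illustrative use of New, as a sketch only; the stop channel below is a
// hypothetical one owned by the caller:
//
//	stopCh := make(chan struct{})
//	defer close(stopCh)
//	schedCache := New(30*time.Second, stopCh)
//	// schedCache is then driven by the scheduler via AssumePod, AddPod, etc.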
// nodeInfoListItem holds a NodeInfo pointer and acts as an item in a doubly
// linked list. When a NodeInfo is updated, it goes to the head of the list.
// The items closer to the head are the most recently updated items.
type nodeInfoListItem struct {
	info *framework.NodeInfo
	next *nodeInfoListItem
	prev *nodeInfoListItem
}

type schedulerCache struct {
	stop   <-chan struct{}
	ttl    time.Duration
	period time.Duration

	// This mutex guards all fields within this cache struct.
	mu sync.RWMutex
	// a set of assumed pod keys.
	// The key could further be used to get an entry in podStates.
	assumedPods sets.String
	// a map from pod key to podState.
	podStates map[string]*podState
	nodes     map[string]*nodeInfoListItem
	// headNode points to the most recently updated NodeInfo in "nodes". It is the
	// head of the linked list.
	headNode *nodeInfoListItem
	nodeTree *nodeTree
	// A map from image name to its imageState.
	imageStates map[string]*imageState
}

type podState struct {
	pod *v1.Pod
	// Used by assumed pods to determine expiration.
	deadline *time.Time
	// Used to block the cache from expiring an assumed pod while its binding is still in progress.
	bindingFinished bool
}

type imageState struct {
	// Size of the image
	size int64
	// A set of node names for nodes having this image present
	nodes sets.String
}

// createImageStateSummary returns a summarizing snapshot of the given image's state.
func (cache *schedulerCache) createImageStateSummary(state *imageState) *framework.ImageStateSummary {
	return &framework.ImageStateSummary{
		Size:     state.size,
		NumNodes: len(state.nodes),
	}
}

func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
	return &schedulerCache{
		ttl:    ttl,
		period: period,
		stop:   stop,

		nodes:       make(map[string]*nodeInfoListItem),
		nodeTree:    newNodeTree(nil),
		assumedPods: make(sets.String),
		podStates:   make(map[string]*podState),
		imageStates: make(map[string]*imageState),
	}
}

// newNodeInfoListItem initializes a new nodeInfoListItem.
func newNodeInfoListItem(ni *framework.NodeInfo) *nodeInfoListItem {
	return &nodeInfoListItem{
		info: ni,
	}
}

// moveNodeInfoToHead moves a NodeInfo to the head of the "cache.nodes" doubly
// linked list. The head is the most recently updated NodeInfo.
// We assume the cache lock is already acquired.
func (cache *schedulerCache) moveNodeInfoToHead(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}
	// if the node info list item is already at the head, we are done.
	if ni == cache.headNode {
		return
	}

	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	if cache.headNode != nil {
		cache.headNode.prev = ni
	}
	ni.next = cache.headNode
	ni.prev = nil
	cache.headNode = ni
}

// removeNodeInfoFromList removes a NodeInfo from the "cache.nodes" doubly
// linked list.
// We assume the cache lock is already acquired.
func (cache *schedulerCache) removeNodeInfoFromList(name string) {
	ni, ok := cache.nodes[name]
	if !ok {
		klog.Errorf("No NodeInfo with name %v found in the cache", name)
		return
	}

	if ni.prev != nil {
		ni.prev.next = ni.next
	}
	if ni.next != nil {
		ni.next.prev = ni.prev
	}
	// if the removed item was at the head, we must update the head.
	if ni == cache.headNode {
		cache.headNode = ni.next
	}
	delete(cache.nodes, name)
}
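// Illustration of the ordering maintained above (a sketch with made-up node
// names): after node "b" and then node "a" are updated, the list reads
//
//	headNode -> a -> b -> ...
//
// i.e. most recently updated first, which lets UpdateSnapshot below stop
// iterating as soon as it reaches an item whose generation is not newer than
// the snapshot's.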
// Dump produces a dump of the current scheduler cache. This is used for
// debugging purposes only and shouldn't be confused with the UpdateSnapshot
// function.
// This method is expensive, and should only be used off the critical path.
func (cache *schedulerCache) Dump() *Dump {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	nodes := make(map[string]*framework.NodeInfo, len(cache.nodes))
	for k, v := range cache.nodes {
		nodes[k] = v.info.Clone()
	}

	return &Dump{
		Nodes:       nodes,
		AssumedPods: cache.assumedPods.Union(nil),
	}
}

// UpdateSnapshot takes a snapshot of the cached NodeInfo map. This is called at
// the beginning of every scheduling cycle.
// The snapshot only includes Nodes that are not deleted at the time this function is called.
// nodeInfo.Node() is guaranteed to be non-nil for all the nodes in the snapshot.
// This function tracks the generation number of NodeInfo and updates only the
// entries of an existing snapshot that have changed after the snapshot was taken.
func (cache *schedulerCache) UpdateSnapshot(nodeSnapshot *Snapshot) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	// Get the last generation of the snapshot.
	snapshotGeneration := nodeSnapshot.generation

	// NodeInfoList and HavePodsWithAffinityNodeInfoList must be re-created if a node was added
	// or removed from the cache.
	updateAllLists := false
	// HavePodsWithAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with affinity to NOT having pods with affinity or the other
	// way around.
	updateNodesHavePodsWithAffinity := false
	// HavePodsWithRequiredAntiAffinityNodeInfoList must be re-created if a node changed its
	// status from having pods with required anti-affinity to NOT having pods with required
	// anti-affinity or the other way around.
	updateNodesHavePodsWithRequiredAntiAffinity := false

	// Start from the head of the NodeInfo doubly linked list and update the snapshot
	// entries for NodeInfos updated after the last snapshot.
	for node := cache.headNode; node != nil; node = node.next {
		if node.info.Generation <= snapshotGeneration {
			// all remaining nodes were updated before the existing snapshot. We are done.
			break
		}
		if np := node.info.Node(); np != nil {
			existing, ok := nodeSnapshot.nodeInfoMap[np.Name]
			if !ok {
				updateAllLists = true
				existing = &framework.NodeInfo{}
				nodeSnapshot.nodeInfoMap[np.Name] = existing
			}
			clone := node.info.Clone()
			// We track nodes that have pods with affinity; here we check if this node changed its
			// status from having pods with affinity to NOT having pods with affinity or the other
			// way around.
			if (len(existing.PodsWithAffinity) > 0) != (len(clone.PodsWithAffinity) > 0) {
				updateNodesHavePodsWithAffinity = true
			}
			if (len(existing.PodsWithRequiredAntiAffinity) > 0) != (len(clone.PodsWithRequiredAntiAffinity) > 0) {
				updateNodesHavePodsWithRequiredAntiAffinity = true
			}
			// We need to preserve the original pointer of the NodeInfo struct since it
			// is used in the NodeInfoList, which we may not update.
			*existing = *clone
		}
	}
	// Update the snapshot generation with the latest NodeInfo generation.
	if cache.headNode != nil {
		nodeSnapshot.generation = cache.headNode.info.Generation
	}

	// Compare the number of nodes in the snapshot with the number of nodes in the node tree.
	// Deleted nodes get removed from the tree, but they might remain in the nodes map
	// if they still have non-deleted Pods.
	if len(nodeSnapshot.nodeInfoMap) > cache.nodeTree.numNodes {
		cache.removeDeletedNodesFromSnapshot(nodeSnapshot)
		updateAllLists = true
	}

	if updateAllLists || updateNodesHavePodsWithAffinity || updateNodesHavePodsWithRequiredAntiAffinity {
		cache.updateNodeInfoSnapshotList(nodeSnapshot, updateAllLists)
	}

	if len(nodeSnapshot.nodeInfoList) != cache.nodeTree.numNodes {
		errMsg := fmt.Sprintf("snapshot state is not consistent, length of NodeInfoList=%v not equal to length of nodes in tree=%v "+
			", length of NodeInfoMap=%v, length of nodes in cache=%v"+
			", trying to recover",
			len(nodeSnapshot.nodeInfoList), cache.nodeTree.numNodes,
			len(nodeSnapshot.nodeInfoMap), len(cache.nodes))
		klog.Error(errMsg)
		// We will try to recover by re-creating the lists for the next scheduling cycle, but still return an
		// error to surface the problem; the error will likely cause the current scheduling cycle to fail.
		cache.updateNodeInfoSnapshotList(nodeSnapshot, true)
		return fmt.Errorf(errMsg)
	}

	return nil
}
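// Illustrative call pattern for UpdateSnapshot (a sketch; NewEmptySnapshot is
// assumed to be this package's snapshot constructor and the surrounding
// scheduler loop is elided):
//
//	snapshot := NewEmptySnapshot()
//	// at the beginning of each scheduling cycle:
//	if err := schedCache.UpdateSnapshot(snapshot); err != nil {
//		// the lists were re-created for the next cycle; treat this cycle as failed
//	}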
func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, updateAll bool) {
	snapshot.havePodsWithAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	if updateAll {
		// Take a snapshot of the order of the nodes in the tree.
		snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
		nodesList, err := cache.nodeTree.list()
		if err != nil {
			klog.Error(err)
		}
		for _, nodeName := range nodesList {
			if nodeInfo := snapshot.nodeInfoMap[nodeName]; nodeInfo != nil {
				snapshot.nodeInfoList = append(snapshot.nodeInfoList, nodeInfo)
				if len(nodeInfo.PodsWithAffinity) > 0 {
					snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
				}
				if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
					snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
				}
			} else {
				klog.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
			}
		}
	} else {
		for _, nodeInfo := range snapshot.nodeInfoList {
			if len(nodeInfo.PodsWithAffinity) > 0 {
				snapshot.havePodsWithAffinityNodeInfoList = append(snapshot.havePodsWithAffinityNodeInfoList, nodeInfo)
			}
			if len(nodeInfo.PodsWithRequiredAntiAffinity) > 0 {
				snapshot.havePodsWithRequiredAntiAffinityNodeInfoList = append(snapshot.havePodsWithRequiredAntiAffinityNodeInfoList, nodeInfo)
			}
		}
	}
}

// If certain nodes were deleted after the last snapshot was taken, we should remove them from the snapshot.
func (cache *schedulerCache) removeDeletedNodesFromSnapshot(snapshot *Snapshot) {
	toDelete := len(snapshot.nodeInfoMap) - cache.nodeTree.numNodes
	for name := range snapshot.nodeInfoMap {
		if toDelete <= 0 {
			break
		}
		if n, ok := cache.nodes[name]; !ok || n.info.Node() == nil {
			delete(snapshot.nodeInfoMap, name)
			toDelete--
		}
	}
}

// NodeCount returns the number of nodes in the cache.
// DO NOT use outside of tests.
func (cache *schedulerCache) NodeCount() int {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	return len(cache.nodes)
}

// PodCount returns the number of pods in the cache (including those from deleted nodes).
// DO NOT use outside of tests.
func (cache *schedulerCache) PodCount() (int, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()
	count := 0
	for _, n := range cache.nodes {
		count += len(n.info.Pods)
	}
	return count, nil
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}

	cache.addPod(pod)
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps
	cache.assumedPods.Insert(key)
	return nil
}

func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	return cache.finishBinding(pod, time.Now())
}

// finishBinding exists to make tests deterministic by injecting "now" as an argument.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	klog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)
	currState, ok := cache.podStates[key]
	if ok && cache.assumedPods.Has(key) {
		dl := now.Add(cache.ttl)
		currState.bindingFinished = true
		currState.deadline = &dl
	}
	return nil
}

func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName {
		return fmt.Errorf("pod %v was assumed on %v but assigned to %v", key, pod.Spec.NodeName, currState.pod.Spec.NodeName)
	}

	switch {
	// Only an assumed pod can be forgotten.
	case ok && cache.assumedPods.Has(key):
		err := cache.removePod(pod)
		if err != nil {
			return err
		}
		delete(cache.assumedPods, key)
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v wasn't assumed so cannot be forgotten", key)
	}
	return nil
}
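// Typical assumed-pod flow as driven by the scheduler (a sketch; error handling
// and the actual Bind call are elided, and "pod" is a hypothetical *v1.Pod):
//
//	_ = schedCache.AssumePod(pod)     // reserve the pod's resources optimistically
//	// ... issue the Bind request to the API server ...
//	_ = schedCache.FinishBinding(pod) // start the TTL clock for expiration
//	// Afterwards either the informer's Add event confirms the pod via AddPod,
//	// or a failed binding is rolled back via ForgetPod.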
// Assumes that lock is already acquired.
func (cache *schedulerCache) addPod(pod *v1.Pod) {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[pod.Spec.NodeName] = n
	}
	n.info.AddPod(pod)
	cache.moveNodeInfoToHead(pod.Spec.NodeName)
}

// Assumes that lock is already acquired.
func (cache *schedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
	if err := cache.removePod(oldPod); err != nil {
		return err
	}
	cache.addPod(newPod)
	return nil
}

// Assumes that lock is already acquired.
// Removes a pod from the cached node info. If the node information was already
// removed and there are no more pods left on the node, cleans up the node from
// the cache.
func (cache *schedulerCache) removePod(pod *v1.Pod) error {
	n, ok := cache.nodes[pod.Spec.NodeName]
	if !ok {
		klog.Errorf("node %v not found when trying to remove pod %v", pod.Spec.NodeName, pod.Name)
		return nil
	}
	if err := n.info.RemovePod(pod); err != nil {
		return err
	}
	if len(n.info.Pods) == 0 && n.info.Node() == nil {
		cache.removeNodeInfoFromList(pod.Spec.NodeName)
	} else {
		cache.moveNodeInfoToHead(pod.Spec.NodeName)
	}
	return nil
}

func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	case ok && cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			// The pod was added to a different node than it was assumed to.
			klog.Warningf("Pod %v was assumed to be on %v but got added to %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
			// Clean this up.
			if err = cache.removePod(currState.pod); err != nil {
				klog.Errorf("removing pod error: %v", err)
			}
			cache.addPod(pod)
		}
		delete(cache.assumedPods, key)
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		// Pod was expired. We should add it back.
		cache.addPod(pod)
		ps := &podState{
			pod: pod,
		}
		cache.podStates[key] = ps
	default:
		return fmt.Errorf("pod %v was already in added state", key)
	}
	return nil
}

func (cache *schedulerCache) UpdatePod(oldPod, newPod *v1.Pod) error {
	key, err := framework.GetPodKey(oldPod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have an Update/Remove event. It needs to have an Add event
	// before an Update event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != newPod.Spec.NodeName {
			klog.Errorf("Pod %v updated on a different node than previously added to.", key)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		if err := cache.updatePod(oldPod, newPod); err != nil {
			return err
		}
		currState.pod = newPod
	default:
		return fmt.Errorf("pod %v is not added to scheduler cache, so cannot be updated", key)
	}
	return nil
}

func (cache *schedulerCache) RemovePod(pod *v1.Pod) error {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	currState, ok := cache.podStates[key]
	switch {
	// An assumed pod won't have a Delete/Remove event. It needs to have an Add event
	// before a Remove event, in which case the state would change from Assumed to Added.
	case ok && !cache.assumedPods.Has(key):
		if currState.pod.Spec.NodeName != pod.Spec.NodeName {
			klog.Errorf("Pod %v was added to %v but is being removed from %v", key, currState.pod.Spec.NodeName, pod.Spec.NodeName)
			klog.Fatalf("Schedulercache is corrupted and can badly affect scheduling decisions")
		}
		err := cache.removePod(currState.pod)
		if err != nil {
			return err
		}
		delete(cache.podStates, key)
	default:
		return fmt.Errorf("pod %v is not found in scheduler cache, so cannot be removed from it", key)
	}
	return nil
}
func (cache *schedulerCache) IsAssumedPod(pod *v1.Pod) (bool, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return false, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	return cache.assumedPods.Has(key), nil
}

// GetPod might return a pod whose node has already been deleted from
// the main cache. This is useful to properly process pod update events.
func (cache *schedulerCache) GetPod(pod *v1.Pod) (*v1.Pod, error) {
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return nil, err
	}

	cache.mu.RLock()
	defer cache.mu.RUnlock()

	podState, ok := cache.podStates[key]
	if !ok {
		return nil, fmt.Errorf("pod %v does not exist in scheduler cache", key)
	}

	return podState.pod, nil
}

func (cache *schedulerCache) AddNode(node *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[node.Name] = n
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(node.Name)

	cache.nodeTree.addNode(node)
	cache.addNodeImageStates(node, n.info)
	n.info.SetNode(node)
	return n.info.Clone()
}

func (cache *schedulerCache) UpdateNode(oldNode, newNode *v1.Node) *framework.NodeInfo {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[newNode.Name]
	if !ok {
		n = newNodeInfoListItem(framework.NewNodeInfo())
		cache.nodes[newNode.Name] = n
		cache.nodeTree.addNode(newNode)
	} else {
		cache.removeNodeImageStates(n.info.Node())
	}
	cache.moveNodeInfoToHead(newNode.Name)

	cache.nodeTree.updateNode(oldNode, newNode)
	cache.addNodeImageStates(newNode, n.info)
	n.info.SetNode(newNode)
	return n.info.Clone()
}

// RemoveNode removes a node from the cache's tree.
// The node might still have pods because their deletion events didn't arrive
// yet. Those pods are considered removed from the cache, since the node tree
// is the source of truth.
// However, we keep a ghost node with the list of pods until all pod deletion
// events have arrived. A ghost node is excluded from snapshots.
func (cache *schedulerCache) RemoveNode(node *v1.Node) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()

	n, ok := cache.nodes[node.Name]
	if !ok {
		return fmt.Errorf("node %v is not found", node.Name)
	}
	n.info.RemoveNode()
	// We remove the NodeInfo for this node only if there aren't any pods on this node.
	// We can't do it unconditionally, because notifications about pods are delivered
	// in a different watch, and thus can potentially be observed later, even though
	// they happened before node removal.
	if len(n.info.Pods) == 0 {
		cache.removeNodeInfoFromList(node.Name)
	} else {
		cache.moveNodeInfoToHead(node.Name)
	}
	if err := cache.nodeTree.removeNode(node); err != nil {
		return err
	}
	cache.removeNodeImageStates(node)
	return nil
}
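// Sketch of how the node methods above are typically driven (illustrative only;
// the real wiring lives in the scheduler's event handlers, not in this file):
//
//	// node informer callbacks:
//	//   Add    -> schedCache.AddNode(node)
//	//   Update -> schedCache.UpdateNode(oldNode, newNode)
//	//   Delete -> schedCache.RemoveNode(node)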
// addNodeImageStates adds the states of the images on the given node to the given nodeInfo
// and updates the imageStates in the scheduler cache. This function assumes the lock to the
// scheduler cache has been acquired.
func (cache *schedulerCache) addNodeImageStates(node *v1.Node, nodeInfo *framework.NodeInfo) {
	newSum := make(map[string]*framework.ImageStateSummary)

	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			// update the entry in imageStates
			state, ok := cache.imageStates[name]
			if !ok {
				state = &imageState{
					size:  image.SizeBytes,
					nodes: sets.NewString(node.Name),
				}
				cache.imageStates[name] = state
			} else {
				state.nodes.Insert(node.Name)
			}
			// create the imageStateSummary for this image
			if _, ok := newSum[name]; !ok {
				newSum[name] = cache.createImageStateSummary(state)
			}
		}
	}
	nodeInfo.ImageStates = newSum
}

// removeNodeImageStates removes the given node record from image entries in the
// imageStates cache. After the removal, if an image is no longer available on any
// node, its entry will be removed from imageStates.
func (cache *schedulerCache) removeNodeImageStates(node *v1.Node) {
	if node == nil {
		return
	}

	for _, image := range node.Status.Images {
		for _, name := range image.Names {
			state, ok := cache.imageStates[name]
			if ok {
				state.nodes.Delete(node.Name)
				if len(state.nodes) == 0 {
					// Remove the unused image to make sure the length of
					// imageStates represents the total number of different
					// images on all nodes
					delete(cache.imageStates, name)
				}
			}
		}
	}
}

func (cache *schedulerCache) run() {
	go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
}

func (cache *schedulerCache) cleanupExpiredAssumedPods() {
	cache.cleanupAssumedPods(time.Now())
}

// cleanupAssumedPods exists to make tests deterministic by taking the current time as an input argument.
// It also reports metrics on the cache size for nodes, pods, and assumed pods.
func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	defer cache.updateMetrics()

	// The size of assumedPods should be small
	for key := range cache.assumedPods {
		ps, ok := cache.podStates[key]
		if !ok {
			klog.Fatal("Key found in assumed set but not in podStates. Potentially a logical error.")
		}
		if !ps.bindingFinished {
			klog.V(5).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
				ps.pod.Namespace, ps.pod.Name)
			continue
		}
		if now.After(*ps.deadline) {
			klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
			if err := cache.expirePod(key, ps); err != nil {
				klog.Errorf("ExpirePod failed for %s: %v", key, err)
			}
		}
	}
}
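// Expiration timing, illustratively: a pod assumed at t0 whose binding finishes
// at t1 gets deadline = t1 + ttl (see finishBinding). If no informer Add event
// confirms it before the first cleanup pass after that deadline, expirePod below
// removes it from the cache.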
func (cache *schedulerCache) expirePod(key string, ps *podState) error {
	if err := cache.removePod(ps.pod); err != nil {
		return err
	}
	delete(cache.assumedPods, key)
	delete(cache.podStates, key)
	return nil
}

// updateMetrics updates cache size metric values for pods, assumed pods, and nodes.
func (cache *schedulerCache) updateMetrics() {
	metrics.CacheSize.WithLabelValues("assumed_pods").Set(float64(len(cache.assumedPods)))
	metrics.CacheSize.WithLabelValues("pods").Set(float64(len(cache.podStates)))
	metrics.CacheSize.WithLabelValues("nodes").Set(float64(len(cache.nodes)))
}