/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"reflect"
	"strings"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

func deepEqualWithoutGeneration(actual *nodeInfoListItem, expected *framework.NodeInfo) error {
	if (actual == nil) != (expected == nil) {
		return errors.New("one of the actual or expected is nil and the other is not")
	}
	// Ignore generation field.
	if actual != nil {
		actual.info.Generation = 0
	}
	if expected != nil {
		expected.Generation = 0
	}
	if actual != nil && !reflect.DeepEqual(actual.info, expected) {
		return fmt.Errorf("got node info %s, want %s", actual.info, expected)
	}
	return nil
}

type hostPortInfoParam struct {
	protocol, ip string
	port         int32
}

type hostPortInfoBuilder struct {
	inputs []hostPortInfoParam
}

func newHostPortInfoBuilder() *hostPortInfoBuilder {
	return &hostPortInfoBuilder{}
}

func (b *hostPortInfoBuilder) add(protocol, ip string, port int32) *hostPortInfoBuilder {
	b.inputs = append(b.inputs, hostPortInfoParam{protocol, ip, port})
	return b
}

func (b *hostPortInfoBuilder) build() framework.HostPortInfo {
	res := make(framework.HostPortInfo)
	for _, param := range b.inputs {
		res.Add(param.ip, param.protocol, param.port)
	}
	return res
}

func newNodeInfo(requestedResource *framework.Resource,
	nonzeroRequest *framework.Resource,
	pods []*v1.Pod,
	usedPorts framework.HostPortInfo,
	imageStates map[string]*framework.ImageStateSummary,
) *framework.NodeInfo {
	nodeInfo := framework.NewNodeInfo(pods...)
	nodeInfo.Requested = requestedResource
	nodeInfo.NonZeroRequested = nonzeroRequest
	nodeInfo.UsedPorts = usedPorts
	nodeInfo.ImageStates = imageStates
	return nodeInfo
}

// TestAssumePodScheduled tests that after a pod is assumed, its information is aggregated
// on node level.
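// It also verifies that forgetting the assumed pods rolls that aggregation back.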
func TestAssumePodScheduled(t *testing.T) {
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-nonzero", "", "", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "example.com/foo:3", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "example.com/foo:5", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "100m", "500", "random-invalid-extended-key:100", []v1.ContainerPort{{}}),
	}

	tests := []struct {
		pods []*v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{
		pods: []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[1], testPods[2]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[1], testPods[2]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, { // test non-zero request
		pods: []*v1.Pod{testPods[3]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 0,
				Memory:   0,
			},
			&framework.Resource{
				MilliCPU: schedutil.DefaultMilliCPURequest,
				Memory:   schedutil.DefaultMemoryRequest,
			},
			[]*v1.Pod{testPods[3]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU:        100,
				Memory:          500,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 3},
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[4]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[4], testPods[5]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU:        300,
				Memory:          1524,
				ScalarResources: map[v1.ResourceName]int64{"example.com/foo": 8},
			},
			&framework.Resource{
				MilliCPU: 300,
				Memory:   1524,
			},
			[]*v1.Pod{testPods[4], testPods[5]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}, {
		pods: []*v1.Pod{testPods[6]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[6]},
			newHostPortInfoBuilder().build(),
			make(map[string]*framework.ImageStateSummary),
		),
	},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod); err != nil {
					t.Fatalf("AssumePod failed: %v", err)
				}
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			for _, pod := range tt.pods {
				if err := cache.ForgetPod(pod); err != nil {
					t.Fatalf("ForgetPod failed: %v", err)
				}
				if err := isForgottenFromCache(pod, cache); err != nil {
					t.Errorf("pod %s: %v", pod.Name, err)
				}
			}
		})
	}
}

type testExpirePodStruct struct {
	pod         *v1.Pod
	finishBind  bool
	assumedTime time.Time
}

func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
	if err := cache.AssumePod(pod); err != nil {
		return err
	}
	return cache.finishBinding(pod, assumedTime)
}

// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
	nodeName := "node"
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-3", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	now := time.Now()
	ttl := 10 * time.Second
	tests := []struct {
		pods        []*testExpirePodStruct
		cleanupTime time.Time

		wNodeInfo *framework.NodeInfo
	}{{ // the assumed pod would expire
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo:   nil,
	}, { // the first pod would expire; the second and third would not
		pods: []*testExpirePodStruct{
			{pod: testPods[0], finishBind: true, assumedTime: now},
			{pod: testPods[1], finishBind: true, assumedTime: now.Add(3 * ttl / 2)},
			{pod: testPods[2]},
		},
		cleanupTime: now.Add(2 * ttl),
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			&framework.Resource{
				MilliCPU: 400,
				Memory:   2048,
			},
			// Order gets altered when removing pods.
			[]*v1.Pod{testPods[2], testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)

			for _, pod := range tt.pods {
				if err := cache.AssumePod(pod.pod); err != nil {
					t.Fatal(err)
				}
				if !pod.finishBind {
					continue
				}
				if err := cache.finishBinding(pod.pod, pod.assumedTime); err != nil {
					t.Fatal(err)
				}
			}
			// Pods that got bound and have assumedTime + ttl < cleanupTime will
			// be expired and removed.
			cache.cleanupAssumedPods(tt.cleanupTime)
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestAddPodWillConfirm tests that a pod being Add()ed will be confirmed if assumed.
// The pod info should still exist after manually expiring unconfirmed pods.
func TestAddPodWillConfirm(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{ // two pods are assumed at the same time, but only the first one gets Add()ed and confirmed
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// Check after expiration: confirmed pods shouldn't be expired.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

func TestDump(t *testing.T) {
	nodeName := "node"
	now := time.Now()
	ttl := 10 * time.Second

	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
	}{{ // two pods are assumed at the same time, but only the first one gets Add()ed and confirmed
		podsToAssume: []*v1.Pod{testPods[0], testPods[1]},
		podsToAdd:    []*v1.Pod{testPods[0]},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Errorf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Errorf("AddPod failed: %v", err)
				}
			}

			snapshot := cache.Dump()
			if len(snapshot.Nodes) != len(cache.nodes) {
				t.Errorf("Unequal number of nodes in the cache and its snapshot. expected: %v, got: %v", len(cache.nodes), len(snapshot.Nodes))
			}
			for name, ni := range snapshot.Nodes {
				nItem := cache.nodes[name]
				if !reflect.DeepEqual(ni, nItem.info) {
					t.Errorf("expect \n%+v; got \n%+v", nItem.info, ni)
				}
			}
			if !reflect.DeepEqual(snapshot.AssumedPods, cache.assumedPods) {
				t.Errorf("expect \n%+v; got \n%+v", cache.assumedPods, snapshot.AssumedPods)
			}
		})
	}
}

// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

	assumedPod := makeBasePod(t, "assumed-node-1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	addedPod := makeBasePod(t, "actual-node", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	updatedPod := makeBasePod(t, "actual-node", "test-1", "200m", "500", "", []v1.ContainerPort{{HostPort: 90}})

	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate [][]*v1.Pod

		wNodeInfo map[string]*framework.NodeInfo
	}{{
		podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
		podsToAdd:    []*v1.Pod{addedPod.DeepCopy()},
		podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
		wNodeInfo: map[string]*framework.NodeInfo{
			"assumed-node-1": nil,
			"actual-node": newNodeInfo(
				&framework.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				&framework.Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				[]*v1.Pod{updatedPod.DeepCopy()},
				newHostPortInfoBuilder().add("TCP", "0.0.0.0", 90).build(),
				make(map[string]*framework.ImageStateSummary),
			),
		},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}
			for _, podToUpdate := range tt.podsToUpdate {
				if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
			}
			for nodeName, expected := range tt.wNodeInfo {
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, expected); err != nil {
					t.Errorf("node %q: %v", nodeName, err)
				}
			}
		})
	}
}

// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
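// The re-added pod's information should be aggregated on the node again.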
func TestAddPodAfterExpiration(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		pod *v1.Pod

		wNodeInfo *framework.NodeInfo
	}{{
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))
			// It should be expired and removed.
			if err := isForgottenFromCache(tt.pod, cache); err != nil {
				t.Error(err)
			}
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			// The pod was added back; its info should be restored on the node.
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
		})
	}
}

// TestUpdatePod tests that a pod added to the cache can be updated.
func TestUpdatePod(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod

		wNodeInfo []*framework.NodeInfo
	}{{ // add a pod and then update it twice
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*framework.NodeInfo{newNodeInfo(
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		), newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

// TestUpdatePodAndGet tests that GetPod always returns the latest pod state.
func TestUpdatePodAndGet(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		pod *v1.Pod

		podToUpdate *v1.Pod
		handler     func(cache Cache, pod *v1.Pod) error

		assumePod bool
	}{
		{
			pod: testPods[0],

			podToUpdate: testPods[0],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AssumePod(pod)
			},
			assumePod: true,
		},
		{
			pod: testPods[0],

			podToUpdate: testPods[1],
			handler: func(cache Cache, pod *v1.Pod) error {
				return cache.AddPod(pod)
			},
			assumePod: false,
		},
	}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(ttl, time.Second, nil)

			if err := tt.handler(cache, tt.pod); err != nil {
				t.Fatalf("unexpected err: %v", err)
			}

			if !tt.assumePod {
				if err := cache.UpdatePod(tt.pod, tt.podToUpdate); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
			}

			cachedPod, err := cache.GetPod(tt.pod)
			if err != nil {
				t.Fatalf("GetPod failed: %v", err)
			}
			if !reflect.DeepEqual(tt.podToUpdate, cachedPod) {
				t.Fatalf("pod get=%s, want=%s", cachedPod, tt.podToUpdate)
			}
		})
	}
}

// TestExpireAddUpdatePod tests the sequence in which a pod is expired, added, and then updated.
func TestExpireAddUpdatePod(t *testing.T) {
	nodeName := "node"
	ttl := 10 * time.Second
	testPods := []*v1.Pod{
		makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}),
		makeBasePod(t, nodeName, "test", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}),
	}
	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate []*v1.Pod

		wNodeInfo []*framework.NodeInfo
	}{{ // Pod is assumed, expired, and added. Then it is updated twice.
		podsToAssume: []*v1.Pod{testPods[0]},
		podsToAdd:    []*v1.Pod{testPods[0]},
		podsToUpdate: []*v1.Pod{testPods[0], testPods[1], testPods[0]},
		wNodeInfo: []*framework.NodeInfo{newNodeInfo(
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			&framework.Resource{
				MilliCPU: 200,
				Memory:   1024,
			},
			[]*v1.Pod{testPods[1]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 8080).build(),
			make(map[string]*framework.ImageStateSummary),
		), newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{testPods[0]},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		)},
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			now := time.Now()
			cache := newSchedulerCache(ttl, time.Second, nil)
			for _, podToAssume := range tt.podsToAssume {
				if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
					t.Fatalf("assumePod failed: %v", err)
				}
			}
			cache.cleanupAssumedPods(now.Add(2 * ttl))

			for _, podToAdd := range tt.podsToAdd {
				if err := cache.AddPod(podToAdd); err != nil {
					t.Fatalf("AddPod failed: %v", err)
				}
			}

			for j := range tt.podsToUpdate {
				if j == 0 {
					continue
				}
				if err := cache.UpdatePod(tt.podsToUpdate[j-1], tt.podsToUpdate[j]); err != nil {
					t.Fatalf("UpdatePod failed: %v", err)
				}
				// Check the node info after each update.
				n := cache.nodes[nodeName]
				if err := deepEqualWithoutGeneration(n, tt.wNodeInfo[j-1]); err != nil {
					t.Errorf("update %d: %v", j, err)
				}
			}
		})
	}
}

func makePodWithEphemeralStorage(nodeName, ephemeralStorage string) *v1.Pod {
	req := v1.ResourceList{
		v1.ResourceEphemeralStorage: resource.MustParse(ephemeralStorage),
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default-namespace",
			Name:      "pod-with-ephemeral-storage",
			UID:       types.UID("pod-with-ephemeral-storage"),
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
			}},
			NodeName: nodeName,
		},
	}
}

func TestEphemeralStorageResource(t *testing.T) {
	nodeName := "node"
	podE := makePodWithEphemeralStorage(nodeName, "500")
	tests := []struct {
		pod       *v1.Pod
		wNodeInfo *framework.NodeInfo
	}{
		{
			pod: podE,
			wNodeInfo: newNodeInfo(
				&framework.Resource{
					EphemeralStorage: 500,
				},
				&framework.Resource{
					MilliCPU: schedutil.DefaultMilliCPURequest,
					Memory:   schedutil.DefaultMemoryRequest,
				},
				[]*v1.Pod{podE},
				framework.HostPortInfo{},
				make(map[string]*framework.ImageStateSummary),
			),
		},
	}
	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			cache := newSchedulerCache(time.Second, time.Second, nil)
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}
			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}
		})
	}
}

// TestRemovePod tests that after an added pod is removed, its information is subtracted.
func TestRemovePod(t *testing.T) {
	basePod := makeBasePod(t, "node-1", "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	tests := []struct {
		nodes     []*v1.Node
		pod       *v1.Pod
		wNodeInfo *framework.NodeInfo
	}{{
		nodes: []*v1.Node{
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
			},
			{
				ObjectMeta: metav1.ObjectMeta{Name: "node-2"},
			},
		},
		pod: basePod,
		wNodeInfo: newNodeInfo(
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			&framework.Resource{
				MilliCPU: 100,
				Memory:   500,
			},
			[]*v1.Pod{basePod},
			newHostPortInfoBuilder().add("TCP", "127.0.0.1", 80).build(),
			make(map[string]*framework.ImageStateSummary),
		),
	}}

	for i, tt := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			nodeName := tt.pod.Spec.NodeName
			cache := newSchedulerCache(time.Second, time.Second, nil)
			// Adding a pod succeeds even before its node is added.
			if err := cache.AddPod(tt.pod); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
			n := cache.nodes[nodeName]
			if err := deepEqualWithoutGeneration(n, tt.wNodeInfo); err != nil {
				t.Error(err)
			}
			for _, n := range tt.nodes {
				cache.AddNode(n)
			}

			if err := cache.RemovePod(tt.pod); err != nil {
				t.Fatalf("RemovePod failed: %v", err)
			}

			if _, err := cache.GetPod(tt.pod); err == nil {
				t.Errorf("pod was not deleted")
			}

			// The node that owned the pod should be at the head of the list.
			if cache.headNode.info.Node().Name != nodeName {
				t.Errorf("node %q is not at the head of the list", nodeName)
			}
		})
	}
}

func TestForgetPod(t *testing.T) {
	nodeName := "node"
	basePod := makeBasePod(t, nodeName, "test", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}})
	pods := []*v1.Pod{basePod}
	now := time.Now()
	ttl := 10 * time.Second

	cache := newSchedulerCache(ttl, time.Second, nil)
	for _, pod := range pods {
		if err := assumeAndFinishBinding(cache, pod, now); err != nil {
			t.Fatalf("assumePod failed: %v", err)
		}
		isAssumed, err := cache.IsAssumedPod(pod)
		if err != nil {
			t.Fatalf("IsAssumedPod failed: %v.", err)
		}
		if !isAssumed {
			t.Fatalf("Pod is expected to be assumed.")
		}
		assumedPod, err := cache.GetPod(pod)
		if err != nil {
			t.Fatalf("GetPod failed: %v.", err)
		}
		if assumedPod.Namespace != pod.Namespace {
			t.Errorf("assumedPod.Namespace != pod.Namespace (%s != %s)", assumedPod.Namespace, pod.Namespace)
		}
		if assumedPod.Name != pod.Name {
			t.Errorf("assumedPod.Name != pod.Name (%s != %s)", assumedPod.Name, pod.Name)
		}
	}
	for _, pod := range pods {
		if err := cache.ForgetPod(pod); err != nil {
			t.Fatalf("ForgetPod failed: %v", err)
		}
		if err := isForgottenFromCache(pod, cache); err != nil {
			t.Errorf("pod %q: %v", pod.Name, err)
		}
	}
}

// buildNodeInfo creates a NodeInfo by simulating node operations in cache.
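// Callers overwrite the returned Generation before comparing, so its exact value is not significant.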
func buildNodeInfo(node *v1.Node, pods []*v1.Pod) *framework.NodeInfo {
	expected := framework.NewNodeInfo()
	expected.SetNode(node)
	expected.Allocatable = framework.NewResource(node.Status.Allocatable)
	expected.Generation++
	for _, pod := range pods {
		expected.AddPod(pod)
	}
	return expected
}

// TestNodeOperators tests node operations of cache, including add, update
// and remove.
func TestNodeOperators(t *testing.T) {
	// Test data
	nodeName := "test-node"
	cpu1 := resource.MustParse("1000m")
	mem100m := resource.MustParse("100m")
	cpuHalf := resource.MustParse("500m")
	mem50m := resource.MustParse("50m")
	resourceFooName := "example.com/foo"
	resourceFoo := resource.MustParse("1")

	tests := []struct {
		node *v1.Node
		pods []*v1.Pod
	}{
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
								Ports: []v1.ContainerPort{
									{
										Name:          "http",
										HostPort:      80,
										ContainerPort: 80,
									},
								},
							},
						},
					},
				},
			},
		},
		{
			node: &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: nodeName,
				},
				Status: v1.NodeStatus{
					Allocatable: v1.ResourceList{
						v1.ResourceCPU:                   cpu1,
						v1.ResourceMemory:                mem100m,
						v1.ResourceName(resourceFooName): resourceFoo,
					},
				},
				Spec: v1.NodeSpec{
					Taints: []v1.Taint{
						{
							Key:    "test-key",
							Value:  "test-value",
							Effect: v1.TaintEffectPreferNoSchedule,
						},
					},
				},
			},
			pods: []*v1.Pod{
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  types.UID("pod1"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
				{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod2",
						UID:  types.UID("pod2"),
					},
					Spec: v1.PodSpec{
						NodeName: nodeName,
						Containers: []v1.Container{
							{
								Resources: v1.ResourceRequirements{
									Requests: v1.ResourceList{
										v1.ResourceCPU:    cpuHalf,
										v1.ResourceMemory: mem50m,
									},
								},
							},
						},
					},
				},
			},
		},
	}

	for i, test := range tests {
		t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) {
			expected := buildNodeInfo(test.node, test.pods)
			node := test.node

			cache := newSchedulerCache(time.Second, time.Second, nil)
			cache.AddNode(node)
			for _, pod := range test.pods {
				if err := cache.AddPod(pod); err != nil {
					t.Fatal(err)
				}
			}

			// Step 1: the node was added into cache successfully.
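			// Both the nodes map and the nodeTree must reflect the addition.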
			got, found := cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in internal cache.", node.Name)
			}
			nodesList, err := cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
				t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
			}

			// Generations are globally unique. We check in our unit tests that they are incremented correctly.
			expected.Generation = got.info.Generation
			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to add node into scheduler cache:\n got: %+v \nexpected: %+v", got, expected)
			}

			// Step 2: dump cached nodes successfully.
			cachedNodes := NewEmptySnapshot()
			if err := cache.UpdateSnapshot(cachedNodes); err != nil {
				t.Error(err)
			}
			newNode, found := cachedNodes.nodeInfoMap[node.Name]
			if !found || len(cachedNodes.nodeInfoMap) != 1 {
				t.Errorf("failed to dump cached nodes:\n got: %v \nexpected: %v", cachedNodes, cache.nodes)
			}
			expected.Generation = newNode.Generation
			if !reflect.DeepEqual(newNode, expected) {
				t.Errorf("Failed to clone node:\n got: %+v, \n expected: %+v", newNode, expected)
			}

			// Step 3: update node attribute successfully.
			node.Status.Allocatable[v1.ResourceMemory] = mem50m
			expected.Allocatable.Memory = mem50m.Value()

			cache.UpdateNode(nil, node)
			got, found = cache.nodes[node.Name]
			if !found {
				t.Errorf("Failed to find node %v in scheduler cache after UpdateNode.", node.Name)
			}
			if got.info.Generation <= expected.Generation {
				t.Errorf("Generation is not incremented. got: %v, expected: %v", got.info.Generation, expected.Generation)
			}
			expected.Generation = got.info.Generation

			if !reflect.DeepEqual(got.info, expected) {
				t.Errorf("Failed to update node in scheduler cache:\n got: %+v \nexpected: %+v", got, expected)
			}
			// Check nodeTree after update.
			nodesList, err = cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
				t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
			}

			// Step 4: the node can be removed even if it still has pods.
			if err := cache.RemoveNode(node); err != nil {
				t.Error(err)
			}
			if n, err := cache.getNodeInfo(node.Name); err != nil {
				t.Errorf("The node %v should still have a ghost entry: %v", node.Name, err)
			} else if n != nil {
				t.Errorf("The node object for %v should be nil", node.Name)
			}
			// Check that the node is removed from the nodeTree as well.
			nodesList, err = cache.nodeTree.list()
			if err != nil {
				t.Fatal(err)
			}
			if cache.nodeTree.numNodes != 0 || len(nodesList) != 0 {
				t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
			}
			// Pods are still in the pods cache.
			for _, p := range test.pods {
				if _, err := cache.GetPod(p); err != nil {
					t.Error(err)
				}
			}

			// Step 5: removing pods for the removed node still succeeds.
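			// After each removal, GetPod for that pod must fail.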
			for _, p := range test.pods {
				if err := cache.RemovePod(p); err != nil {
					t.Error(err)
				}
				if _, err := cache.GetPod(p); err == nil {
					t.Errorf("pod %q still in cache", p.Name)
				}
			}
		})
	}
}

func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
	// Create a few nodes to be used in tests.
	nodes := []*v1.Node{}
	for i := 0; i < 10; i++ {
		node := &v1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: fmt.Sprintf("test-node%v", i),
			},
			Status: v1.NodeStatus{
				Allocatable: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("1000m"),
					v1.ResourceMemory: resource.MustParse("100m"),
				},
			},
		}
		nodes = append(nodes, node)
	}
	// Create a few nodes as updated versions of the above nodes.
	updatedNodes := []*v1.Node{}
	for _, n := range nodes {
		updatedNode := n.DeepCopy()
		updatedNode.Status.Allocatable = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse("2000m"),
			v1.ResourceMemory: resource.MustParse("500m"),
		}
		updatedNodes = append(updatedNodes, updatedNode)
	}

	// Create a few pods for tests.
	pods := []*v1.Pod{}
	for i := 0; i < 20; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i%10),
			},
		}
		pods = append(pods, pod)
	}

	// Create a few pods as updated versions of the above pods.
	updatedPods := []*v1.Pod{}
	for _, p := range pods {
		updatedPod := p.DeepCopy()
		priority := int32(1000)
		updatedPod.Spec.Priority = &priority
		updatedPods = append(updatedPods, updatedPod)
	}

	// Add a couple of pods with affinity, on the first and second nodes.
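	// These pods drive the expectedHavePodsWithAffinity counts asserted below.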
	podsWithAffinity := []*v1.Pod{}
	for i := 0; i < 2; i++ {
		pod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      fmt.Sprintf("test-pod%v", i),
				Namespace: "test-ns",
				UID:       types.UID(fmt.Sprintf("test-puid%v", i)),
			},
			Spec: v1.PodSpec{
				NodeName: fmt.Sprintf("test-node%v", i),
				Affinity: &v1.Affinity{
					PodAffinity: &v1.PodAffinity{},
				},
			},
		}
		podsWithAffinity = append(podsWithAffinity, pod)
	}

	var cache *schedulerCache
	var snapshot *Snapshot
	type operation = func(t *testing.T)

	addNode := func(i int) operation {
		return func(t *testing.T) {
			cache.AddNode(nodes[i])
		}
	}
	removeNode := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemoveNode(nodes[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateNode := func(i int) operation {
		return func(t *testing.T) {
			cache.UpdateNode(nodes[i], updatedNodes[i])
		}
	}
	addPod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.AddPod(pods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	addPodWithAffinity := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.AddPod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removePod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemovePod(pods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	removePodWithAffinity := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.RemovePod(podsWithAffinity[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updatePod := func(i int) operation {
		return func(t *testing.T) {
			if err := cache.UpdatePod(pods[i], updatedPods[i]); err != nil {
				t.Error(err)
			}
		}
	}
	updateSnapshot := func() operation {
		return func(t *testing.T) {
			cache.UpdateSnapshot(snapshot)
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
		}
	}

	tests := []struct {
		name                         string
		operations                   []operation
		expected                     []*v1.Node
		expectedHavePodsWithAffinity int
	}{
		{
			name:       "Empty cache",
			operations: []operation{},
			expected:   []*v1.Node{},
		},
		{
			name:       "Single node",
			operations: []operation{addNode(1)},
			expected:   []*v1.Node{nodes[1]},
		},
		{
			name: "Add node, remove it, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), removeNode(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1]},
		},
		{
			name: "Add node and remove it in the same cycle, add it again",
			operations: []operation{
				addNode(1), updateSnapshot(), addNode(2), removeNode(1),
			},
			expected: []*v1.Node{nodes[2]},
		},
		{
			name: "Add a few nodes, and snapshot in the middle",
			operations: []operation{
				addNode(0), updateSnapshot(), addNode(1), updateSnapshot(), addNode(2),
				updateSnapshot(), addNode(3),
			},
			expected: []*v1.Node{nodes[3], nodes[2], nodes[1], nodes[0]},
		},
		{
			name: "Add a few nodes, and snapshot in the end",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6),
			},
			expected: []*v1.Node{nodes[6], nodes[5], nodes[2], nodes[0]},
		},
		{
			name: "Update some nodes",
			operations: []operation{
				addNode(0), addNode(1), addNode(5), updateSnapshot(), updateNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[5], nodes[0]},
		},
		{
			name: "Add a few nodes, and remove all of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(2), removeNode(5), removeNode(6),
			},
			expected: []*v1.Node{},
		},
		{
			name: "Add a few nodes, and remove some of them",
			operations: []operation{
				addNode(0), addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(0), removeNode(6),
			},
			expected: []*v1.Node{nodes[5], nodes[2]},
		},
		{
			name: "Add a few nodes, remove all of them, and add more",
			operations: []operation{
				addNode(2), addNode(5), addNode(6), updateSnapshot(),
				removeNode(2), removeNode(5), removeNode(6), updateSnapshot(),
				addNode(7), addNode(9),
			},
			expected: []*v1.Node{nodes[9], nodes[7]},
		},
		{
			name: "Update nodes in particular order",
			operations: []operation{
				addNode(8), updateNode(2), updateNode(8), updateSnapshot(),
				addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[8], nodes[2]},
		},
		{
			name: "Add some nodes and some pods",
			operations: []operation{
				addNode(0), addNode(2), addNode(8), updateSnapshot(),
				addPod(8), addPod(2),
			},
			expected: []*v1.Node{nodes[2], nodes[8], nodes[0]},
		},
		{
			name: "Updating a pod moves its node to the head",
			operations: []operation{
				addNode(0), addPod(0), addNode(2), addNode(4), updatePod(0),
			},
			expected: []*v1.Node{nodes[0], nodes[4], nodes[2]},
		},
		{
			name: "Add pod before its node",
			operations: []operation{
				addNode(0), addPod(1), updatePod(1), addNode(1),
			},
			expected: []*v1.Node{nodes[1], nodes[0]},
		},
		{
			name: "Remove node before its pods",
			operations: []operation{
				addNode(0), addNode(1), addPod(1), addPod(11), updateSnapshot(),
				removeNode(1), updateSnapshot(),
				updatePod(1), updatePod(11), removePod(1), removePod(11),
			},
			expected: []*v1.Node{nodes[0]},
		},
		{
			name: "Add Pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 1,
		},
		{
			name: "Add multiple nodes with pods with affinity",
			operations: []operation{
				addNode(0), addPodWithAffinity(0), updateSnapshot(), addNode(1), addPodWithAffinity(1), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[1], nodes[0]},
			expectedHavePodsWithAffinity: 2,
		},
		{
			name: "Add then Remove pods with affinity",
			operations: []operation{
				addNode(0), addNode(1), addPodWithAffinity(0), updateSnapshot(), removePodWithAffinity(0), updateSnapshot(),
			},
			expected:                     []*v1.Node{nodes[0], nodes[1]},
			expectedHavePodsWithAffinity: 0,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			for _, op := range test.operations {
				op(t)
			}

			if len(test.expected) != len(cache.nodes) {
				t.Errorf("unexpected number of nodes. Expected: %v, got: %v", len(test.expected), len(cache.nodes))
			}
			var i int
			// Check that the cache is in the expected state.
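			// The cache keeps nodes in a linked list ordered by most recent update; walk it from headNode.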
			for node := cache.headNode; node != nil; node = node.next {
				if node.info.Node() != nil && node.info.Node().Name != test.expected[i].Name {
					t.Errorf("unexpected node. Expected: %v, got: %v, index: %v", test.expected[i].Name, node.info.Node().Name, i)
				}
				i++
			}
			// Make sure we visited all the cached nodes in the above for loop.
			if i != len(cache.nodes) {
				t.Errorf("Not all the nodes were visited by following the NodeInfo linked list. Expected to see %v nodes, saw %v.", len(cache.nodes), i)
			}

			// Check the number of nodes with pods with affinity.
			if len(snapshot.havePodsWithAffinityNodeInfoList) != test.expectedHavePodsWithAffinity {
				t.Errorf("unexpected number of HavePodsWithAffinity nodes. Expected: %v, got: %v", test.expectedHavePodsWithAffinity, len(snapshot.havePodsWithAffinityNodeInfoList))
			}

			// Always update the snapshot at the end of operations and compare it.
			if err := cache.UpdateSnapshot(snapshot); err != nil {
				t.Error(err)
			}
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
		})
	}
}

func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
	// Compare the map.
	if len(snapshot.nodeInfoMap) != cache.nodeTree.numNodes {
		return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoMap))
	}
	for name, ni := range cache.nodes {
		want := ni.info
		if want.Node() == nil {
			want = nil
		}
		if !reflect.DeepEqual(snapshot.nodeInfoMap[name], want) {
			return fmt.Errorf("unexpected node info for node %q. Expected:\n%v, got:\n%v", name, ni.info, snapshot.nodeInfoMap[name])
		}
	}

	// Compare the lists.
	if len(snapshot.nodeInfoList) != cache.nodeTree.numNodes {
		return fmt.Errorf("unexpected number of nodes in NodeInfoList. Expected: %v, got: %v", cache.nodeTree.numNodes, len(snapshot.nodeInfoList))
	}

	expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
	nodesList, err := cache.nodeTree.list()
	if err != nil {
		t.Fatal(err)
	}
	for _, nodeName := range nodesList {
		if n := snapshot.nodeInfoMap[nodeName]; n != nil {
			expectedNodeInfoList = append(expectedNodeInfoList, n)
			if len(n.PodsWithAffinity) > 0 {
				expectedHavePodsWithAffinityNodeInfoList = append(expectedHavePodsWithAffinityNodeInfoList, n)
			}
		} else {
			return fmt.Errorf("node %q exists in nodeTree but not in NodeInfoMap, this should not happen", nodeName)
		}
	}

	for i, expected := range expectedNodeInfoList {
		got := snapshot.nodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in NodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	for i, expected := range expectedHavePodsWithAffinityNodeInfoList {
		got := snapshot.havePodsWithAffinityNodeInfoList[i]
		if expected != got {
			return fmt.Errorf("unexpected NodeInfo pointer in HavePodsWithAffinityNodeInfoList. Expected: %p, got: %p", expected, got)
		}
	}

	return nil
}

func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {
	// Create a few nodes to be used in tests.
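	// The zone sizes below put nodes 0-1 in zone-0 and nodes 2-7 in zone-1.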
	nodes := []*v1.Node{}
	i := 0
	// List of number of nodes per zone, zone 0 -> 2, zone 1 -> 6.
	for zone, nb := range []int{2, 6} {
		for j := 0; j < nb; j++ {
			nodes = append(nodes, &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: fmt.Sprintf("node-%d", i),
					Labels: map[string]string{
						v1.LabelTopologyRegion: fmt.Sprintf("region-%d", zone),
						v1.LabelTopologyZone:   fmt.Sprintf("zone-%d", zone),
					},
				},
			})
			i++
		}
	}

	var cache *schedulerCache
	var snapshot *Snapshot

	addNode := func(t *testing.T, i int) {
		cache.AddNode(nodes[i])
		_, ok := snapshot.nodeInfoMap[nodes[i].Name]
		if !ok {
			snapshot.nodeInfoMap[nodes[i].Name] = cache.nodes[nodes[i].Name].info
		}
	}

	updateSnapshot := func(t *testing.T) {
		cache.updateNodeInfoSnapshotList(snapshot, true)
		if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
			t.Error(err)
		}
	}

	tests := []struct {
		name       string
		operations func(t *testing.T)
		expected   []string
	}{
		{
			name:       "Empty cache",
			operations: func(t *testing.T) {},
			expected:   []string{},
		},
		{
			name: "Single node",
			operations: func(t *testing.T) {
				addNode(t, 0)
			},
			expected: []string{"node-0"},
		},
		{
			name: "Two nodes",
			operations: func(t *testing.T) {
				addNode(t, 0)
				updateSnapshot(t)
				addNode(t, 1)
			},
			expected: []string{"node-0", "node-1"},
		},
		{
			name: "bug 91601, two nodes, update the snapshot and add two nodes in different zones",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 3)
				updateSnapshot(t)
				addNode(t, 4)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-4"},
		},
		{
			name: "bug 91601, 6 nodes, one in a different zone",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 3)
				addNode(t, 4)
				addNode(t, 5)
				updateSnapshot(t)
				addNode(t, 6)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-4", "node-5", "node-6"},
		},
		{
			name: "bug 91601, 7 nodes, two in a different zone",
			operations: func(t *testing.T) {
				addNode(t, 2)
				updateSnapshot(t)
				addNode(t, 3)
				addNode(t, 4)
				updateSnapshot(t)
				addNode(t, 5)
				addNode(t, 6)
				addNode(t, 0)
				addNode(t, 1)
			},
			expected: []string{"node-2", "node-0", "node-3", "node-1", "node-4", "node-5", "node-6"},
		},
		{
			name: "bug 91601, 7 nodes, two in a different zone, different zone order",
			operations: func(t *testing.T) {
				addNode(t, 2)
				addNode(t, 1)
				updateSnapshot(t)
				addNode(t, 3)
				addNode(t, 4)
				updateSnapshot(t)
				addNode(t, 5)
				addNode(t, 6)
				addNode(t, 0)
			},
			expected: []string{"node-2", "node-1", "node-3", "node-0", "node-4", "node-5", "node-6"},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			cache = newSchedulerCache(time.Second, time.Second, nil)
			snapshot = NewEmptySnapshot()

			test.operations(t)

			// Always update the snapshot at the end of operations and compare it.
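			// Calling updateNodeInfoSnapshotList directly exercises the list rebuild in isolation.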
			cache.updateNodeInfoSnapshotList(snapshot, true)
			if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
				t.Error(err)
			}
			nodeNames := make([]string, len(snapshot.nodeInfoList))
			for i, nodeInfo := range snapshot.nodeInfoList {
				nodeNames[i] = nodeInfo.Node().Name
			}
			if !reflect.DeepEqual(nodeNames, test.expected) {
				t.Errorf("The nodeInfoList is incorrect. Expected %v, got %v", test.expected, nodeNames)
			}
		})
	}
}

func BenchmarkUpdate1kNodes30kPods(b *testing.B) {
	cache := setupCacheOf1kNodes30kPods(b)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		cachedNodes := NewEmptySnapshot()
		cache.UpdateSnapshot(cachedNodes)
	}
}

func BenchmarkExpirePods(b *testing.B) {
	podNums := []int{
		100,
		1000,
		10000,
	}
	for _, podNum := range podNums {
		name := fmt.Sprintf("%dPods", podNum)
		b.Run(name, func(b *testing.B) {
			benchmarkExpire(b, podNum)
		})
	}
}

func benchmarkExpire(b *testing.B, podNum int) {
	now := time.Now()
	for n := 0; n < b.N; n++ {
		b.StopTimer()
		cache := setupCacheWithAssumedPods(b, podNum, now)
		b.StartTimer()
		cache.cleanupAssumedPods(now.Add(2 * time.Second))
	}
}

type testingMode interface {
	Fatalf(format string, args ...interface{})
}

func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod {
	req := v1.ResourceList{}
	if cpu != "" {
		req = v1.ResourceList{
			v1.ResourceCPU:    resource.MustParse(cpu),
			v1.ResourceMemory: resource.MustParse(mem),
		}
		if extended != "" {
			parts := strings.Split(extended, ":")
			if len(parts) != 2 {
				t.Fatalf("Invalid extended resource string: \"%s\"", extended)
			}
			req[v1.ResourceName(parts[0])] = resource.MustParse(parts[1])
		}
	}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       types.UID(objName),
			Namespace: "node_info_cache_test",
			Name:      objName,
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: req,
				},
				Ports: ports,
			}},
			NodeName: nodeName,
		},
	}
}

func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < 1000; i++ {
		nodeName := fmt.Sprintf("node-%d", i)
		for j := 0; j < 30; j++ {
			objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
			pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

			if err := cache.AddPod(pod); err != nil {
				b.Fatalf("AddPod failed: %v", err)
			}
		}
	}
	return cache
}

func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time) *schedulerCache {
	cache := newSchedulerCache(time.Second, time.Second, nil)
	for i := 0; i < podNum; i++ {
		nodeName := fmt.Sprintf("node-%d", i/10)
		objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
		pod := makeBasePod(b, nodeName, objName, "0", "0", "", nil)

		err := assumeAndFinishBinding(cache, pod, assumedTime)
		if err != nil {
			b.Fatalf("assumePod failed: %v", err)
		}
	}
	return cache
}

func isForgottenFromCache(p *v1.Pod, c *schedulerCache) error {
	if assumed, err := c.IsAssumedPod(p); err != nil {
		return err
	} else if assumed {
		return errors.New("still assumed")
	}
	if _, err := c.GetPod(p); err == nil {
		return errors.New("still in cache")
	}
	return nil
}

// getNodeInfo returns cached data for the node name.
func (cache *schedulerCache) getNodeInfo(nodeName string) (*v1.Node, error) {
	cache.mu.RLock()
	defer cache.mu.RUnlock()

	n, ok := cache.nodes[nodeName]
	if !ok {
		return nil, fmt.Errorf("node %q not found in cache", nodeName)
	}

	return n.info.Node(), nil
}