/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This file tests preemption functionality of the scheduler.

package scheduler

import (
	"context"
	"fmt"
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1beta1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/klog/v2"
	"k8s.io/kube-scheduler/config/v1beta2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler"
	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
	framework "k8s.io/kubernetes/pkg/scheduler/framework"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	"k8s.io/kubernetes/plugin/pkg/admission/priority"
	testutils "k8s.io/kubernetes/test/integration/util"
	"k8s.io/utils/pointer"
)

var lowPriority, mediumPriority, highPriority = int32(100), int32(200), int32(300)

func waitForNominatedNodeNameWithTimeout(cs clientset.Interface, pod *v1.Pod, timeout time.Duration) error {
	if err := wait.Poll(100*time.Millisecond, timeout, func() (bool, error) {
		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Status.NominatedNodeName) > 0 {
			return true, nil
		}
		return false, err
	}); err != nil {
		return fmt.Errorf(".status.nominatedNodeName of Pod %v/%v did not get set: %v", pod.Namespace, pod.Name, err)
	}
	return nil
}

func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {
	return waitForNominatedNodeNameWithTimeout(cs, pod, wait.ForeverTestTimeout)
}

const tokenFilterName = "token-filter"

type tokenFilter struct {
	Tokens       int
	Unresolvable bool
}
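
// tokenFilter admits a pod as long as it still has tokens to spare, consuming
// one token per Filter call; once the tokens run out it rejects pods with
// Unschedulable, or UnschedulableAndUnresolvable when Unresolvable is set.
// AddPod and RemovePod adjust the token count so that preemption's what-if
// evaluation of adding and removing victims stays consistent.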

// Name returns name of the plugin.
func (fp *tokenFilter) Name() string {
	return tokenFilterName
}

func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod,
	nodeInfo *framework.NodeInfo) *framework.Status {
	if fp.Tokens > 0 {
		fp.Tokens--
		return nil
	}
	status := framework.Unschedulable
	if fp.Unresolvable {
		status = framework.UnschedulableAndUnresolvable
	}
	return framework.NewStatus(status, fmt.Sprintf("can't fit %v", pod.Name))
}

func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
	return nil
}

func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
	podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	fp.Tokens--
	return nil
}

func (fp *tokenFilter) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
	podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	fp.Tokens++
	return nil
}

func (fp *tokenFilter) PreFilterExtensions() framework.PreFilterExtensions {
	return fp
}

var _ framework.FilterPlugin = &tokenFilter{}

// TestPreemption tests a few preemption scenarios.
func TestPreemption(t *testing.T) {
	// Initialize scheduler with a filter plugin.
	var filter tokenFilter
	registry := make(frameworkruntime.Registry)
	err := registry.Register(filterPluginName, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
		return &filter, nil
	})
	if err != nil {
		t.Fatalf("Error registering a filter: %v", err)
	}
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				Filter: v1beta2.PluginSet{
					Enabled: []v1beta2.Plugin{
						{Name: filterPluginName},
					},
				},
				PreFilter: v1beta2.PluginSet{
					Enabled: []v1beta2.Plugin{
						{Name: filterPluginName},
					},
				},
			},
		}},
	})

	testCtx := testutils.InitTestSchedulerWithOptions(t,
		testutils.InitTestAPIServer(t, "preemption", nil),
		nil,
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
	testutils.SyncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)

	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
	}

	maxTokens := 1000
	tests := []struct {
		name                string
		existingPods        []*v1.Pod
		pod                 *v1.Pod
		initTokens          int
		unresolvable        bool
		preemptedPodIndexes map[int]struct{}
	}{
		{
			name:       "basic pod preemption",
			initTokens: maxTokens,
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
						v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
						v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
					},
				}),
			},
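			// The node created below has 500m CPU and 500 memory, so the 400m-CPU
			// victim and the 300m-CPU preemptor cannot both fit; scheduling the
			// preemptor requires evicting the victim.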
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}},
		},
		{
			name:       "basic pod preemption with filter",
			initTokens: 1,
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
						v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
						v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
					},
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}},
		},
		{
			// same as the previous test, but the filter is unresolvable.
			name:         "basic pod preemption with unresolvable filter",
			initTokens:   1,
			unresolvable: true,
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
						v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
						v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
					},
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{},
		},
		{
			name:       "preemption is performed to satisfy anti-affinity",
			initTokens: maxTokens,
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name: "pod-0", Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					Labels:    map[string]string{"pod": "p0"},
					Resources: defaultPodRes,
				}),
				initPausePod(&pausePodConfig{
					Name: "pod-1", Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Labels:    map[string]string{"pod": "p1"},
					Resources: defaultPodRes,
					Affinity: &v1.Affinity{
						PodAntiAffinity: &v1.PodAntiAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
								{
									LabelSelector: &metav1.LabelSelector{
										MatchExpressions: []metav1.LabelSelectorRequirement{
											{
												Key:      "pod",
												Operator: metav1.LabelSelectorOpIn,
												Values:   []string{"preemptor"},
											},
										},
									},
									TopologyKey: "node",
								},
							},
						},
					},
				}),
			},
			// A higher priority pod with anti-affinity.
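			// The preemptor (below) has anti-affinity with pod-0, and pod-1 has
			// anti-affinity with the preemptor, so placing the preemptor on the
			// node requires evicting both existing pods.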
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Labels:    map[string]string{"pod": "preemptor"},
				Resources: defaultPodRes,
				Affinity: &v1.Affinity{
					PodAntiAffinity: &v1.PodAntiAffinity{
						RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
							{
								LabelSelector: &metav1.LabelSelector{
									MatchExpressions: []metav1.LabelSelectorRequirement{
										{
											Key:      "pod",
											Operator: metav1.LabelSelectorOpIn,
											Values:   []string{"p0"},
										},
									},
								},
								TopologyKey: "node",
							},
						},
					},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{0: {}, 1: {}},
		},
		{
			// This is similar to the previous case, except that pod-1 is high priority.
			name:       "preemption is not performed when anti-affinity is not satisfied",
			initTokens: maxTokens,
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name: "pod-0", Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					Labels:    map[string]string{"pod": "p0"},
					Resources: defaultPodRes,
				}),
				initPausePod(&pausePodConfig{
					Name: "pod-1", Namespace: testCtx.NS.Name,
					Priority:  &highPriority,
					Labels:    map[string]string{"pod": "p1"},
					Resources: defaultPodRes,
					Affinity: &v1.Affinity{
						PodAntiAffinity: &v1.PodAntiAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
								{
									LabelSelector: &metav1.LabelSelector{
										MatchExpressions: []metav1.LabelSelectorRequirement{
											{
												Key:      "pod",
												Operator: metav1.LabelSelectorOpIn,
												Values:   []string{"preemptor"},
											},
										},
									},
									TopologyKey: "node",
								},
							},
						},
					},
				}),
			},
			// A higher priority pod with anti-affinity.
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Labels:    map[string]string{"pod": "preemptor"},
				Resources: defaultPodRes,
				Affinity: &v1.Affinity{
					PodAntiAffinity: &v1.PodAntiAffinity{
						RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
							{
								LabelSelector: &metav1.LabelSelector{
									MatchExpressions: []metav1.LabelSelectorRequirement{
										{
											Key:      "pod",
											Operator: metav1.LabelSelectorOpIn,
											Values:   []string{"p0"},
										},
									},
								},
								TopologyKey: "node",
							},
						},
					},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{},
		},
	}

	// Create a node with some resources and a label.
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	nodeObject := st.MakeNode().Name("node1").Capacity(nodeRes).Label("node", "node1").Obj()
	if _, err := createNode(testCtx.ClientSet, nodeObject); err != nil {
		t.Fatalf("Error creating node: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			filter.Tokens = test.initTokens
			filter.Unresolvable = test.unresolvable
			pods := make([]*v1.Pod, len(test.existingPods))
			// Create and run existingPods.
			for i, p := range test.existingPods {
				pods[i], err = runPausePod(cs, p)
				if err != nil {
					t.Fatalf("Error running pause pod: %v", err)
				}
			}
			// Create the "pod".
			preemptor, err := createPausePod(cs, test.pod)
			if err != nil {
				t.Errorf("Error while creating high priority pod: %v", err)
			}
			// Wait for preemption of pods and make sure the other ones are not preempted.
			for i, p := range pods {
				if _, found := test.preemptedPodIndexes[i]; found {
					if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil {
						t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name)
					}
				} else {
					if p.DeletionTimestamp != nil {
						t.Errorf("Didn't expect pod %v to get preempted.", p.Name)
					}
				}
			}
			// Also check that the preemptor pod gets the NominatedNodeName field set.
			if len(test.preemptedPodIndexes) > 0 {
				if err := waitForNominatedNodeName(cs, preemptor); err != nil {
					t.Errorf("NominatedNodeName field was not set for pod %v: %v", preemptor.Name, err)
				}
			}

			// Cleanup
			pods = append(pods, preemptor)
			testutils.CleanupPods(cs, t, pods)
		})
	}
}

// TestNonPreemption tests that the NonPreempt option of PriorityClass works as expected.
func TestNonPreemption(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NonPreemptingPriority, true)()

	var preemptNever = v1.PreemptNever
	// Initialize scheduler.
	testCtx := initTest(t, "non-preemption")
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet
	tests := []struct {
		name             string
		PreemptionPolicy *v1.PreemptionPolicy
	}{
		{
			name:             "pod preemption will happen",
			PreemptionPolicy: nil,
		},
		{
			name:             "pod preemption will not happen",
			PreemptionPolicy: &preemptNever,
		},
	}
	victim := initPausePod(&pausePodConfig{
		Name:      "victim-pod",
		Namespace: testCtx.NS.Name,
		Priority:  &lowPriority,
		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
		},
	})

	preemptor := initPausePod(&pausePodConfig{
		Name:      "preemptor-pod",
		Namespace: testCtx.NS.Name,
		Priority:  &highPriority,
		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
		},
	})

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			defer testutils.CleanupPods(cs, t, []*v1.Pod{preemptor, victim})
			preemptor.Spec.PreemptionPolicy = test.PreemptionPolicy
			victimPod, err := createPausePod(cs, victim)
			if err != nil {
				t.Fatalf("Error while creating victim: %v", err)
			}
			if err := waitForPodToScheduleWithTimeout(cs, victimPod, 5*time.Second); err != nil {
				t.Fatalf("victim %v should become scheduled", victimPod.Name)
			}

			preemptorPod, err := createPausePod(cs, preemptor)
			if err != nil {
				t.Fatalf("Error while creating preemptor: %v", err)
			}

			err = waitForNominatedNodeNameWithTimeout(cs, preemptorPod, 5*time.Second)
			// test.PreemptionPolicy == nil means we expect the preemptor to be nominated.
			expect := test.PreemptionPolicy == nil
			// err == nil indicates the preemptor is indeed nominated.
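			// A PreemptionPolicy of Never means the preemptor must not trigger
			// preemption of the lower-priority victim, so no nomination happens.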
			got := err == nil
			if got != expect {
				t.Errorf("Expect preemptor to be nominated=%v, but got=%v", expect, got)
			}
		})
	}
}

// TestDisablePreemption tests that pod preemption can be disabled in the scheduler.
func TestDisablePreemption(t *testing.T) {
	// Initialize scheduler, and disable preemption.
	testCtx := initTestDisablePreemption(t, "disable-preemption")
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	tests := []struct {
		name         string
		existingPods []*v1.Pod
		pod          *v1.Pod
	}{
		{
			name: "pod preemption will not happen",
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "victim-pod",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
						v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
						v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
					},
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
		},
	}

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pods := make([]*v1.Pod, len(test.existingPods))
			// Create and run existingPods.
			for i, p := range test.existingPods {
				pods[i], err = runPausePod(cs, p)
				if err != nil {
					t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err)
				}
			}
			// Create the "pod".
			preemptor, err := createPausePod(cs, test.pod)
			if err != nil {
				t.Errorf("Error while creating high priority pod: %v", err)
			}
			// Ensure the preemptor stays unschedulable.
			if err := waitForPodUnschedulable(cs, preemptor); err != nil {
				t.Errorf("Preemptor %v should not become scheduled", preemptor.Name)
			}

			// Ensure the preemptor is not nominated.
			if err := waitForNominatedNodeNameWithTimeout(cs, preemptor, 5*time.Second); err == nil {
				t.Errorf("Preemptor %v should not be nominated", preemptor.Name)
			}

			// Cleanup
			pods = append(pods, preemptor)
			testutils.CleanupPods(cs, t, pods)
		})
	}
}

// This test verifies that system critical priorities are created automatically and resolved properly.
func TestPodPriorityResolution(t *testing.T) {
	admission := priority.NewPlugin()
	testCtx := testutils.InitTestScheduler(t, testutils.InitTestAPIServer(t, "preemption", admission), nil)
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

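	// The priority admission plugin resolves each pod's PriorityClassName to a
	// numeric priority at creation time, which is the behavior exercised below.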
	// Build clientset and informers for controllers.
	externalClientset := kubernetes.NewForConfigOrDie(&restclient.Config{
		QPS:           -1,
		Host:          testCtx.HTTPServer.URL,
		ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
	externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second)
	admission.SetExternalKubeClientSet(externalClientset)
	admission.SetExternalKubeInformerFactory(externalInformers)

	// Waiting for all controllers to sync
	testutils.SyncInformerFactory(testCtx)
	externalInformers.Start(testCtx.Ctx.Done())
	externalInformers.WaitForCacheSync(testCtx.Ctx.Done())

	// Run all controllers
	go testCtx.Scheduler.Run(testCtx.Ctx)

	tests := []struct {
		Name             string
		PriorityClass    string
		Pod              *v1.Pod
		ExpectedPriority int32
		ExpectedError    error
	}{
		{
			Name:             "SystemNodeCritical priority class",
			PriorityClass:    scheduling.SystemNodeCritical,
			ExpectedPriority: scheduling.SystemCriticalPriority + 1000,
			Pod: initPausePod(&pausePodConfig{
				Name:              fmt.Sprintf("pod1-%v", scheduling.SystemNodeCritical),
				Namespace:         metav1.NamespaceSystem,
				PriorityClassName: scheduling.SystemNodeCritical,
			}),
		},
		{
			Name:             "SystemClusterCritical priority class",
			PriorityClass:    scheduling.SystemClusterCritical,
			ExpectedPriority: scheduling.SystemCriticalPriority,
			Pod: initPausePod(&pausePodConfig{
				Name:              fmt.Sprintf("pod2-%v", scheduling.SystemClusterCritical),
				Namespace:         metav1.NamespaceSystem,
				PriorityClassName: scheduling.SystemClusterCritical,
			}),
		},
		{
			Name:             "Invalid priority class should result in error",
			PriorityClass:    "foo",
			ExpectedPriority: scheduling.SystemCriticalPriority,
			Pod: initPausePod(&pausePodConfig{
				Name:              fmt.Sprintf("pod3-%v", scheduling.SystemClusterCritical),
				Namespace:         metav1.NamespaceSystem,
				PriorityClassName: "foo",
			}),
			ExpectedError: fmt.Errorf("failed to create pause pod: pods \"pod3-system-cluster-critical\" is forbidden: no PriorityClass with name foo was found"),
		},
	}

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}

	pods := make([]*v1.Pod, 0, len(tests))
	for _, test := range tests {
		t.Run(test.Name, func(t *testing.T) {
			pod, err := runPausePod(cs, test.Pod)
			if err != nil {
				if test.ExpectedError == nil {
					t.Fatalf("Test [PodPriority/%v]: Error running pause pod: %v", test.PriorityClass, err)
				}
				if err.Error() != test.ExpectedError.Error() {
					t.Fatalf("Test [PodPriority/%v]: Expected error %v but got error %v", test.PriorityClass, test.ExpectedError, err)
				}
				return
			}
			pods = append(pods, pod)
			if pod.Spec.Priority != nil {
				if *pod.Spec.Priority != test.ExpectedPriority {
					t.Errorf("Expected pod %v to have priority %v but was %v", pod.Name, test.ExpectedPriority, *pod.Spec.Priority)
				}
			} else {
				t.Errorf("Expected pod %v to have priority %v but was nil", pod.Name, test.ExpectedPriority)
			}
		})
	}
	testutils.CleanupPods(cs, t, pods)
	testutils.CleanupNodes(cs, t)
}

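// mkPriorityPodWithGrace returns a pause pod with the given priority and
// termination grace period, requesting 100m of CPU and 100 bytes of memory.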
func mkPriorityPodWithGrace(tc *testutils.TestContext, name string, priority int32, grace int64) *v1.Pod {
	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
	}
	pod := initPausePod(&pausePodConfig{
		Name:      name,
		Namespace: tc.NS.Name,
		Priority:  &priority,
		Labels:    map[string]string{"pod": name},
		Resources: defaultPodRes,
	})
	pod.Spec.TerminationGracePeriodSeconds = &grace
	return pod
}

// This test ensures that while the preempting pod is waiting for the victims to
// terminate, other pending lower-priority pods are not scheduled into the space
// freed by preemption while the higher-priority pod is not scheduled yet.
func TestPreemptionStarvation(t *testing.T) {
	// Initialize scheduler.
	testCtx := initTest(t, "preemption")
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	tests := []struct {
		name               string
		numExistingPod     int
		numExpectedPending int
		preemptor          *v1.Pod
	}{
		{
			// This test ensures that while the preempting pod is waiting for the victims
			// to terminate, other lower-priority pods are not scheduled into the space
			// freed by preemption while the higher-priority pod is not scheduled yet.
			name:               "starvation test: higher priority pod is scheduled before the lower priority ones",
			numExistingPod:     10,
			numExpectedPending: 5,
			preemptor: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
		},
	}

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pendingPods := make([]*v1.Pod, test.numExpectedPending)
			numRunningPods := test.numExistingPod - test.numExpectedPending
			runningPods := make([]*v1.Pod, numRunningPods)
			// Create and run existingPods.
			for i := 0; i < numRunningPods; i++ {
				runningPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0))
				if err != nil {
					t.Fatalf("Error creating pause pod: %v", err)
				}
			}
			// make sure that runningPods are all scheduled.
			for _, p := range runningPods {
				if err := testutils.WaitForPodToSchedule(cs, p); err != nil {
					t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
				}
			}
			// Create pending pods.
			for i := 0; i < test.numExpectedPending; i++ {
				pendingPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0))
				if err != nil {
					t.Fatalf("Error creating pending pod: %v", err)
				}
			}
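			// The five running pods consume the node's entire 500m CPU capacity, so
			// none of the pending pods can fit.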
			// Make sure that all pending pods are being marked unschedulable.
			for _, p := range pendingPods {
				if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout,
					podUnschedulable(cs, p.Namespace, p.Name)); err != nil {
					t.Errorf("Pod %v/%v didn't get marked unschedulable: %v", p.Namespace, p.Name, err)
				}
			}
			// Create the preemptor.
			preemptor, err := createPausePod(cs, test.preemptor)
			if err != nil {
				t.Errorf("Error while creating the preempting pod: %v", err)
			}
			// Check if .status.nominatedNodeName of the preemptor pod gets set.
			if err := waitForNominatedNodeName(cs, preemptor); err != nil {
				t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
			}
			// Make sure that preemptor is scheduled after preemptions.
			if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil {
				t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err)
			}
			// Cleanup
			klog.Info("Cleaning up all pods...")
			allPods := pendingPods
			allPods = append(allPods, runningPods...)
			allPods = append(allPods, preemptor)
			testutils.CleanupPods(cs, t, allPods)
		})
	}
}

// TestPreemptionRaces tests that other scheduling events and operations do not
// race with the preemption process.
func TestPreemptionRaces(t *testing.T) {
	// Initialize scheduler.
	testCtx := initTest(t, "preemption-race")
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	tests := []struct {
		name              string
		numInitialPods    int // Pods created and executed before running preemptor
		numAdditionalPods int // Pods created after creating the preemptor
		numRepetitions    int // Repeat the tests to check races
		preemptor         *v1.Pod
	}{
		{
			// This test ensures that while the preempting pod is waiting for the victims
			// to terminate, other lower-priority pods are not scheduled into the space
			// freed by preemption while the higher-priority pod is not scheduled yet.
			name:              "ensures that other pods are not scheduled while preemptor is being marked as nominated (issue #72124)",
			numInitialPods:    2,
			numAdditionalPods: 50,
			numRepetitions:    10,
			preemptor: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(4900, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(4900, resource.DecimalSI)},
				},
			}),
		},
	}

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "100",
		v1.ResourceCPU:    "5000m",
		v1.ResourceMemory: "5000",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			if test.numRepetitions <= 0 {
				test.numRepetitions = 1
			}
			for n := 0; n < test.numRepetitions; n++ {
				initialPods := make([]*v1.Pod, test.numInitialPods)
				additionalPods := make([]*v1.Pod, test.numAdditionalPods)
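				// The preemptor requests 4900m of the node's 5000m CPU, so it cannot
				// fit next to the two 100m initial pods and must preempt them; the
				// additional pods then race with the in-flight preemption.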
				// Create and run existingPods.
				for i := 0; i < test.numInitialPods; i++ {
					initialPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("rpod-%v", i), mediumPriority, 0))
					if err != nil {
						t.Fatalf("Error creating pause pod: %v", err)
					}
				}
				// make sure that initial Pods are all scheduled.
				for _, p := range initialPods {
					if err := testutils.WaitForPodToSchedule(cs, p); err != nil {
						t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
					}
				}
				// Create the preemptor.
				klog.Info("Creating the preemptor pod...")
				preemptor, err := createPausePod(cs, test.preemptor)
				if err != nil {
					t.Errorf("Error while creating the preempting pod: %v", err)
				}

				klog.Info("Creating additional pods...")
				for i := 0; i < test.numAdditionalPods; i++ {
					additionalPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("ppod-%v", i), mediumPriority, 0))
					if err != nil {
						t.Fatalf("Error creating pending pod: %v", err)
					}
				}
				// Check that the preemptor pod gets nominated node name.
				if err := waitForNominatedNodeName(cs, preemptor); err != nil {
					t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
				}
				// Make sure that preemptor is scheduled after preemptions.
				if err := testutils.WaitForPodToScheduleWithTimeout(cs, preemptor, 60*time.Second); err != nil {
					t.Errorf("Preemptor pod %v didn't get scheduled: %v", preemptor.Name, err)
				}

				klog.Info("Check that unschedulable pods still exist and were never scheduled...")
				for _, p := range additionalPods {
					pod, err := cs.CoreV1().Pods(p.Namespace).Get(context.TODO(), p.Name, metav1.GetOptions{})
					if err != nil {
						t.Errorf("Error in getting Pod %v/%v info: %v", p.Namespace, p.Name, err)
						continue
					}
					if len(pod.Spec.NodeName) > 0 {
						t.Errorf("Pod %v/%v is already scheduled", p.Namespace, p.Name)
					}
					_, cond := podutil.GetPodCondition(&pod.Status, v1.PodScheduled)
					if cond != nil && cond.Status != v1.ConditionFalse {
						t.Errorf("Pod %v/%v is no longer unschedulable: %v", p.Namespace, p.Name, err)
					}
				}
				// Cleanup
				klog.Info("Cleaning up all pods...")
				allPods := additionalPods
				allPods = append(allPods, initialPods...)
				allPods = append(allPods, preemptor)
				testutils.CleanupPods(cs, t, allPods)
			}
		})
	}
}

// TestNominatedNodeCleanUp checks that when there are nominated pods on a
// node and a higher priority pod is nominated to run on the node, the nominated
// node name of the lower priority pods is cleared.
// Test scenario:
// 1. Create a few low priority pods with long grace period that fill up a node.
// 2. Create a medium priority pod that preempts some of those pods.
// 3. Check that nominated node name of the medium priority pod is set.
// 4. Create a high priority pod that preempts some pods on that node.
// 5. Check that nominated node name of the high priority pod is set and nominated
//    node name of the medium priority pod is cleared.
func TestNominatedNodeCleanUp(t *testing.T) {
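	// Node capacity is 500m CPU and each low-priority pod requests 100m, so the
	// 400m medium-priority pod and, later, the 300m high-priority pod can only
	// fit by preempting some of the low-priority pods.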
	// Initialize scheduler.
	testCtx := initTest(t, "preemption")
	defer testutils.CleanupTest(t, testCtx)

	cs := testCtx.ClientSet

	defer cleanupPodsInNamespace(cs, t, testCtx.NS.Name)

	// Create a node with some resources
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	_, err := createNode(testCtx.ClientSet, st.MakeNode().Name("node1").Capacity(nodeRes).Obj())
	if err != nil {
		t.Fatalf("Error creating nodes: %v", err)
	}

	// Step 1. Create a few low priority pods.
	lowPriPods := make([]*v1.Pod, 4)
	for i := 0; i < len(lowPriPods); i++ {
		lowPriPods[i], err = createPausePod(cs, mkPriorityPodWithGrace(testCtx, fmt.Sprintf("lpod-%v", i), lowPriority, 60))
		if err != nil {
			t.Fatalf("Error creating pause pod: %v", err)
		}
	}
	// make sure that the pods are all scheduled.
	for _, p := range lowPriPods {
		if err := testutils.WaitForPodToSchedule(cs, p); err != nil {
			t.Fatalf("Pod %v/%v didn't get scheduled: %v", p.Namespace, p.Name, err)
		}
	}
	// Step 2. Create a medium priority pod.
	podConf := initPausePod(&pausePodConfig{
		Name:      "medium-priority",
		Namespace: testCtx.NS.Name,
		Priority:  &mediumPriority,
		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
		},
	})
	medPriPod, err := createPausePod(cs, podConf)
	if err != nil {
		t.Errorf("Error while creating the medium priority pod: %v", err)
	}
	// Step 3. Check if .status.nominatedNodeName of the medium priority pod is set.
	if err := waitForNominatedNodeName(cs, medPriPod); err != nil {
		t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", medPriPod.Namespace, medPriPod.Name, err)
	}
	// Step 4. Create a high priority pod.
	podConf = initPausePod(&pausePodConfig{
		Name:      "high-priority",
		Namespace: testCtx.NS.Name,
		Priority:  &highPriority,
		Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
			v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
			v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
		},
	})
	highPriPod, err := createPausePod(cs, podConf)
	if err != nil {
		t.Errorf("Error while creating the high priority pod: %v", err)
	}
	// Step 5. Check if .status.nominatedNodeName of the high priority pod is set.
	if err := waitForNominatedNodeName(cs, highPriPod); err != nil {
		t.Errorf(".status.nominatedNodeName was not set for pod %v/%v: %v", highPriPod.Namespace, highPriPod.Name, err)
	}
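	// The high-priority pod's preemption claims the space the medium-priority
	// pod was nominated for, so the scheduler should clear the medium-priority
	// pod's nomination.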
	// And .status.nominatedNodeName of the medium priority pod is cleared.
	if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		pod, err := cs.CoreV1().Pods(medPriPod.Namespace).Get(context.TODO(), medPriPod.Name, metav1.GetOptions{})
		if err != nil {
			t.Errorf("Error getting the medium priority pod info: %v", err)
			return false, err
		}
		if len(pod.Status.NominatedNodeName) == 0 {
			return true, nil
		}
		return false, err
	}); err != nil {
		t.Errorf(".status.nominatedNodeName of the medium priority pod was not cleared: %v", err)
	}
}

// mkMinAvailablePDB returns a PodDisruptionBudget that requires minAvailable
// pods matching matchLabels.
func mkMinAvailablePDB(name, namespace string, uid types.UID, minAvailable int, matchLabels map[string]string) *policy.PodDisruptionBudget {
	intMinAvailable := intstr.FromInt(minAvailable)
	return &policy.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
		},
		Spec: policy.PodDisruptionBudgetSpec{
			MinAvailable: &intMinAvailable,
			Selector:     &metav1.LabelSelector{MatchLabels: matchLabels},
		},
	}
}

func addPodConditionReady(pod *v1.Pod) {
	pod.Status = v1.PodStatus{
		Phase: v1.PodRunning,
		Conditions: []v1.PodCondition{
			{
				Type:   v1.PodReady,
				Status: v1.ConditionTrue,
			},
		},
	}
}

// TestPDBInPreemption tests PodDisruptionBudget support in preemption.
func TestPDBInPreemption(t *testing.T) {
	// Initialize scheduler.
	testCtx := initTest(t, "preemption-pdb")
	defer testutils.CleanupTest(t, testCtx)
	cs := testCtx.ClientSet

	initDisruptionController(t, testCtx)

	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
	}
	defaultNodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}

	type nodeConfig struct {
		name string
		res  map[v1.ResourceName]string
	}

	tests := []struct {
		name                string
		nodes               []*nodeConfig
		pdbs                []*policy.PodDisruptionBudget
		pdbPodNum           []int32
		existingPods        []*v1.Pod
		pod                 *v1.Pod
		preemptedPodIndexes map[int]struct{}
	}{
		{
			name:  "A non-PDB violating pod is preempted despite its higher priority",
			nodes: []*nodeConfig{{name: "node-1", res: defaultNodeRes}},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.NS.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
			},
			pdbPodNum: []int32{2},
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "low-pod2",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "mid-pod3",
					Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(300, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{2: {}},
		},
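		// In the next case, evicting mid-pod2 on node-2 violates no PDB, while
		// evicting the PDB-protected low-pod1 on node-1 would, so node-2 is
		// preferred for preemption.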
		{
			name: "A node without any PDB violating pods is preferred for preemption",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
			},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.NS.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo": "bar"}),
			},
			pdbPodNum: []int32{1},
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
					Labels:    map[string]string{"foo": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "mid-pod2",
					Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					NodeName:  "node-2",
					Resources: defaultPodRes,
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			preemptedPodIndexes: map[int]struct{}{1: {}},
		},
		{
			name: "A node with fewer PDB violating pods is preferred for preemption",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
				{name: "node-3", res: defaultNodeRes},
			},
			pdbs: []*policy.PodDisruptionBudget{
				mkMinAvailablePDB("pdb-1", testCtx.NS.Name, types.UID("pdb-1-uid"), 2, map[string]string{"foo1": "bar"}),
				mkMinAvailablePDB("pdb-2", testCtx.NS.Name, types.UID("pdb-2-uid"), 2, map[string]string{"foo2": "bar"}),
			},
			pdbPodNum: []int32{1, 5},
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
					Labels:    map[string]string{"foo1": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "mid-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
				}),
				initPausePod(&pausePodConfig{
					Name:      "low-pod2",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-2",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "mid-pod2",
					Namespace: testCtx.NS.Name,
					Priority:  &mediumPriority,
					Resources: defaultPodRes,
					NodeName:  "node-2",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "low-pod4",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "low-pod5",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
				initPausePod(&pausePodConfig{
					Name:      "low-pod6",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-3",
					Labels:    map[string]string{"foo2": "bar"},
				}),
			},
"bar"}, 1235 }), 1236 }, 1237 pod: initPausePod(&pausePodConfig{ 1238 Name: "preemptor-pod", 1239 Namespace: testCtx.NS.Name, 1240 Priority: &highPriority, 1241 Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ 1242 v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI), 1243 v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)}, 1244 }, 1245 }), 1246 // The third node is chosen because PDB is not violated for node 3 and the victims have lower priority than node-2. 1247 preemptedPodIndexes: map[int]struct{}{4: {}, 5: {}, 6: {}}, 1248 }, 1249 } 1250 1251 for _, test := range tests { 1252 t.Run(test.name, func(t *testing.T) { 1253 for _, nodeConf := range test.nodes { 1254 _, err := createNode(cs, st.MakeNode().Name(nodeConf.name).Capacity(nodeConf.res).Obj()) 1255 if err != nil { 1256 t.Fatalf("Error creating node %v: %v", nodeConf.name, err) 1257 } 1258 } 1259 1260 pods := make([]*v1.Pod, len(test.existingPods)) 1261 var err error 1262 // Create and run existingPods. 1263 for i, p := range test.existingPods { 1264 if pods[i], err = runPausePod(cs, p); err != nil { 1265 t.Fatalf("Test [%v]: Error running pause pod: %v", test.name, err) 1266 } 1267 // Add pod condition ready so that PDB is updated. 1268 addPodConditionReady(p) 1269 if _, err := testCtx.ClientSet.CoreV1().Pods(testCtx.NS.Name).UpdateStatus(context.TODO(), p, metav1.UpdateOptions{}); err != nil { 1270 t.Fatal(err) 1271 } 1272 } 1273 // Wait for Pods to be stable in scheduler cache. 1274 if err := waitCachedPodsStable(testCtx, test.existingPods); err != nil { 1275 t.Fatalf("Not all pods are stable in the cache: %v", err) 1276 } 1277 1278 // Create PDBs. 1279 for _, pdb := range test.pdbs { 1280 _, err := testCtx.ClientSet.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).Create(context.TODO(), pdb, metav1.CreateOptions{}) 1281 if err != nil { 1282 t.Fatalf("Failed to create PDB: %v", err) 1283 } 1284 } 1285 // Wait for PDBs to become stable. 1286 if err := waitForPDBsStable(testCtx, test.pdbs, test.pdbPodNum); err != nil { 1287 t.Fatalf("Not all pdbs are stable in the cache: %v", err) 1288 } 1289 1290 // Create the "pod". 1291 preemptor, err := createPausePod(cs, test.pod) 1292 if err != nil { 1293 t.Errorf("Error while creating high priority pod: %v", err) 1294 } 1295 // Wait for preemption of pods and make sure the other ones are not preempted. 1296 for i, p := range pods { 1297 if _, found := test.preemptedPodIndexes[i]; found { 1298 if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { 1299 t.Errorf("Test [%v]: Pod %v/%v is not getting evicted.", test.name, p.Namespace, p.Name) 1300 } 1301 } else { 1302 if p.DeletionTimestamp != nil { 1303 t.Errorf("Test [%v]: Didn't expect pod %v/%v to get preempted.", test.name, p.Namespace, p.Name) 1304 } 1305 } 1306 } 1307 // Also check if .status.nominatedNodeName of the preemptor pod gets set. 
			if len(test.preemptedPodIndexes) > 0 {
				if err := waitForNominatedNodeName(cs, preemptor); err != nil {
					t.Errorf("Test [%v]: .status.nominatedNodeName was not set for pod %v/%v: %v", test.name, preemptor.Namespace, preemptor.Name, err)
				}
			}

			// Cleanup
			pods = append(pods, preemptor)
			testutils.CleanupPods(cs, t, pods)
			cs.PolicyV1beta1().PodDisruptionBudgets(testCtx.NS.Name).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
			cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
		})
	}
}

func initTestPreferNominatedNode(t *testing.T, nsPrefix string, opts ...scheduler.Option) *testutils.TestContext {
	testCtx := testutils.InitTestSchedulerWithOptions(t, testutils.InitTestAPIServer(t, nsPrefix, nil), nil, opts...)
	testutils.SyncInformerFactory(testCtx)
	// Wrap the NextPod() method to make it appear that preemption has been done
	// already and the nominated node has been set.
	f := testCtx.Scheduler.NextPod
	testCtx.Scheduler.NextPod = func() (podInfo *framework.QueuedPodInfo) {
		podInfo = f()
		podInfo.Pod.Status.NominatedNodeName = "node-1"
		return podInfo
	}
	go testCtx.Scheduler.Run(testCtx.Ctx)
	return testCtx
}
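
// Because the wrapper above stamps "node-1" as the nominated node on every pod
// handed to the scheduler, each scheduling attempt exercises the
// PreferNominatedNode path.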
// TestPreferNominatedNode tests that when the "PreferNominatedNode" feature is
// enabled, the overall scheduling logic is not changed. If the nominated node
// passes all the filters, the preemptor pod will run on it; otherwise, it will
// be scheduled to another node in the cluster that is able to pass all the
// filters.
// NOTE: This integration test is not intended to check the logic of preemption,
// but rather to be a sanity check when the feature is enabled.
func TestPreferNominatedNode(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferNominatedNode, true)()
	testCtx := initTestPreferNominatedNode(t, "prefer-nominated-node")
	t.Cleanup(func() {
		testutils.CleanupTest(t, testCtx)
	})
	cs := testCtx.ClientSet
	defaultPodRes := &v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(100, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(100, resource.DecimalSI)},
	}
	defaultNodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}

	type nodeConfig struct {
		name string
		res  map[v1.ResourceName]string
	}

	tests := []struct {
		name         string
		nodes        []*nodeConfig
		existingPods []*v1.Pod
		pod          *v1.Pod
		runningNode  string
	}{
		{
			name: "nominated node released all resource, preemptor is scheduled to the nominated node",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
			},
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					NodeName:  "node-2",
					Resources: defaultPodRes,
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			runningNode: "node-1",
		},
		{
			// The 100m pod on node-1 leaves too little CPU for the 500m preemptor,
			// so the nominated node fails the filters and node-2 is used instead.
			name: "nominated node cannot pass all the filters, preemptor should find a different node",
			nodes: []*nodeConfig{
				{name: "node-1", res: defaultNodeRes},
				{name: "node-2", res: defaultNodeRes},
			},
			existingPods: []*v1.Pod{
				initPausePod(&pausePodConfig{
					Name:      "low-pod1",
					Namespace: testCtx.NS.Name,
					Priority:  &lowPriority,
					Resources: defaultPodRes,
					NodeName:  "node-1",
				}),
			},
			pod: initPausePod(&pausePodConfig{
				Name:      "preemptor-pod",
				Namespace: testCtx.NS.Name,
				Priority:  &highPriority,
				Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),
					v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
				},
			}),
			runningNode: "node-2",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			var err error
			var preemptor *v1.Pod
			for _, nodeConf := range test.nodes {
				_, err := createNode(cs, st.MakeNode().Name(nodeConf.name).Capacity(nodeConf.res).Obj())
				if err != nil {
					t.Fatalf("Error creating node %v: %v", nodeConf.name, err)
				}
			}
			pods := make([]*v1.Pod, len(test.existingPods))
			// Create and run existingPods.
			for i, p := range test.existingPods {
				pods[i], err = runPausePod(cs, p)
				if err != nil {
					t.Fatalf("Error running pause pod: %v", err)
				}
			}
			preemptor, err = createPausePod(cs, test.pod)
			if err != nil {
				t.Errorf("Error while creating high priority pod: %v", err)
			}
			err = wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
				preemptor, err = cs.CoreV1().Pods(test.pod.Namespace).Get(context.TODO(), test.pod.Name, metav1.GetOptions{})
				if err != nil {
					t.Errorf("Error getting the preemptor pod info: %v", err)
					return false, err
				}
				if len(preemptor.Spec.NodeName) == 0 {
					return false, nil
				}
				return true, nil
			})
			if err != nil {
				t.Errorf("Cannot schedule Pod %v/%v, error: %v", test.pod.Namespace, test.pod.Name, err)
			}
			// Make sure the pod has been scheduled to the right node.
			if preemptor.Spec.NodeName != test.runningNode {
				t.Errorf("Expect pod running on %v, got %v.", test.runningNode, preemptor.Spec.NodeName)
			}
			pods = append(pods, preemptor)
			// cleanup
			defer testutils.CleanupPods(cs, t, pods)
			cs.CoreV1().Nodes().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{})
		})
	}
}