1/* 2Copyright 2018 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package scheduler 18 19import ( 20 "context" 21 "fmt" 22 "sync/atomic" 23 "testing" 24 "time" 25 26 v1 "k8s.io/api/core/v1" 27 "k8s.io/apimachinery/pkg/api/errors" 28 "k8s.io/apimachinery/pkg/api/resource" 29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 30 "k8s.io/apimachinery/pkg/labels" 31 "k8s.io/apimachinery/pkg/runtime" 32 "k8s.io/apimachinery/pkg/util/wait" 33 clientset "k8s.io/client-go/kubernetes" 34 listersv1 "k8s.io/client-go/listers/core/v1" 35 "k8s.io/kube-scheduler/config/v1beta2" 36 "k8s.io/kubernetes/pkg/scheduler" 37 schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" 38 configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" 39 "k8s.io/kubernetes/pkg/scheduler/framework" 40 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" 41 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" 42 st "k8s.io/kubernetes/pkg/scheduler/testing" 43 testutils "k8s.io/kubernetes/test/integration/util" 44 imageutils "k8s.io/kubernetes/test/utils/image" 45 "k8s.io/utils/pointer" 46) 47 48type PreFilterPlugin struct { 49 numPreFilterCalled int 50 failPreFilter bool 51 rejectPreFilter bool 52} 53 54type ScorePlugin struct { 55 failScore bool 56 numScoreCalled int32 57 highScoreNode string 58} 59 60type ScoreWithNormalizePlugin struct { 61 numScoreCalled int 62 numNormalizeScoreCalled int 63} 64 65type FilterPlugin struct { 66 
numFilterCalled int32 67 failFilter bool 68 rejectFilter bool 69} 70 71type PostFilterPlugin struct { 72 fh framework.Handle 73 numPostFilterCalled int 74 failPostFilter bool 75 rejectPostFilter bool 76} 77 78type ReservePlugin struct { 79 name string 80 numReserveCalled int 81 failReserve bool 82 numUnreserveCalled int 83 pluginInvokeEventChan chan pluginInvokeEvent 84} 85 86type PreScorePlugin struct { 87 numPreScoreCalled int 88 failPreScore bool 89} 90 91type PreBindPlugin struct { 92 numPreBindCalled int 93 failPreBind bool 94 rejectPreBind bool 95} 96 97type BindPlugin struct { 98 numBindCalled int 99 PluginName string 100 bindStatus *framework.Status 101 client clientset.Interface 102 pluginInvokeEventChan chan pluginInvokeEvent 103} 104 105type PostBindPlugin struct { 106 name string 107 numPostBindCalled int 108 pluginInvokeEventChan chan pluginInvokeEvent 109} 110 111type PermitPlugin struct { 112 name string 113 numPermitCalled int 114 failPermit bool 115 rejectPermit bool 116 timeoutPermit bool 117 waitAndRejectPermit bool 118 waitAndAllowPermit bool 119 cancelled bool 120 waitingPod string 121 rejectingPod string 122 allowingPod string 123 fh framework.Handle 124} 125 126const ( 127 prefilterPluginName = "prefilter-plugin" 128 postfilterPluginName = "postfilter-plugin" 129 scorePluginName = "score-plugin" 130 scoreWithNormalizePluginName = "score-with-normalize-plugin" 131 filterPluginName = "filter-plugin" 132 preScorePluginName = "prescore-plugin" 133 reservePluginName = "reserve-plugin" 134 preBindPluginName = "prebind-plugin" 135 postBindPluginName = "postbind-plugin" 136 permitPluginName = "permit-plugin" 137) 138 139var _ framework.PreFilterPlugin = &PreFilterPlugin{} 140var _ framework.PostFilterPlugin = &PostFilterPlugin{} 141var _ framework.ScorePlugin = &ScorePlugin{} 142var _ framework.FilterPlugin = &FilterPlugin{} 143var _ framework.ScorePlugin = &ScorePlugin{} 144var _ framework.ScorePlugin = &ScoreWithNormalizePlugin{} 145var _ 
framework.ReservePlugin = &ReservePlugin{} 146var _ framework.PreScorePlugin = &PreScorePlugin{} 147var _ framework.PreBindPlugin = &PreBindPlugin{} 148var _ framework.BindPlugin = &BindPlugin{} 149var _ framework.PostBindPlugin = &PostBindPlugin{} 150var _ framework.PermitPlugin = &PermitPlugin{} 151 152// newPlugin returns a plugin factory with specified Plugin. 153func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory { 154 return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { 155 return plugin, nil 156 } 157} 158 159// newPlugin returns a plugin factory with specified Plugin. 160func newPostFilterPlugin(plugin *PostFilterPlugin) frameworkruntime.PluginFactory { 161 return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { 162 plugin.fh = fh 163 return plugin, nil 164 } 165} 166 167// Name returns name of the score plugin. 168func (sp *ScorePlugin) Name() string { 169 return scorePluginName 170} 171 172// reset returns name of the score plugin. 173func (sp *ScorePlugin) reset() { 174 sp.failScore = false 175 sp.numScoreCalled = 0 176 sp.highScoreNode = "" 177} 178 179// Score returns the score of scheduling a pod on a specific node. 180func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { 181 curCalled := atomic.AddInt32(&sp.numScoreCalled, 1) 182 if sp.failScore { 183 return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name)) 184 } 185 186 score := int64(1) 187 if curCalled == 1 { 188 // The first node is scored the highest, the rest is scored lower. 189 sp.highScoreNode = nodeName 190 score = framework.MaxNodeScore 191 } 192 return score, nil 193} 194 195func (sp *ScorePlugin) ScoreExtensions() framework.ScoreExtensions { 196 return nil 197} 198 199// Name returns name of the score plugin. 
200func (sp *ScoreWithNormalizePlugin) Name() string { 201 return scoreWithNormalizePluginName 202} 203 204// reset returns name of the score plugin. 205func (sp *ScoreWithNormalizePlugin) reset() { 206 sp.numScoreCalled = 0 207 sp.numNormalizeScoreCalled = 0 208} 209 210// Score returns the score of scheduling a pod on a specific node. 211func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { 212 sp.numScoreCalled++ 213 score := int64(10) 214 return score, nil 215} 216 217func (sp *ScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status { 218 sp.numNormalizeScoreCalled++ 219 return nil 220} 221 222func (sp *ScoreWithNormalizePlugin) ScoreExtensions() framework.ScoreExtensions { 223 return sp 224} 225 226// Name returns name of the plugin. 227func (fp *FilterPlugin) Name() string { 228 return filterPluginName 229} 230 231// reset is used to reset filter plugin. 232func (fp *FilterPlugin) reset() { 233 fp.numFilterCalled = 0 234 fp.failFilter = false 235} 236 237// Filter is a test function that returns an error or nil, depending on the 238// value of "failFilter". 239func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { 240 atomic.AddInt32(&fp.numFilterCalled, 1) 241 242 if fp.failFilter { 243 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 244 } 245 if fp.rejectFilter { 246 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) 247 } 248 249 return nil 250} 251 252// Name returns name of the plugin. 
253func (rp *ReservePlugin) Name() string { 254 return rp.name 255} 256 257// Reserve is a test function that increments an intenral counter and returns 258// an error or nil, depending on the value of "failReserve". 259func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { 260 rp.numReserveCalled++ 261 if rp.failReserve { 262 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 263 } 264 return nil 265} 266 267// Unreserve is a test function that increments an internal counter and emits 268// an event to a channel. While Unreserve implementations should normally be 269// idempotent, we relax that requirement here for testing purposes. 270func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { 271 rp.numUnreserveCalled++ 272 if rp.pluginInvokeEventChan != nil { 273 rp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: rp.Name(), val: rp.numUnreserveCalled} 274 } 275} 276 277// reset used to reset internal counters. 278func (rp *ReservePlugin) reset() { 279 rp.numReserveCalled = 0 280 rp.numUnreserveCalled = 0 281 rp.failReserve = false 282} 283 284// Name returns name of the plugin. 285func (*PreScorePlugin) Name() string { 286 return preScorePluginName 287} 288 289// PreScore is a test function. 290func (pfp *PreScorePlugin) PreScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, _ []*v1.Node) *framework.Status { 291 pfp.numPreScoreCalled++ 292 if pfp.failPreScore { 293 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 294 } 295 296 return nil 297} 298 299// reset used to reset prescore plugin. 300func (pfp *PreScorePlugin) reset() { 301 pfp.numPreScoreCalled = 0 302 pfp.failPreScore = false 303} 304 305// Name returns name of the plugin. 
306func (pp *PreBindPlugin) Name() string { 307 return preBindPluginName 308} 309 310// PreBind is a test function that returns (true, nil) or errors for testing. 311func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status { 312 pp.numPreBindCalled++ 313 if pp.failPreBind { 314 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 315 } 316 if pp.rejectPreBind { 317 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) 318 } 319 return nil 320} 321 322// reset used to reset prebind plugin. 323func (pp *PreBindPlugin) reset() { 324 pp.numPreBindCalled = 0 325 pp.failPreBind = false 326 pp.rejectPreBind = false 327} 328 329const bindPluginAnnotation = "bindPluginName" 330 331func (bp *BindPlugin) Name() string { 332 return bp.PluginName 333} 334 335func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { 336 bp.numBindCalled++ 337 if bp.pluginInvokeEventChan != nil { 338 bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled} 339 } 340 if bp.bindStatus.IsSuccess() { 341 if err := bp.client.CoreV1().Pods(p.Namespace).Bind(context.TODO(), &v1.Binding{ 342 ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}}, 343 Target: v1.ObjectReference{ 344 Kind: "Node", 345 Name: nodeName, 346 }, 347 }, metav1.CreateOptions{}); err != nil { 348 return framework.NewStatus(framework.Error, fmt.Sprintf("bind failed: %v", err)) 349 } 350 } 351 return bp.bindStatus 352} 353 354// reset used to reset numBindCalled. 355func (bp *BindPlugin) reset() { 356 bp.numBindCalled = 0 357} 358 359// Name returns name of the plugin. 
360func (pp *PostBindPlugin) Name() string { 361 return pp.name 362} 363 364// PostBind is a test function, which counts the number of times called. 365func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) { 366 pp.numPostBindCalled++ 367 if pp.pluginInvokeEventChan != nil { 368 pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled} 369 } 370} 371 372// reset used to reset postbind plugin. 373func (pp *PostBindPlugin) reset() { 374 pp.numPostBindCalled = 0 375} 376 377// Name returns name of the plugin. 378func (pp *PreFilterPlugin) Name() string { 379 return prefilterPluginName 380} 381 382// Extensions returns the PreFilterExtensions interface. 383func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { 384 return nil 385} 386 387// PreFilter is a test function that returns (true, nil) or errors for testing. 388func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status { 389 pp.numPreFilterCalled++ 390 if pp.failPreFilter { 391 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 392 } 393 if pp.rejectPreFilter { 394 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) 395 } 396 return nil 397} 398 399// reset used to reset prefilter plugin. 400func (pp *PreFilterPlugin) reset() { 401 pp.numPreFilterCalled = 0 402 pp.failPreFilter = false 403 pp.rejectPreFilter = false 404} 405 406// Name returns name of the plugin. 
407func (pp *PostFilterPlugin) Name() string { 408 return postfilterPluginName 409} 410 411func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { 412 pp.numPostFilterCalled++ 413 nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List() 414 if err != nil { 415 return nil, framework.NewStatus(framework.Error, err.Error()) 416 } 417 418 for _, nodeInfo := range nodeInfos { 419 pp.fh.RunFilterPlugins(ctx, state, pod, nodeInfo) 420 } 421 var nodes []*v1.Node 422 for _, nodeInfo := range nodeInfos { 423 nodes = append(nodes, nodeInfo.Node()) 424 } 425 pp.fh.RunScorePlugins(ctx, state, pod, nodes) 426 427 if pp.failPostFilter { 428 return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)) 429 } 430 if pp.rejectPostFilter { 431 return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)) 432 } 433 return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name)) 434} 435 436// Name returns name of the plugin. 437func (pp *PermitPlugin) Name() string { 438 return pp.name 439} 440 441// Permit implements the permit test plugin. 
442func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) { 443 pp.numPermitCalled++ 444 if pp.failPermit { 445 return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0 446 } 447 if pp.rejectPermit { 448 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 449 } 450 if pp.timeoutPermit { 451 go func() { 452 select { 453 case <-ctx.Done(): 454 pp.cancelled = true 455 } 456 }() 457 return framework.NewStatus(framework.Wait, ""), 3 * time.Second 458 } 459 if pp.waitAndRejectPermit || pp.waitAndAllowPermit { 460 if pp.waitingPod == "" || pp.waitingPod == pod.Name { 461 pp.waitingPod = pod.Name 462 return framework.NewStatus(framework.Wait, ""), 30 * time.Second 463 } 464 if pp.waitAndRejectPermit { 465 pp.rejectingPod = pod.Name 466 pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { 467 wp.Reject(pp.name, fmt.Sprintf("reject pod %v", wp.GetPod().Name)) 468 }) 469 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0 470 } 471 if pp.waitAndAllowPermit { 472 pp.allowingPod = pod.Name 473 pp.allowAllPods() 474 return nil, 0 475 } 476 } 477 return nil, 0 478} 479 480// allowAllPods allows all waiting pods. 481func (pp *PermitPlugin) allowAllPods() { 482 pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) }) 483} 484 485// rejectAllPods rejects all waiting pods. 486func (pp *PermitPlugin) rejectAllPods() { 487 pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") }) 488} 489 490// reset used to reset permit plugin. 
491func (pp *PermitPlugin) reset() { 492 pp.numPermitCalled = 0 493 pp.failPermit = false 494 pp.rejectPermit = false 495 pp.timeoutPermit = false 496 pp.waitAndRejectPermit = false 497 pp.waitAndAllowPermit = false 498 pp.cancelled = false 499 pp.waitingPod = "" 500 pp.allowingPod = "" 501 pp.rejectingPod = "" 502} 503 504// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin. 505func newPermitPlugin(permitPlugin *PermitPlugin) frameworkruntime.PluginFactory { 506 return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) { 507 permitPlugin.fh = fh 508 return permitPlugin, nil 509 } 510} 511 512// TestPreFilterPlugin tests invocation of prefilter plugins. 513func TestPreFilterPlugin(t *testing.T) { 514 // Create a plugin registry for testing. Register only a pre-filter plugin. 515 preFilterPlugin := &PreFilterPlugin{} 516 registry := frameworkruntime.Registry{prefilterPluginName: newPlugin(preFilterPlugin)} 517 518 // Setup initial prefilter plugin for testing. 519 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 520 Profiles: []v1beta2.KubeSchedulerProfile{{ 521 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 522 Plugins: &v1beta2.Plugins{ 523 PreFilter: v1beta2.PluginSet{ 524 Enabled: []v1beta2.Plugin{ 525 {Name: prefilterPluginName}, 526 }, 527 }, 528 }, 529 }}, 530 }) 531 532 // Create the API server and the scheduler with the test plugin set. 
533 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prefilter-plugin", nil), 2, 534 scheduler.WithProfiles(cfg.Profiles...), 535 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 536 defer testutils.CleanupTest(t, testCtx) 537 538 tests := []struct { 539 name string 540 fail bool 541 reject bool 542 }{ 543 { 544 name: "disable fail and reject flags", 545 fail: false, 546 reject: false, 547 }, 548 { 549 name: "enable fail and disable reject flags", 550 fail: true, 551 reject: false, 552 }, 553 { 554 name: "disable fail and enable reject flags", 555 fail: false, 556 reject: true, 557 }, 558 } 559 560 for _, test := range tests { 561 t.Run(test.name, func(t *testing.T) { 562 preFilterPlugin.failPreFilter = test.fail 563 preFilterPlugin.rejectPreFilter = test.reject 564 // Create a best effort pod. 565 pod, err := createPausePod(testCtx.ClientSet, 566 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 567 if err != nil { 568 t.Errorf("Error while creating a test pod: %v", err) 569 } 570 571 if test.reject || test.fail { 572 if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { 573 t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) 574 } 575 } else { 576 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 577 t.Errorf("Expected the pod to be scheduled. error: %v", err) 578 } 579 } 580 581 if preFilterPlugin.numPreFilterCalled == 0 { 582 t.Errorf("Expected the prefilter plugin to be called.") 583 } 584 585 preFilterPlugin.reset() 586 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 587 }) 588 } 589} 590 591// TestPostFilterPlugin tests invocation of postfilter plugins. 
592func TestPostFilterPlugin(t *testing.T) { 593 var numNodes int32 = 1 594 tests := []struct { 595 name string 596 numNodes int32 597 rejectFilter bool 598 failScore bool 599 rejectPostFilter bool 600 expectFilterNumCalled int32 601 expectScoreNumCalled int32 602 expectPostFilterNumCalled int 603 }{ 604 { 605 name: "Filter passed and Score success", 606 numNodes: 30, 607 rejectFilter: false, 608 failScore: false, 609 rejectPostFilter: false, 610 expectFilterNumCalled: 30, 611 expectScoreNumCalled: 30, 612 expectPostFilterNumCalled: 0, 613 }, 614 { 615 name: "Filter failed and PostFilter passed", 616 numNodes: numNodes, 617 rejectFilter: true, 618 failScore: false, 619 rejectPostFilter: false, 620 expectFilterNumCalled: numNodes * 2, 621 expectScoreNumCalled: 1, 622 expectPostFilterNumCalled: 1, 623 }, 624 { 625 name: "Filter failed and PostFilter failed", 626 numNodes: numNodes, 627 rejectFilter: true, 628 failScore: false, 629 rejectPostFilter: true, 630 expectFilterNumCalled: numNodes * 2, 631 expectScoreNumCalled: 1, 632 expectPostFilterNumCalled: 1, 633 }, 634 { 635 name: "Score failed and PostFilter failed", 636 numNodes: numNodes, 637 rejectFilter: true, 638 failScore: true, 639 rejectPostFilter: true, 640 expectFilterNumCalled: numNodes * 2, 641 expectScoreNumCalled: 1, 642 expectPostFilterNumCalled: 1, 643 }, 644 } 645 646 for i, tt := range tests { 647 t.Run(tt.name, func(t *testing.T) { 648 // Create a plugin registry for testing. Register a combination of filter and postFilter plugin. 
649 var ( 650 filterPlugin = &FilterPlugin{} 651 scorePlugin = &ScorePlugin{} 652 postFilterPlugin = &PostFilterPlugin{} 653 ) 654 filterPlugin.rejectFilter = tt.rejectFilter 655 scorePlugin.failScore = tt.failScore 656 postFilterPlugin.rejectPostFilter = tt.rejectPostFilter 657 registry := frameworkruntime.Registry{ 658 filterPluginName: newPlugin(filterPlugin), 659 scorePluginName: newPlugin(scorePlugin), 660 postfilterPluginName: newPostFilterPlugin(postFilterPlugin), 661 } 662 663 // Setup plugins for testing. 664 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 665 Profiles: []v1beta2.KubeSchedulerProfile{{ 666 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 667 Plugins: &v1beta2.Plugins{ 668 Filter: v1beta2.PluginSet{ 669 Enabled: []v1beta2.Plugin{ 670 {Name: filterPluginName}, 671 }, 672 }, 673 Score: v1beta2.PluginSet{ 674 Enabled: []v1beta2.Plugin{ 675 {Name: scorePluginName}, 676 }, 677 // disable default in-tree Score plugins 678 // to make it easy to control configured ScorePlugins failure 679 Disabled: []v1beta2.Plugin{ 680 {Name: "*"}, 681 }, 682 }, 683 PostFilter: v1beta2.PluginSet{ 684 Enabled: []v1beta2.Plugin{ 685 {Name: postfilterPluginName}, 686 }, 687 // Need to disable default in-tree PostFilter plugins, as they will 688 // call RunFilterPlugins and hence impact the "numFilterCalled". 689 Disabled: []v1beta2.Plugin{ 690 {Name: "*"}, 691 }, 692 }, 693 }, 694 }}}) 695 696 // Create the API server and the scheduler with the test plugin set. 697 testCtx := initTestSchedulerForFrameworkTest( 698 t, 699 testutils.InitTestAPIServer(t, fmt.Sprintf("postfilter%v-", i), nil), 700 int(tt.numNodes), 701 scheduler.WithProfiles(cfg.Profiles...), 702 scheduler.WithFrameworkOutOfTreeRegistry(registry), 703 ) 704 defer testutils.CleanupTest(t, testCtx) 705 706 // Create a best effort pod. 
707 pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 708 if err != nil { 709 t.Errorf("Error while creating a test pod: %v", err) 710 } 711 712 if tt.rejectFilter { 713 if err = wait.Poll(10*time.Millisecond, 10*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 714 t.Errorf("Didn't expect the pod to be scheduled.") 715 } 716 717 if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled { 718 t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) 719 } 720 if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled { 721 t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) 722 } 723 if postFilterPlugin.numPostFilterCalled < tt.expectPostFilterNumCalled { 724 t.Errorf("Expected the postfilter plugin to be called at least %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled) 725 } 726 } else { 727 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 728 t.Errorf("Expected the pod to be scheduled. 
error: %v", err) 729 } 730 if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled { 731 t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled) 732 } 733 if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled { 734 t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled) 735 } 736 if postFilterPlugin.numPostFilterCalled != tt.expectPostFilterNumCalled { 737 t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled) 738 } 739 } 740 }) 741 } 742} 743 744// TestScorePlugin tests invocation of score plugins. 745func TestScorePlugin(t *testing.T) { 746 // Create a plugin registry for testing. Register only a score plugin. 747 scorePlugin := &ScorePlugin{} 748 registry := frameworkruntime.Registry{ 749 scorePluginName: newPlugin(scorePlugin), 750 } 751 752 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 753 Profiles: []v1beta2.KubeSchedulerProfile{{ 754 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 755 Plugins: &v1beta2.Plugins{ 756 Score: v1beta2.PluginSet{ 757 Enabled: []v1beta2.Plugin{ 758 {Name: scorePluginName}, 759 }, 760 }, 761 }, 762 }}, 763 }) 764 765 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, 766 scheduler.WithProfiles(cfg.Profiles...), 767 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 768 defer testutils.CleanupTest(t, testCtx) 769 770 tests := []struct { 771 name string 772 fail bool 773 }{ 774 { 775 name: "fail score plugin", 776 fail: true, 777 }, 778 { 779 name: "do not fail score plugin", 780 fail: false, 781 }, 782 } 783 784 for _, test := range tests { 785 t.Run(test.name, func(t *testing.T) { 786 scorePlugin.failScore 
= test.fail 787 // Create a best effort pod. 788 pod, err := createPausePod(testCtx.ClientSet, 789 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 790 if err != nil { 791 t.Fatalf("Error while creating a test pod: %v", err) 792 } 793 794 if test.fail { 795 if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { 796 t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) 797 } 798 } else { 799 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 800 t.Errorf("Expected the pod to be scheduled. error: %v", err) 801 } else { 802 p, err := getPod(testCtx.ClientSet, pod.Name, pod.Namespace) 803 if err != nil { 804 t.Errorf("Failed to retrieve the pod. error: %v", err) 805 } else if p.Spec.NodeName != scorePlugin.highScoreNode { 806 t.Errorf("Expected the pod to be scheduled on node %q, got %q", scorePlugin.highScoreNode, p.Spec.NodeName) 807 } 808 } 809 } 810 811 if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 { 812 t.Errorf("Expected the score plugin to be called.") 813 } 814 815 scorePlugin.reset() 816 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 817 }) 818 } 819} 820 821// TestNormalizeScorePlugin tests invocation of normalize score plugins. 822func TestNormalizeScorePlugin(t *testing.T) { 823 // Create a plugin registry for testing. Register only a normalize score plugin. 824 scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{} 825 registry := frameworkruntime.Registry{ 826 scoreWithNormalizePluginName: newPlugin(scoreWithNormalizePlugin), 827 } 828 829 // Setup initial score plugin for testing. 
830 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 831 Profiles: []v1beta2.KubeSchedulerProfile{{ 832 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 833 Plugins: &v1beta2.Plugins{ 834 Score: v1beta2.PluginSet{ 835 Enabled: []v1beta2.Plugin{ 836 {Name: scoreWithNormalizePluginName}, 837 }, 838 }, 839 }, 840 }}, 841 }) 842 843 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10, 844 scheduler.WithProfiles(cfg.Profiles...), 845 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 846 847 defer testutils.CleanupTest(t, testCtx) 848 849 // Create a best effort pod. 850 pod, err := createPausePod(testCtx.ClientSet, 851 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 852 if err != nil { 853 t.Fatalf("Error while creating a test pod: %v", err) 854 } 855 856 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 857 t.Errorf("Expected the pod to be scheduled. error: %v", err) 858 } 859 860 if scoreWithNormalizePlugin.numScoreCalled == 0 { 861 t.Errorf("Expected the score plugin to be called.") 862 } 863 if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 { 864 t.Error("Expected the normalize score plugin to be called") 865 } 866 867 scoreWithNormalizePlugin.reset() 868} 869 870// TestReservePlugin tests invocation of reserve plugins. 871func TestReservePluginReserve(t *testing.T) { 872 // Create a plugin registry for testing. Register only a reserve plugin. 873 reservePlugin := &ReservePlugin{} 874 registry := frameworkruntime.Registry{reservePluginName: newPlugin(reservePlugin)} 875 876 // Setup initial reserve plugin for testing. 
877 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 878 Profiles: []v1beta2.KubeSchedulerProfile{{ 879 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 880 Plugins: &v1beta2.Plugins{ 881 Reserve: v1beta2.PluginSet{ 882 Enabled: []v1beta2.Plugin{ 883 {Name: reservePluginName}, 884 }, 885 }, 886 }, 887 }}, 888 }) 889 890 // Create the API server and the scheduler with the test plugin set. 891 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "reserve-plugin-reserve", nil), 2, 892 scheduler.WithProfiles(cfg.Profiles...), 893 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 894 defer testutils.CleanupTest(t, testCtx) 895 896 tests := []struct { 897 name string 898 fail bool 899 }{ 900 { 901 name: "fail reserve plugin", 902 fail: true, 903 }, 904 { 905 name: "do not fail reserve plugin", 906 fail: false, 907 }, 908 } 909 910 for _, test := range tests { 911 t.Run(test.name, func(t *testing.T) { 912 reservePlugin.failReserve = test.fail 913 // Create a best effort pod. 914 pod, err := createPausePod(testCtx.ClientSet, 915 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 916 if err != nil { 917 t.Errorf("Error while creating a test pod: %v", err) 918 } 919 920 if test.fail { 921 if err = wait.Poll(10*time.Millisecond, 30*time.Second, 922 podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 923 t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) 924 } 925 } else { 926 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 927 t.Errorf("Expected the pod to be scheduled. error: %v", err) 928 } 929 } 930 931 if reservePlugin.numReserveCalled == 0 { 932 t.Errorf("Expected the reserve plugin to be called.") 933 } 934 935 reservePlugin.reset() 936 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 937 }) 938 } 939} 940 941// TestPrebindPlugin tests invocation of prebind plugins. 
942func TestPrebindPlugin(t *testing.T) { 943 // Create a plugin registry for testing. Register only a prebind plugin. 944 preBindPlugin := &PreBindPlugin{} 945 registry := frameworkruntime.Registry{preBindPluginName: newPlugin(preBindPlugin)} 946 947 // Setup initial prebind plugin for testing. 948 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 949 Profiles: []v1beta2.KubeSchedulerProfile{{ 950 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 951 Plugins: &v1beta2.Plugins{ 952 PreBind: v1beta2.PluginSet{ 953 Enabled: []v1beta2.Plugin{ 954 {Name: preBindPluginName}, 955 }, 956 }, 957 }, 958 }}, 959 }) 960 961 // Create the API server and the scheduler with the test plugin set. 962 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), 2, 963 scheduler.WithProfiles(cfg.Profiles...), 964 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 965 defer testutils.CleanupTest(t, testCtx) 966 967 tests := []struct { 968 name string 969 fail bool 970 reject bool 971 }{ 972 { 973 name: "disable fail and reject flags", 974 fail: false, 975 reject: false, 976 }, 977 { 978 name: "enable fail and disable reject flags", 979 fail: true, 980 reject: false, 981 }, 982 { 983 name: "disable fail and enable reject flags", 984 fail: false, 985 reject: true, 986 }, 987 { 988 name: "enable fail and reject flags", 989 fail: true, 990 reject: true, 991 }, 992 } 993 994 for _, test := range tests { 995 t.Run(test.name, func(t *testing.T) { 996 preBindPlugin.failPreBind = test.fail 997 preBindPlugin.rejectPreBind = test.reject 998 // Create a best effort pod. 
999 pod, err := createPausePod(testCtx.ClientSet, 1000 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 1001 if err != nil { 1002 t.Errorf("Error while creating a test pod: %v", err) 1003 } 1004 1005 if test.fail || test.reject { 1006 if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 1007 t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) 1008 } 1009 } else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1010 t.Errorf("Expected the pod to be scheduled. error: %v", err) 1011 } 1012 1013 if preBindPlugin.numPreBindCalled == 0 { 1014 t.Errorf("Expected the prebind plugin to be called.") 1015 } 1016 1017 preBindPlugin.reset() 1018 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 1019 }) 1020 } 1021} 1022 1023// // TestUnreserveReservePlugin tests invocation of the Unreserve operation in 1024// // reserve plugins through failures in execution points such as pre-bind. Also 1025// // tests that the order of invocation of Unreserve operation is executed in the 1026// // reverse order of invocation of the Reserve operation. 
1027// func TestReservePluginUnreserve(t *testing.T) { 1028// tests := []struct { 1029// name string 1030// failReserve bool 1031// failReserveIndex int 1032// failPreBind bool 1033// }{ 1034// { 1035// name: "fail reserve", 1036// failReserve: true, 1037// failReserveIndex: 1, 1038// }, 1039// { 1040// name: "fail preBind", 1041// failPreBind: true, 1042// }, 1043// { 1044// name: "pass everything", 1045// }, 1046// } 1047 1048// for _, test := range tests { 1049// t.Run(test.name, func(t *testing.T) { 1050// numReservePlugins := 3 1051// pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins) 1052 1053// preBindPlugin := &PreBindPlugin{ 1054// failPreBind: true, 1055// } 1056// var reservePlugins []*ReservePlugin 1057// for i := 0; i < numReservePlugins; i++ { 1058// reservePlugins = append(reservePlugins, &ReservePlugin{ 1059// name: fmt.Sprintf("%s-%d", reservePluginName, i), 1060// pluginInvokeEventChan: pluginInvokeEventChan, 1061// }) 1062// } 1063 1064// registry := frameworkruntime.Registry{ 1065// // TODO(#92229): test more failure points that would trigger Unreserve in 1066// // reserve plugins than just one pre-bind plugin. 1067// preBindPluginName: newPlugin(preBindPlugin), 1068// } 1069// for _, pl := range reservePlugins { 1070// registry[pl.Name()] = newPlugin(pl) 1071// } 1072 1073// // Setup initial reserve and prebind plugin for testing. 
1074// prof := schedulerconfig.KubeSchedulerProfile{ 1075// SchedulerName: v1.DefaultSchedulerName, 1076// Plugins: &schedulerconfig.Plugins{ 1077// Reserve: schedulerconfig.PluginSet{ 1078// // filled by looping over reservePlugins 1079// }, 1080// PreBind: schedulerconfig.PluginSet{ 1081// Enabled: []schedulerconfig.Plugin{ 1082// { 1083// Name: preBindPluginName, 1084// }, 1085// }, 1086// }, 1087// }, 1088// } 1089// for _, pl := range reservePlugins { 1090// prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{ 1091// Name: pl.Name(), 1092// }) 1093// } 1094 1095// // Create the master and the scheduler with the test plugin set. 1096// testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2, 1097// scheduler.WithProfiles(prof), 1098// scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1099// defer testutils.CleanupTest(t, testCtx) 1100 1101// preBindPlugin.failPreBind = test.failPreBind 1102// if test.failReserve { 1103// reservePlugins[test.failReserveIndex].failReserve = true 1104// } 1105// // Create a best effort pod. 
// 			pod, err := createPausePod(testCtx.ClientSet,
// 				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
// 			if err != nil {
// 				t.Errorf("Error while creating a test pod: %v", err)
// 			}

// 			if test.failPreBind || test.failReserve {
// 				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
// 					t.Errorf("Expected a scheduling error, but didn't get it: %v", err)
// 				}
// 				for i := numReservePlugins - 1; i >= 0; i-- {
// 					select {
// 					case event := <-pluginInvokeEventChan:
// 						expectedPluginName := reservePlugins[i].Name()
// 						if expectedPluginName != event.pluginName {
// 							t.Errorf("event.pluginName = %s, want %s", event.pluginName, expectedPluginName)
// 						}
// 					case <-time.After(time.Second * 30):
// 						t.Errorf("pluginInvokeEventChan receive timed out")
// 					}
// 				}
// 			} else {
// 				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
// 					t.Errorf("Expected the pod to be scheduled, got an error: %v", err)
// 				}
// 				for i, pl := range reservePlugins {
// 					if pl.numUnreserveCalled != 0 {
// 						t.Errorf("reservePlugins[%d].numUnreserveCalled = %d, want 0", i, pl.numUnreserveCalled)
// 					}
// 				}
// 			}
// 			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
// 		})
// 	}
// }

// pluginInvokeEvent is the element type of the pluginInvokeEventChan channels
// held by the test plugins. Tests receive these events from the channel and
// compare them, in order, against the invocations they expect.
type pluginInvokeEvent struct {
	// pluginName is compared against the Name() of the plugin expected to
	// have produced the event.
	pluginName string
	// val is compared against an expected integer (1 in the visible tests).
	// NOTE(review): presumably the plugin's call counter at the time the
	// event was emitted; confirm against the plugin methods that send it.
	val int
}

// TestBindPlugin tests invocation of bind plugins.
1148func TestBindPlugin(t *testing.T) { 1149 testContext := testutils.InitTestAPIServer(t, "bind-plugin", nil) 1150 bindPlugin1 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.ClientSet} 1151 bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-2", client: testContext.ClientSet} 1152 reservePlugin := &ReservePlugin{name: "mock-reserve-plugin"} 1153 postBindPlugin := &PostBindPlugin{name: "mock-post-bind-plugin"} 1154 // Create a plugin registry for testing. Register reserve, bind, and 1155 // postBind plugins. 1156 registry := frameworkruntime.Registry{ 1157 reservePlugin.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { 1158 return reservePlugin, nil 1159 }, 1160 bindPlugin1.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { 1161 return bindPlugin1, nil 1162 }, 1163 bindPlugin2.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { 1164 return bindPlugin2, nil 1165 }, 1166 postBindPlugin.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { 1167 return postBindPlugin, nil 1168 }, 1169 } 1170 1171 // Setup initial unreserve and bind plugins for testing. 1172 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 1173 Profiles: []v1beta2.KubeSchedulerProfile{{ 1174 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 1175 Plugins: &v1beta2.Plugins{ 1176 Reserve: v1beta2.PluginSet{ 1177 Enabled: []v1beta2.Plugin{{Name: reservePlugin.Name()}}, 1178 }, 1179 Bind: v1beta2.PluginSet{ 1180 // Put DefaultBinder last. 1181 Enabled: []v1beta2.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}}, 1182 Disabled: []v1beta2.Plugin{{Name: defaultbinder.Name}}, 1183 }, 1184 PostBind: v1beta2.PluginSet{ 1185 Enabled: []v1beta2.Plugin{{Name: postBindPlugin.Name()}}, 1186 }, 1187 }, 1188 }}, 1189 }) 1190 1191 // Create the scheduler with the test plugin set. 
1192 testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, nil, 1193 scheduler.WithProfiles(cfg.Profiles...), 1194 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1195 testutils.SyncInformerFactory(testCtx) 1196 go testCtx.Scheduler.Run(testCtx.Ctx) 1197 defer testutils.CleanupTest(t, testCtx) 1198 1199 // Add a few nodes. 1200 _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2) 1201 if err != nil { 1202 t.Fatal(err) 1203 } 1204 1205 tests := []struct { 1206 name string 1207 bindPluginStatuses []*framework.Status 1208 expectBoundByScheduler bool // true means this test case expecting scheduler would bind pods 1209 expectBoundByPlugin bool // true means this test case expecting a plugin would bind pods 1210 expectBindPluginName string // expecting plugin name to bind pods 1211 expectInvokeEvents []pluginInvokeEvent 1212 }{ 1213 { 1214 name: "bind plugins skipped to bind the pod and scheduler bond the pod", 1215 bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")}, 1216 expectBoundByScheduler: true, 1217 expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, 1218 }, 1219 { 1220 name: "bindplugin2 succeeded to bind the pod", 1221 bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")}, 1222 expectBoundByPlugin: true, 1223 expectBindPluginName: bindPlugin2.Name(), 1224 expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, 1225 }, 1226 { 1227 name: "bindplugin1 succeeded to bind the pod", 1228 bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")}, 1229 expectBoundByPlugin: true, 1230 
expectBindPluginName: bindPlugin1.Name(), 1231 expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}}, 1232 }, 1233 { 1234 name: "bind plugin fails to bind the pod", 1235 bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")}, 1236 expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: reservePlugin.Name(), val: 1}}, 1237 }, 1238 } 1239 1240 var pluginInvokeEventChan chan pluginInvokeEvent 1241 for _, test := range tests { 1242 t.Run(test.name, func(t *testing.T) { 1243 pluginInvokeEventChan = make(chan pluginInvokeEvent, 10) 1244 1245 bindPlugin1.bindStatus = test.bindPluginStatuses[0] 1246 bindPlugin2.bindStatus = test.bindPluginStatuses[1] 1247 1248 bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan 1249 bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan 1250 reservePlugin.pluginInvokeEventChan = pluginInvokeEventChan 1251 postBindPlugin.pluginInvokeEventChan = pluginInvokeEventChan 1252 1253 // Create a best effort pod. 1254 pod, err := createPausePod(testCtx.ClientSet, 1255 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 1256 if err != nil { 1257 t.Errorf("Error while creating a test pod: %v", err) 1258 } 1259 1260 if test.expectBoundByScheduler || test.expectBoundByPlugin { 1261 // bind plugins skipped to bind the pod 1262 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1263 t.Fatalf("Expected the pod to be scheduled. 
error: %v", err) 1264 } 1265 pod, err = testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}) 1266 if err != nil { 1267 t.Errorf("can't get pod: %v", err) 1268 } 1269 if test.expectBoundByScheduler { 1270 if pod.Annotations[bindPluginAnnotation] != "" { 1271 t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation]) 1272 } 1273 if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 { 1274 t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled) 1275 } 1276 } else { 1277 if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName { 1278 t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation]) 1279 } 1280 if bindPlugin1.numBindCalled != 1 { 1281 t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled) 1282 } 1283 if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 { 1284 // expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called. 
1285 t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled) 1286 } 1287 } 1288 if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) { 1289 return postBindPlugin.numPostBindCalled == 1, nil 1290 }); err != nil { 1291 t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled) 1292 } 1293 if reservePlugin.numUnreserveCalled != 0 { 1294 t.Errorf("Expected unreserve to not be called, was called %d times.", reservePlugin.numUnreserveCalled) 1295 } 1296 } else { 1297 // bind plugin fails to bind the pod 1298 if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 1299 t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) 1300 } 1301 if postBindPlugin.numPostBindCalled > 0 { 1302 t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled) 1303 } 1304 } 1305 for j := range test.expectInvokeEvents { 1306 expectEvent := test.expectInvokeEvents[j] 1307 select { 1308 case event := <-pluginInvokeEventChan: 1309 if event.pluginName != expectEvent.pluginName { 1310 t.Errorf("Expect invoke event %d from plugin %s instead of %s", j, expectEvent.pluginName, event.pluginName) 1311 } 1312 if event.val != expectEvent.val { 1313 t.Errorf("Expect val of invoke event %d to be %d instead of %d", j, expectEvent.val, event.val) 1314 } 1315 case <-time.After(time.Second * 30): 1316 t.Errorf("Waiting for invoke event %d timeout.", j) 1317 } 1318 } 1319 postBindPlugin.reset() 1320 bindPlugin1.reset() 1321 bindPlugin2.reset() 1322 reservePlugin.reset() 1323 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 1324 }) 1325 } 1326} 1327 1328// TestPostBindPlugin tests invocation of postbind plugins. 
1329func TestPostBindPlugin(t *testing.T) { 1330 tests := []struct { 1331 name string 1332 preBindFail bool 1333 }{ 1334 { 1335 name: "plugin preBind fail", 1336 preBindFail: true, 1337 }, 1338 { 1339 name: "plugin preBind do not fail", 1340 preBindFail: false, 1341 }, 1342 } 1343 1344 for _, test := range tests { 1345 t.Run(test.name, func(t *testing.T) { 1346 // Create a plugin registry for testing. Register a prebind and a postbind plugin. 1347 preBindPlugin := &PreBindPlugin{ 1348 failPreBind: test.preBindFail, 1349 } 1350 postBindPlugin := &PostBindPlugin{ 1351 name: postBindPluginName, 1352 pluginInvokeEventChan: make(chan pluginInvokeEvent, 1), 1353 } 1354 registry := frameworkruntime.Registry{ 1355 preBindPluginName: newPlugin(preBindPlugin), 1356 postBindPluginName: newPlugin(postBindPlugin), 1357 } 1358 1359 // Setup initial prebind and postbind plugin for testing. 1360 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 1361 Profiles: []v1beta2.KubeSchedulerProfile{{ 1362 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 1363 Plugins: &v1beta2.Plugins{ 1364 PreBind: v1beta2.PluginSet{ 1365 Enabled: []v1beta2.Plugin{ 1366 { 1367 Name: preBindPluginName, 1368 }, 1369 }, 1370 }, 1371 PostBind: v1beta2.PluginSet{ 1372 Enabled: []v1beta2.Plugin{ 1373 { 1374 Name: postBindPluginName, 1375 }, 1376 }, 1377 }, 1378 }, 1379 }}, 1380 }) 1381 1382 // Create the API server and the scheduler with the test plugin set. 1383 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "postbind-plugin", nil), 2, 1384 scheduler.WithProfiles(cfg.Profiles...), 1385 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1386 defer testutils.CleanupTest(t, testCtx) 1387 1388 // Create a best effort pod. 
1389 pod, err := createPausePod(testCtx.ClientSet, 1390 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 1391 if err != nil { 1392 t.Errorf("Error while creating a test pod: %v", err) 1393 } 1394 1395 if test.preBindFail { 1396 if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 1397 t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err) 1398 } 1399 if postBindPlugin.numPostBindCalled > 0 { 1400 t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled) 1401 } 1402 } else { 1403 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1404 t.Errorf("Expected the pod to be scheduled. error: %v", err) 1405 } 1406 select { 1407 case <-postBindPlugin.pluginInvokeEventChan: 1408 case <-time.After(time.Second * 15): 1409 t.Errorf("pluginInvokeEventChan timed out") 1410 } 1411 if postBindPlugin.numPostBindCalled == 0 { 1412 t.Errorf("Expected the postbind plugin to be called, was called %d times.", postBindPlugin.numPostBindCalled) 1413 } 1414 } 1415 1416 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 1417 }) 1418 } 1419} 1420 1421// TestPermitPlugin tests invocation of permit plugins. 1422func TestPermitPlugin(t *testing.T) { 1423 // Create a plugin registry for testing. Register only a permit plugin. 1424 perPlugin := &PermitPlugin{name: permitPluginName} 1425 registry, prof := initRegistryAndConfig(t, perPlugin) 1426 1427 // Create the API server and the scheduler with the test plugin set. 
1428 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2, 1429 scheduler.WithProfiles(prof), 1430 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1431 defer testutils.CleanupTest(t, testCtx) 1432 1433 tests := []struct { 1434 name string 1435 fail bool 1436 reject bool 1437 timeout bool 1438 }{ 1439 { 1440 name: "disable fail, reject and timeout flags", 1441 fail: false, 1442 reject: false, 1443 timeout: false, 1444 }, 1445 { 1446 name: "enable fail, disable reject and timeout flags", 1447 fail: true, 1448 reject: false, 1449 timeout: false, 1450 }, 1451 { 1452 name: "disable fail and timeout, enable reject flags", 1453 fail: false, 1454 reject: true, 1455 timeout: false, 1456 }, 1457 { 1458 name: "enable fail and reject, disable timeout flags", 1459 fail: true, 1460 reject: true, 1461 timeout: false, 1462 }, 1463 { 1464 name: "disable fail and reject, disable timeout flags", 1465 fail: false, 1466 reject: false, 1467 timeout: true, 1468 }, 1469 { 1470 name: "disable fail and reject, enable timeout flags", 1471 fail: false, 1472 reject: false, 1473 timeout: true, 1474 }, 1475 } 1476 1477 for _, test := range tests { 1478 t.Run(test.name, func(t *testing.T) { 1479 perPlugin.failPermit = test.fail 1480 perPlugin.rejectPermit = test.reject 1481 perPlugin.timeoutPermit = test.timeout 1482 perPlugin.waitAndRejectPermit = false 1483 perPlugin.waitAndAllowPermit = false 1484 1485 // Create a best effort pod. 1486 pod, err := createPausePod(testCtx.ClientSet, 1487 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 1488 if err != nil { 1489 t.Errorf("Error while creating a test pod: %v", err) 1490 } 1491 if test.fail { 1492 if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 1493 t.Errorf("Expected a scheduling error, but didn't get it. 
error: %v", err) 1494 } 1495 } else { 1496 if test.reject || test.timeout { 1497 if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil { 1498 t.Errorf("Didn't expect the pod to be scheduled. error: %v", err) 1499 } 1500 } else { 1501 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1502 t.Errorf("Expected the pod to be scheduled. error: %v", err) 1503 } 1504 } 1505 } 1506 1507 if perPlugin.numPermitCalled == 0 { 1508 t.Errorf("Expected the permit plugin to be called.") 1509 } 1510 1511 perPlugin.reset() 1512 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 1513 }) 1514 } 1515} 1516 1517// TestMultiplePermitPlugins tests multiple permit plugins returning wait for a same pod. 1518func TestMultiplePermitPlugins(t *testing.T) { 1519 // Create a plugin registry for testing. 1520 perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} 1521 perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} 1522 registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) 1523 1524 // Create the API server and the scheduler with the test plugin set. 1525 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2, 1526 scheduler.WithProfiles(prof), 1527 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1528 defer testutils.CleanupTest(t, testCtx) 1529 1530 // Both permit plugins will return Wait for permitting 1531 perPlugin1.timeoutPermit = true 1532 perPlugin2.timeoutPermit = true 1533 1534 // Create a test pod. 1535 podName := "test-pod" 1536 pod, err := createPausePod(testCtx.ClientSet, 1537 initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) 1538 if err != nil { 1539 t.Errorf("Error while creating a test pod: %v", err) 1540 } 1541 1542 var waitingPod framework.WaitingPod 1543 // Wait until the test pod is actually waiting. 
1544 wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { 1545 waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) 1546 return waitingPod != nil, nil 1547 }) 1548 1549 // Check the number of pending permits 1550 if l := len(waitingPod.GetPendingPlugins()); l != 2 { 1551 t.Errorf("Expected the number of pending plugins is 2, but got %d", l) 1552 } 1553 1554 perPlugin1.allowAllPods() 1555 // Check the number of pending permits 1556 if l := len(waitingPod.GetPendingPlugins()); l != 1 { 1557 t.Errorf("Expected the number of pending plugins is 1, but got %d", l) 1558 } 1559 1560 perPlugin2.allowAllPods() 1561 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1562 t.Errorf("Expected the pod to be scheduled. error: %v", err) 1563 } 1564 1565 if perPlugin1.numPermitCalled == 0 || perPlugin2.numPermitCalled == 0 { 1566 t.Errorf("Expected the permit plugin to be called.") 1567 } 1568 1569 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod}) 1570} 1571 1572// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected. 1573func TestPermitPluginsCancelled(t *testing.T) { 1574 // Create a plugin registry for testing. 1575 perPlugin1 := &PermitPlugin{name: "permit-plugin-1"} 1576 perPlugin2 := &PermitPlugin{name: "permit-plugin-2"} 1577 registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2) 1578 1579 // Create the API server and the scheduler with the test plugin set. 1580 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2, 1581 scheduler.WithProfiles(prof), 1582 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1583 defer testutils.CleanupTest(t, testCtx) 1584 1585 // Both permit plugins will return Wait for permitting 1586 perPlugin1.timeoutPermit = true 1587 perPlugin2.timeoutPermit = true 1588 1589 // Create a test pod. 
1590 podName := "test-pod" 1591 pod, err := createPausePod(testCtx.ClientSet, 1592 initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name})) 1593 if err != nil { 1594 t.Errorf("Error while creating a test pod: %v", err) 1595 } 1596 1597 var waitingPod framework.WaitingPod 1598 // Wait until the test pod is actually waiting. 1599 wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { 1600 waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID) 1601 return waitingPod != nil, nil 1602 }) 1603 1604 perPlugin1.rejectAllPods() 1605 // Wait some time for the permit plugins to be cancelled 1606 err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) { 1607 return perPlugin1.cancelled && perPlugin2.cancelled, nil 1608 }) 1609 if err != nil { 1610 t.Errorf("Expected all permit plugins to be cancelled") 1611 } 1612} 1613 1614// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins. 1615func TestCoSchedulingWithPermitPlugin(t *testing.T) { 1616 // Create a plugin registry for testing. Register only a permit plugin. 1617 permitPlugin := &PermitPlugin{name: permitPluginName} 1618 registry, prof := initRegistryAndConfig(t, permitPlugin) 1619 1620 // Create the API server and the scheduler with the test plugin set. 1621 // TODO Make the subtests not share scheduler instances. 
1622 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2, 1623 scheduler.WithProfiles(prof), 1624 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1625 defer testutils.CleanupTest(t, testCtx) 1626 1627 tests := []struct { 1628 name string 1629 waitReject bool 1630 waitAllow bool 1631 }{ 1632 { 1633 name: "having wait reject true and wait allow false", 1634 waitReject: true, 1635 waitAllow: false, 1636 }, 1637 { 1638 name: "having wait reject false and wait allow true", 1639 waitReject: false, 1640 waitAllow: true, 1641 }, 1642 } 1643 1644 for _, test := range tests { 1645 t.Run(test.name, func(t *testing.T) { 1646 permitPlugin.failPermit = false 1647 permitPlugin.rejectPermit = false 1648 permitPlugin.timeoutPermit = false 1649 permitPlugin.waitAndRejectPermit = test.waitReject 1650 permitPlugin.waitAndAllowPermit = test.waitAllow 1651 1652 // Create two pods. First pod to enter Permit() will wait and a second one will either 1653 // reject or allow first one. 1654 podA, err := createPausePod(testCtx.ClientSet, 1655 initPausePod(&pausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name})) 1656 if err != nil { 1657 t.Errorf("Error while creating the first pod: %v", err) 1658 } 1659 podB, err := createPausePod(testCtx.ClientSet, 1660 initPausePod(&pausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name})) 1661 if err != nil { 1662 t.Errorf("Error while creating the second pod: %v", err) 1663 } 1664 1665 if test.waitReject { 1666 if err = waitForPodUnschedulable(testCtx.ClientSet, podA); err != nil { 1667 t.Errorf("Didn't expect the first pod to be scheduled. error: %v", err) 1668 } 1669 if err = waitForPodUnschedulable(testCtx.ClientSet, podB); err != nil { 1670 t.Errorf("Didn't expect the second pod to be scheduled. 
error: %v", err) 1671 } 1672 if !((permitPlugin.waitingPod == podA.Name && permitPlugin.rejectingPod == podB.Name) || 1673 (permitPlugin.waitingPod == podB.Name && permitPlugin.rejectingPod == podA.Name)) { 1674 t.Errorf("Expect one pod to wait and another pod to reject instead %s waited and %s rejected.", 1675 permitPlugin.waitingPod, permitPlugin.rejectingPod) 1676 } 1677 } else { 1678 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podA); err != nil { 1679 t.Errorf("Expected the first pod to be scheduled. error: %v", err) 1680 } 1681 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podB); err != nil { 1682 t.Errorf("Expected the second pod to be scheduled. error: %v", err) 1683 } 1684 if !((permitPlugin.waitingPod == podA.Name && permitPlugin.allowingPod == podB.Name) || 1685 (permitPlugin.waitingPod == podB.Name && permitPlugin.allowingPod == podA.Name)) { 1686 t.Errorf("Expect one pod to wait and another pod to allow instead %s waited and %s allowed.", 1687 permitPlugin.waitingPod, permitPlugin.allowingPod) 1688 } 1689 } 1690 1691 if permitPlugin.numPermitCalled == 0 { 1692 t.Errorf("Expected the permit plugin to be called.") 1693 } 1694 1695 permitPlugin.reset() 1696 testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{podA, podB}) 1697 }) 1698 } 1699} 1700 1701// TestFilterPlugin tests invocation of filter plugins. 1702func TestFilterPlugin(t *testing.T) { 1703 // Create a plugin registry for testing. Register only a filter plugin. 1704 filterPlugin := &FilterPlugin{} 1705 registry := frameworkruntime.Registry{filterPluginName: newPlugin(filterPlugin)} 1706 1707 // Setup initial filter plugin for testing. 
1708 cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{ 1709 Profiles: []v1beta2.KubeSchedulerProfile{{ 1710 SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName), 1711 Plugins: &v1beta2.Plugins{ 1712 Filter: v1beta2.PluginSet{ 1713 Enabled: []v1beta2.Plugin{ 1714 {Name: filterPluginName}, 1715 }, 1716 }, 1717 }, 1718 }}, 1719 }) 1720 1721 // Create the API server and the scheduler with the test plugin set. 1722 testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "filter-plugin", nil), 1, 1723 scheduler.WithProfiles(cfg.Profiles...), 1724 scheduler.WithFrameworkOutOfTreeRegistry(registry)) 1725 defer testutils.CleanupTest(t, testCtx) 1726 1727 tests := []struct { 1728 name string 1729 fail bool 1730 }{ 1731 { 1732 name: "fail filter plugin", 1733 fail: true, 1734 }, 1735 { 1736 name: "do not fail filter plugin", 1737 fail: false, 1738 }, 1739 } 1740 1741 for _, test := range tests { 1742 t.Run(test.name, func(t *testing.T) { 1743 filterPlugin.failFilter = test.fail 1744 // Create a best effort pod. 1745 pod, err := createPausePod(testCtx.ClientSet, 1746 initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name})) 1747 if err != nil { 1748 t.Errorf("Error while creating a test pod: %v", err) 1749 } 1750 1751 if test.fail { 1752 if err = wait.Poll(10*time.Millisecond, 30*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil { 1753 t.Errorf("Didn't expect the pod to be scheduled.") 1754 } 1755 if filterPlugin.numFilterCalled < 1 { 1756 t.Errorf("Expected the filter plugin to be called at least 1 time, but got %v.", filterPlugin.numFilterCalled) 1757 } 1758 } else { 1759 if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil { 1760 t.Errorf("Expected the pod to be scheduled. 
error: %v", err)
				}
				if filterPlugin.numFilterCalled != 1 {
					t.Errorf("Expected the filter plugin to be called 1 time, but got %v.", filterPlugin.numFilterCalled)
				}
			}

			filterPlugin.reset()
			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
		})
	}
}

// TestPreScorePlugin tests invocation of pre-score plugins.
//
// It registers a single PreScorePlugin, starts a scheduler with two nodes and
// runs one sub-test per value of failPreScore: when the plugin is told to fail
// the pod must stay unschedulable, otherwise it must be scheduled. In both
// cases the plugin must have been called at least once.
func TestPreScorePlugin(t *testing.T) {
	// Create a plugin registry for testing. Register only a pre-score plugin.
	preScorePlugin := &PreScorePlugin{}
	registry := frameworkruntime.Registry{preScorePluginName: newPlugin(preScorePlugin)}

	// Setup initial pre-score plugin for testing.
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				PreScore: v1beta2.PluginSet{
					Enabled: []v1beta2.Plugin{
						{Name: preScorePluginName},
					},
				},
			},
		}},
	})

	// Create the API server and the scheduler with the test plugin set.
	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "pre-score-plugin", nil), 2,
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
	defer testutils.CleanupTest(t, testCtx)

	tests := []struct {
		name string
		fail bool
	}{
		{
			name: "fail preScore plugin",
			fail: true,
		},
		{
			name: "do not fail preScore plugin",
			fail: false,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			preScorePlugin.failPreScore = test.fail
			// Create a best effort pod.
			pod, err := createPausePod(testCtx.ClientSet,
				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
			if err != nil {
				// Fatal, not Error: every step below uses pod, which is
				// presumably nil when creation failed.
				t.Fatalf("Error while creating a test pod: %v", err)
			}

			if test.fail {
				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
				}
			} else {
				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
					t.Errorf("Expected the pod to be scheduled. error: %v", err)
				}
			}

			if preScorePlugin.numPreScoreCalled == 0 {
				t.Errorf("Expected the pre-score plugin to be called.")
			}

			// Restore the plugin and remove the pod so the next sub-test
			// starts from a clean state.
			preScorePlugin.reset()
			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
		})
	}
}

// TestPreemptWithPermitPlugin tests preempt with permit plugins.
//
// Scenario: one node with 500m CPU / 500 memory. A low-priority "running-pod"
// and a low-priority "waiting-pod" (200m/200 each) are created; the permit
// plugin is configured with waitAndAllowPermit and waitingPod = "waiting-pod".
// A high-priority "preemptor-pod" requesting 400m/400 is then created. The
// test expects that afterwards no pod is left in the waiting state, that the
// waiting pod object still exists in the API server, and that the running pod
// has been deleted.
func TestPreemptWithPermitPlugin(t *testing.T) {
	// Create a plugin registry for testing. Register only a permit plugin.
	permitPlugin := &PermitPlugin{}
	registry, prof := initRegistryAndConfig(t, permitPlugin)

	// Create the API server and the scheduler with the test plugin set.
	// No nodes are created here; the single node is added below with an
	// explicit capacity.
	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "preempt-with-permit-plugin", nil), 0,
		scheduler.WithProfiles(prof),
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
	defer testutils.CleanupTest(t, testCtx)

	// Add one node.
	nodeRes := map[v1.ResourceName]string{
		v1.ResourcePods:   "32",
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
	if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode().Capacity(nodeRes), 1); err != nil {
		t.Fatal(err)
	}

	permitPlugin.failPermit = false
	permitPlugin.rejectPermit = false
	permitPlugin.timeoutPermit = false
	permitPlugin.waitAndRejectPermit = false
	permitPlugin.waitAndAllowPermit = true
	permitPlugin.waitingPod = "waiting-pod"

	// anyPodWaiting reports whether the framework handle currently tracks at
	// least one waiting pod.
	anyPodWaiting := func() bool {
		found := false
		permitPlugin.fh.IterateOverWaitingPods(func(framework.WaitingPod) { found = true })
		return found
	}

	lowPriority, highPriority := int32(100), int32(300)
	resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
	}
	preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
		v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
	}

	// First pod will go running.
	runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
	runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
	runningPod, err := createPausePod(testCtx.ClientSet, runningPod)
	if err != nil {
		// Fatal: runningPod is dereferenced right below.
		t.Fatalf("Error while creating the running pod: %v", err)
	}
	// Wait until the pod scheduled, then create a preemptor pod to preempt it.
	if err := wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)); err != nil {
		t.Errorf("Error while waiting for the running pod to be scheduled: %v", err)
	}

	// Second pod will go waiting.
	waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
	waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
	waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
	if err != nil {
		// Fatal: waitingPod is dereferenced further down.
		t.Fatalf("Error while creating the waiting pod: %v", err)
	}
	// Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it.
	// Without checking the result, the "no pod is waiting" assertion below
	// would pass trivially if the pod never reached the waiting state.
	if err := wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
		return anyPodWaiting(), nil
	}); err != nil {
		t.Errorf("Error while waiting for the waiting pod to start waiting: %v", err)
	}

	// Create third pod which should preempt other pods.
	preemptorPod, err := createPausePod(testCtx.ClientSet,
		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
	if err != nil {
		// Fatal: preemptorPod is handed to CleanupPods at the end.
		t.Fatalf("Error while creating the preemptor pod: %v", err)
	}

	// TODO(#96478): uncomment below once we find a way to trigger MoveAllToActiveOrBackoffQueue()
	// upon deletion event of unassigned waiting pods.
	// if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, preemptorPod); err != nil {
	// 	t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err)
	// }

	if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
		return !anyPodWaiting(), nil
	}); err != nil {
		t.Error("Expected the waiting pod to get preempted")
	}
	// Expect the waitingPod to be still present.
	if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
		t.Error("Get waiting pod in waiting pod failed.")
	}
	// Expect the runningPod to be deleted physically.
	switch _, err := getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace); {
	case err == nil:
		t.Error("Running pod still exist.")
	case !errors.IsNotFound(err):
		t.Error("Get running pod failed.")
	}
	if permitPlugin.numPermitCalled == 0 {
		t.Errorf("Expected the permit plugin to be called.")
	}

	permitPlugin.reset()
	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
}

const (
	// jobPluginName is the name JobPlugin is registered and enabled under.
	jobPluginName = "job plugin"
)

// Compile-time interface checks. JobPlugin is enabled at both the PreFilter
// and PostBind extension points, so it is asserted against both interfaces.
var _ framework.PreFilterPlugin = &JobPlugin{}
var _ framework.PostBindPlugin = &JobPlugin{}
var _ framework.PostBindPlugin = &PostBindPlugin{}

// JobPlugin is a test plugin that keeps "executor" pods unschedulable until a
// "driver" pod exists, and asks for the executors to be activated once a
// driver pod has been bound.
type JobPlugin struct {
	// podLister is used to look up driver and executor pods by label.
	podLister listersv1.PodLister
	// podsActivated is set once PostBind has added executor pods to the
	// cycle state's PodsToActivate map.
	// NOTE(review): written in PostBind and read by TestActivatePods without
	// synchronization — confirm whether the two can run concurrently.
	podsActivated bool
}

// Name returns the name of the plugin.
func (j *JobPlugin) Name() string {
	return jobPluginName
}

// PreFilter rejects the pod as UnschedulableAndUnresolvable unless at least one
// pod labelled "driver" exists in the pod's namespace according to the lister.
// A lister error is returned as an error status.
func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
	driverSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
	driverPods, err := j.podLister.Pods(p.Namespace).List(driverSelector)
	if err != nil {
		return framework.AsStatus(err)
	}
	if len(driverPods) == 0 {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
	}
	return nil
}

// PreFilterExtensions returns nil; JobPlugin provides no PreFilter extensions.
func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

// PostBind does nothing unless p carries the "driver" label. For a driver pod
// it lists the "executor" pods in the same namespace and records them in the
// PodsToActivate entry of the cycle state, keyed by "<namespace>/<name>".
// Any failure along the way (lister error, no executors, missing or
// unexpected cycle-state entry) makes the call a silent no-op.
func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
	if _, ok := p.Labels["driver"]; !ok {
		return
	}

	// If it's a driver pod, move other executor pods proactively to accelerating the scheduling.
	executorSelector := labels.SelectorFromSet(labels.Set{"executor": ""})
	podsToActivate, err := j.podLister.Pods(p.Namespace).List(executorSelector)
	if err != nil || len(podsToActivate) == 0 {
		return
	}

	c, err := state.Read(framework.PodsToActivateKey)
	if err != nil {
		return
	}
	s, ok := c.(*framework.PodsToActivate)
	if !ok {
		return
	}

	s.Lock()
	for _, pod := range podsToActivate {
		namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
		s.Map[namespacedName] = pod
	}
	s.Unlock()
	j.podsActivated = true
}

// This test simulates a typical spark job workflow.
// - N executor pods are created, but kept pending due to missing the driver pod
// - when the driver pod gets created and scheduled, proactively move the executors to activeQ
//   and thus accelerate the entire job workflow.
func TestActivatePods(t *testing.T) {
	var jobPlugin *JobPlugin
	// Create a plugin registry for testing. Register a Job plugin.
	// The factory stores the created instance in jobPlugin so the test can
	// inspect it afterwards.
	registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
		jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}
		return jobPlugin, nil
	}}

	// Enable the job plugin at the PreFilter and PostBind extension points.
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				PreFilter: v1beta2.PluginSet{
					Enabled: []v1beta2.Plugin{
						{Name: jobPluginName},
					},
				},
				PostBind: v1beta2.PluginSet{
					Enabled: []v1beta2.Plugin{
						{Name: jobPluginName},
					},
				},
			},
		}},
	})

	// Create the API server and the scheduler with the test plugin set.
	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
		scheduler.WithProfiles(cfg.Profiles...),
		scheduler.WithFrameworkOutOfTreeRegistry(registry))
	defer testutils.CleanupTest(t, testCtx)

	cs := testCtx.ClientSet
	ns := testCtx.NS.Name
	pause := imageutils.GetPauseImageName()

	// Firstly create 2 executor pods.
	var pods []*v1.Pod
	for i := 1; i <= 2; i++ {
		name := fmt.Sprintf("executor-%v", i)
		executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj()
		pods = append(pods, executor)
		if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil {
			t.Fatalf("Failed to create pod %v: %v", executor.Name, err)
		}
	}

	// Wait for the 2 executor pods to be unschedulable.
	for _, pod := range pods {
		if err := waitForPodUnschedulable(cs, pod); err != nil {
			t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err)
		}
	}

	// Create a driver pod.
	driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj()
	pods = append(pods, driver)
	if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil {
		t.Fatalf("Failed to create pod %v: %v", driver.Name, err)
	}

	// Verify all pods to be scheduled.
	for _, pod := range pods {
		if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil {
			t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err)
		}
	}

	// Lastly verify the pods activation logic is really called.
	if !jobPlugin.podsActivated {
		t.Errorf("JobPlugin's pods activation logic is not called")
	}
}

// initTestSchedulerForFrameworkTest initializes the scheduler in testCtx with
// the given options, syncs the informer factory and starts the scheduler in a
// background goroutine bound to testCtx.Ctx. When nodeCount is positive it
// also creates that many nodes (name prefix "test-node") and waits for them to
// appear in the cache, failing the test on error. The updated context is
// returned.
func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, opts...)
	testutils.SyncInformerFactory(testCtx)
	go testCtx.Scheduler.Run(testCtx.Ctx)

	if nodeCount > 0 {
		if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
			t.Fatal(err)
		}
	}
	return testCtx
}

// initRegistryAndConfig returns registry and plugins config based on given plugins.
//
// Each permit plugin is registered under its Name() and enabled at the Permit
// extension point of a single profile using the default scheduler name. With
// no plugins, an empty registry and a zero-value profile are returned. The
// test fails immediately if a plugin cannot be registered.
// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
func initRegistryAndConfig(t *testing.T, pp ...*PermitPlugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
	t.Helper()
	if len(pp) == 0 {
		return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
	}

	versionedCfg := v1beta2.KubeSchedulerConfiguration{
		Profiles: []v1beta2.KubeSchedulerProfile{{
			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
			Plugins: &v1beta2.Plugins{
				Permit: v1beta2.PluginSet{},
			},
		}},
	}
	registry := frameworkruntime.Registry{}
	permit := &versionedCfg.Profiles[0].Plugins.Permit
	for _, p := range pp {
		// Surface registration failures instead of silently dropping the plugin.
		if err := registry.Register(p.Name(), newPermitPlugin(p)); err != nil {
			t.Fatalf("Error while registering permit plugin %q: %v", p.Name(), err)
		}
		permit.Enabled = append(permit.Enabled, v1beta2.Plugin{Name: p.Name()})
	}
	cfg := configtesting.V1beta2ToInternalWithDefaults(t, versionedCfg)
	return registry, cfg.Profiles[0]
}