1/*
2Copyright 2018 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package scheduler
18
19import (
20	"context"
21	"fmt"
22	"sync/atomic"
23	"testing"
24	"time"
25
26	v1 "k8s.io/api/core/v1"
27	"k8s.io/apimachinery/pkg/api/errors"
28	"k8s.io/apimachinery/pkg/api/resource"
29	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30	"k8s.io/apimachinery/pkg/labels"
31	"k8s.io/apimachinery/pkg/runtime"
32	"k8s.io/apimachinery/pkg/util/wait"
33	clientset "k8s.io/client-go/kubernetes"
34	listersv1 "k8s.io/client-go/listers/core/v1"
35	"k8s.io/kube-scheduler/config/v1beta2"
36	"k8s.io/kubernetes/pkg/scheduler"
37	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
38	configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
39	"k8s.io/kubernetes/pkg/scheduler/framework"
40	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
41	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
42	st "k8s.io/kubernetes/pkg/scheduler/testing"
43	testutils "k8s.io/kubernetes/test/integration/util"
44	imageutils "k8s.io/kubernetes/test/utils/image"
45	"k8s.io/utils/pointer"
46)
47
// PreFilterPlugin is a test PreFilter plugin. numPreFilterCalled counts
// PreFilter invocations; failPreFilter makes PreFilter return an Error status
// and rejectPreFilter makes it return an Unschedulable status.
type PreFilterPlugin struct {
	numPreFilterCalled int
	failPreFilter      bool
	rejectPreFilter    bool
}
53
// ScorePlugin is a test Score plugin. failScore makes Score return an Error
// status, numScoreCalled is incremented atomically on every Score call, and
// highScoreNode records the node name seen by the first counted Score call,
// which is the one that receives framework.MaxNodeScore.
type ScorePlugin struct {
	failScore      bool
	numScoreCalled int32
	highScoreNode  string
}
59
// ScoreWithNormalizePlugin is a test Score plugin that also acts as its own
// ScoreExtensions. The two counters track how many times Score and
// NormalizeScore have been invoked.
type ScoreWithNormalizePlugin struct {
	numScoreCalled          int
	numNormalizeScoreCalled int
}
64
// FilterPlugin is a test Filter plugin. numFilterCalled is incremented
// atomically on every Filter call; failFilter makes Filter return an Error
// status and rejectFilter makes it return an Unschedulable status.
type FilterPlugin struct {
	numFilterCalled int32
	failFilter      bool
	rejectFilter    bool
}
70
// PostFilterPlugin is a test PostFilter plugin. fh is assigned by the factory
// returned from newPostFilterPlugin and is used by PostFilter to list nodes
// and to run the filter and score plugins. numPostFilterCalled counts
// PostFilter invocations; failPostFilter and rejectPostFilter make PostFilter
// return an Error or an Unschedulable status respectively.
type PostFilterPlugin struct {
	fh                  framework.Handle
	numPostFilterCalled int
	failPostFilter      bool
	rejectPostFilter    bool
}
77
// ReservePlugin is a test Reserve plugin. name is what Name returns,
// numReserveCalled and numUnreserveCalled count Reserve and Unreserve
// invocations, and failReserve makes Reserve return an Error status. When
// pluginInvokeEventChan is non-nil, every Unreserve call sends an event on it.
type ReservePlugin struct {
	name                  string
	numReserveCalled      int
	failReserve           bool
	numUnreserveCalled    int
	pluginInvokeEventChan chan pluginInvokeEvent
}
85
// PreScorePlugin is a test PreScore plugin. numPreScoreCalled counts PreScore
// invocations and failPreScore makes PreScore return an Error status.
type PreScorePlugin struct {
	numPreScoreCalled int
	failPreScore      bool
}
90
// PreBindPlugin is a test PreBind plugin. numPreBindCalled counts PreBind
// invocations; failPreBind makes PreBind return an Error status and
// rejectPreBind makes it return an Unschedulable status.
type PreBindPlugin struct {
	numPreBindCalled int
	failPreBind      bool
	rejectPreBind    bool
}
96
// BindPlugin is a test Bind plugin. numBindCalled counts Bind invocations,
// PluginName is what Name returns, and bindStatus is the status Bind returns.
// client is used to create the Binding when bindStatus is a success status.
// When pluginInvokeEventChan is non-nil, every Bind call sends an event on it.
type BindPlugin struct {
	numBindCalled         int
	PluginName            string
	bindStatus            *framework.Status
	client                clientset.Interface
	pluginInvokeEventChan chan pluginInvokeEvent
}
104
// PostBindPlugin is a test PostBind plugin. name is what Name returns and
// numPostBindCalled counts PostBind invocations. When pluginInvokeEventChan is
// non-nil, every PostBind call sends an event on it.
type PostBindPlugin struct {
	name                  string
	numPostBindCalled     int
	pluginInvokeEventChan chan pluginInvokeEvent
}
110
// PermitPlugin is a test Permit plugin whose behaviour is selected by flags:
//   - failPermit makes Permit return an Error status;
//   - rejectPermit makes Permit return an Unschedulable status;
//   - timeoutPermit makes Permit return Wait with a 3 second timeout, and
//     cancelled is set once the context passed to Permit is done;
//   - waitAndRejectPermit / waitAndAllowPermit make the first pod wait (its
//     name is stored in waitingPod) and make a later, different pod reject or
//     allow all waiting pods (its name is stored in rejectingPod or
//     allowingPod).
//
// name is what Name returns, numPermitCalled counts Permit invocations, and fh
// is assigned by the factory returned from newPermitPlugin.
type PermitPlugin struct {
	name                string
	numPermitCalled     int
	failPermit          bool
	rejectPermit        bool
	timeoutPermit       bool
	waitAndRejectPermit bool
	waitAndAllowPermit  bool
	cancelled           bool
	waitingPod          string
	rejectingPod        string
	allowingPod         string
	fh                  framework.Handle
}
125
// Names of the test plugins declared in this file. They are used as registry
// keys and in the scheduler profile configuration built by the tests.
const (
	prefilterPluginName          = "prefilter-plugin"
	postfilterPluginName         = "postfilter-plugin"
	scorePluginName              = "score-plugin"
	scoreWithNormalizePluginName = "score-with-normalize-plugin"
	filterPluginName             = "filter-plugin"
	preScorePluginName           = "prescore-plugin"
	reservePluginName            = "reserve-plugin"
	preBindPluginName            = "prebind-plugin"
	postBindPluginName           = "postbind-plugin"
	permitPluginName             = "permit-plugin"
)
138
139var _ framework.PreFilterPlugin = &PreFilterPlugin{}
140var _ framework.PostFilterPlugin = &PostFilterPlugin{}
141var _ framework.ScorePlugin = &ScorePlugin{}
142var _ framework.FilterPlugin = &FilterPlugin{}
143var _ framework.ScorePlugin = &ScorePlugin{}
144var _ framework.ScorePlugin = &ScoreWithNormalizePlugin{}
145var _ framework.ReservePlugin = &ReservePlugin{}
146var _ framework.PreScorePlugin = &PreScorePlugin{}
147var _ framework.PreBindPlugin = &PreBindPlugin{}
148var _ framework.BindPlugin = &BindPlugin{}
149var _ framework.PostBindPlugin = &PostBindPlugin{}
150var _ framework.PermitPlugin = &PermitPlugin{}
151
152// newPlugin returns a plugin factory with specified Plugin.
153func newPlugin(plugin framework.Plugin) frameworkruntime.PluginFactory {
154	return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
155		return plugin, nil
156	}
157}
158
159// newPlugin returns a plugin factory with specified Plugin.
160func newPostFilterPlugin(plugin *PostFilterPlugin) frameworkruntime.PluginFactory {
161	return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
162		plugin.fh = fh
163		return plugin, nil
164	}
165}
166
167// Name returns name of the score plugin.
168func (sp *ScorePlugin) Name() string {
169	return scorePluginName
170}
171
172// reset returns name of the score plugin.
173func (sp *ScorePlugin) reset() {
174	sp.failScore = false
175	sp.numScoreCalled = 0
176	sp.highScoreNode = ""
177}
178
179// Score returns the score of scheduling a pod on a specific node.
180func (sp *ScorePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
181	curCalled := atomic.AddInt32(&sp.numScoreCalled, 1)
182	if sp.failScore {
183		return 0, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", p.Name))
184	}
185
186	score := int64(1)
187	if curCalled == 1 {
188		// The first node is scored the highest, the rest is scored lower.
189		sp.highScoreNode = nodeName
190		score = framework.MaxNodeScore
191	}
192	return score, nil
193}
194
195func (sp *ScorePlugin) ScoreExtensions() framework.ScoreExtensions {
196	return nil
197}
198
199// Name returns name of the score plugin.
200func (sp *ScoreWithNormalizePlugin) Name() string {
201	return scoreWithNormalizePluginName
202}
203
204// reset returns name of the score plugin.
205func (sp *ScoreWithNormalizePlugin) reset() {
206	sp.numScoreCalled = 0
207	sp.numNormalizeScoreCalled = 0
208}
209
210// Score returns the score of scheduling a pod on a specific node.
211func (sp *ScoreWithNormalizePlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
212	sp.numScoreCalled++
213	score := int64(10)
214	return score, nil
215}
216
217func (sp *ScoreWithNormalizePlugin) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
218	sp.numNormalizeScoreCalled++
219	return nil
220}
221
// ScoreExtensions returns the plugin itself, which provides NormalizeScore.
func (sp *ScoreWithNormalizePlugin) ScoreExtensions() framework.ScoreExtensions {
	return sp
}
225
226// Name returns name of the plugin.
227func (fp *FilterPlugin) Name() string {
228	return filterPluginName
229}
230
231// reset is used to reset filter plugin.
232func (fp *FilterPlugin) reset() {
233	fp.numFilterCalled = 0
234	fp.failFilter = false
235}
236
237// Filter is a test function that returns an error or nil, depending on the
238// value of "failFilter".
239func (fp *FilterPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
240	atomic.AddInt32(&fp.numFilterCalled, 1)
241
242	if fp.failFilter {
243		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
244	}
245	if fp.rejectFilter {
246		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
247	}
248
249	return nil
250}
251
// Name returns the value of the plugin's name field.
func (rp *ReservePlugin) Name() string {
	return rp.name
}
256
257// Reserve is a test function that increments an intenral counter and returns
258// an error or nil, depending on the value of "failReserve".
259func (rp *ReservePlugin) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
260	rp.numReserveCalled++
261	if rp.failReserve {
262		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
263	}
264	return nil
265}
266
267// Unreserve is a test function that increments an internal counter and emits
268// an event to a channel. While Unreserve implementations should normally be
269// idempotent, we relax that requirement here for testing purposes.
270func (rp *ReservePlugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
271	rp.numUnreserveCalled++
272	if rp.pluginInvokeEventChan != nil {
273		rp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: rp.Name(), val: rp.numUnreserveCalled}
274	}
275}
276
277// reset used to reset internal counters.
278func (rp *ReservePlugin) reset() {
279	rp.numReserveCalled = 0
280	rp.numUnreserveCalled = 0
281	rp.failReserve = false
282}
283
284// Name returns name of the plugin.
285func (*PreScorePlugin) Name() string {
286	return preScorePluginName
287}
288
289// PreScore is a test function.
290func (pfp *PreScorePlugin) PreScore(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, _ []*v1.Node) *framework.Status {
291	pfp.numPreScoreCalled++
292	if pfp.failPreScore {
293		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
294	}
295
296	return nil
297}
298
299// reset used to reset prescore plugin.
300func (pfp *PreScorePlugin) reset() {
301	pfp.numPreScoreCalled = 0
302	pfp.failPreScore = false
303}
304
305// Name returns name of the plugin.
306func (pp *PreBindPlugin) Name() string {
307	return preBindPluginName
308}
309
310// PreBind is a test function that returns (true, nil) or errors for testing.
311func (pp *PreBindPlugin) PreBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
312	pp.numPreBindCalled++
313	if pp.failPreBind {
314		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
315	}
316	if pp.rejectPreBind {
317		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
318	}
319	return nil
320}
321
322// reset used to reset prebind plugin.
323func (pp *PreBindPlugin) reset() {
324	pp.numPreBindCalled = 0
325	pp.failPreBind = false
326	pp.rejectPreBind = false
327}
328
// bindPluginAnnotation is the annotation key under which BindPlugin.Bind
// records the binding plugin's name on the Binding object it creates.
const bindPluginAnnotation = "bindPluginName"
330
// Name returns the value of the plugin's PluginName field.
func (bp *BindPlugin) Name() string {
	return bp.PluginName
}
334
335func (bp *BindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
336	bp.numBindCalled++
337	if bp.pluginInvokeEventChan != nil {
338		bp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: bp.Name(), val: bp.numBindCalled}
339	}
340	if bp.bindStatus.IsSuccess() {
341		if err := bp.client.CoreV1().Pods(p.Namespace).Bind(context.TODO(), &v1.Binding{
342			ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID, Annotations: map[string]string{bindPluginAnnotation: bp.Name()}},
343			Target: v1.ObjectReference{
344				Kind: "Node",
345				Name: nodeName,
346			},
347		}, metav1.CreateOptions{}); err != nil {
348			return framework.NewStatus(framework.Error, fmt.Sprintf("bind failed: %v", err))
349		}
350	}
351	return bp.bindStatus
352}
353
// reset zeroes the Bind invocation counter; no other field is modified.
func (bp *BindPlugin) reset() {
	bp.numBindCalled = 0
}
358
// Name returns the value of the plugin's name field.
func (pp *PostBindPlugin) Name() string {
	return pp.name
}
363
364// PostBind is a test function, which counts the number of times called.
365func (pp *PostBindPlugin) PostBind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
366	pp.numPostBindCalled++
367	if pp.pluginInvokeEventChan != nil {
368		pp.pluginInvokeEventChan <- pluginInvokeEvent{pluginName: pp.Name(), val: pp.numPostBindCalled}
369	}
370}
371
// reset zeroes the PostBind invocation counter; no other field is modified.
func (pp *PostBindPlugin) reset() {
	pp.numPostBindCalled = 0
}
376
377// Name returns name of the plugin.
378func (pp *PreFilterPlugin) Name() string {
379	return prefilterPluginName
380}
381
382// Extensions returns the PreFilterExtensions interface.
383func (pp *PreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions {
384	return nil
385}
386
387// PreFilter is a test function that returns (true, nil) or errors for testing.
388func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
389	pp.numPreFilterCalled++
390	if pp.failPreFilter {
391		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
392	}
393	if pp.rejectPreFilter {
394		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
395	}
396	return nil
397}
398
399// reset used to reset prefilter plugin.
400func (pp *PreFilterPlugin) reset() {
401	pp.numPreFilterCalled = 0
402	pp.failPreFilter = false
403	pp.rejectPreFilter = false
404}
405
406// Name returns name of the plugin.
407func (pp *PostFilterPlugin) Name() string {
408	return postfilterPluginName
409}
410
411func (pp *PostFilterPlugin) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
412	pp.numPostFilterCalled++
413	nodeInfos, err := pp.fh.SnapshotSharedLister().NodeInfos().List()
414	if err != nil {
415		return nil, framework.NewStatus(framework.Error, err.Error())
416	}
417
418	for _, nodeInfo := range nodeInfos {
419		pp.fh.RunFilterPlugins(ctx, state, pod, nodeInfo)
420	}
421	var nodes []*v1.Node
422	for _, nodeInfo := range nodeInfos {
423		nodes = append(nodes, nodeInfo.Node())
424	}
425	pp.fh.RunScorePlugins(ctx, state, pod, nodes)
426
427	if pp.failPostFilter {
428		return nil, framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
429	}
430	if pp.rejectPostFilter {
431		return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
432	}
433	return nil, framework.NewStatus(framework.Success, fmt.Sprintf("make room for pod %v to be schedulable", pod.Name))
434}
435
// Name returns the value of the plugin's name field.
func (pp *PermitPlugin) Name() string {
	return pp.name
}
440
441// Permit implements the permit test plugin.
442func (pp *PermitPlugin) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
443	pp.numPermitCalled++
444	if pp.failPermit {
445		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name)), 0
446	}
447	if pp.rejectPermit {
448		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
449	}
450	if pp.timeoutPermit {
451		go func() {
452			select {
453			case <-ctx.Done():
454				pp.cancelled = true
455			}
456		}()
457		return framework.NewStatus(framework.Wait, ""), 3 * time.Second
458	}
459	if pp.waitAndRejectPermit || pp.waitAndAllowPermit {
460		if pp.waitingPod == "" || pp.waitingPod == pod.Name {
461			pp.waitingPod = pod.Name
462			return framework.NewStatus(framework.Wait, ""), 30 * time.Second
463		}
464		if pp.waitAndRejectPermit {
465			pp.rejectingPod = pod.Name
466			pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) {
467				wp.Reject(pp.name, fmt.Sprintf("reject pod %v", wp.GetPod().Name))
468			})
469			return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name)), 0
470		}
471		if pp.waitAndAllowPermit {
472			pp.allowingPod = pod.Name
473			pp.allowAllPods()
474			return nil, 0
475		}
476	}
477	return nil, 0
478}
479
480// allowAllPods allows all waiting pods.
481func (pp *PermitPlugin) allowAllPods() {
482	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Allow(pp.name) })
483}
484
485// rejectAllPods rejects all waiting pods.
486func (pp *PermitPlugin) rejectAllPods() {
487	pp.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { wp.Reject(pp.name, "rejectAllPods") })
488}
489
490// reset used to reset permit plugin.
491func (pp *PermitPlugin) reset() {
492	pp.numPermitCalled = 0
493	pp.failPermit = false
494	pp.rejectPermit = false
495	pp.timeoutPermit = false
496	pp.waitAndRejectPermit = false
497	pp.waitAndAllowPermit = false
498	pp.cancelled = false
499	pp.waitingPod = ""
500	pp.allowingPod = ""
501	pp.rejectingPod = ""
502}
503
504// newPermitPlugin returns a factory for permit plugin with specified PermitPlugin.
505func newPermitPlugin(permitPlugin *PermitPlugin) frameworkruntime.PluginFactory {
506	return func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
507		permitPlugin.fh = fh
508		return permitPlugin, nil
509	}
510}
511
512// TestPreFilterPlugin tests invocation of prefilter plugins.
513func TestPreFilterPlugin(t *testing.T) {
514	// Create a plugin registry for testing. Register only a pre-filter plugin.
515	preFilterPlugin := &PreFilterPlugin{}
516	registry := frameworkruntime.Registry{prefilterPluginName: newPlugin(preFilterPlugin)}
517
518	// Setup initial prefilter plugin for testing.
519	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
520		Profiles: []v1beta2.KubeSchedulerProfile{{
521			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
522			Plugins: &v1beta2.Plugins{
523				PreFilter: v1beta2.PluginSet{
524					Enabled: []v1beta2.Plugin{
525						{Name: prefilterPluginName},
526					},
527				},
528			},
529		}},
530	})
531
532	// Create the API server and the scheduler with the test plugin set.
533	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prefilter-plugin", nil), 2,
534		scheduler.WithProfiles(cfg.Profiles...),
535		scheduler.WithFrameworkOutOfTreeRegistry(registry))
536	defer testutils.CleanupTest(t, testCtx)
537
538	tests := []struct {
539		name   string
540		fail   bool
541		reject bool
542	}{
543		{
544			name:   "disable fail and reject flags",
545			fail:   false,
546			reject: false,
547		},
548		{
549			name:   "enable fail and disable reject flags",
550			fail:   true,
551			reject: false,
552		},
553		{
554			name:   "disable fail and enable reject flags",
555			fail:   false,
556			reject: true,
557		},
558	}
559
560	for _, test := range tests {
561		t.Run(test.name, func(t *testing.T) {
562			preFilterPlugin.failPreFilter = test.fail
563			preFilterPlugin.rejectPreFilter = test.reject
564			// Create a best effort pod.
565			pod, err := createPausePod(testCtx.ClientSet,
566				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
567			if err != nil {
568				t.Errorf("Error while creating a test pod: %v", err)
569			}
570
571			if test.reject || test.fail {
572				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
573					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
574				}
575			} else {
576				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
577					t.Errorf("Expected the pod to be scheduled. error: %v", err)
578				}
579			}
580
581			if preFilterPlugin.numPreFilterCalled == 0 {
582				t.Errorf("Expected the prefilter plugin to be called.")
583			}
584
585			preFilterPlugin.reset()
586			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
587		})
588	}
589}
590
591// TestPostFilterPlugin tests invocation of postfilter plugins.
592func TestPostFilterPlugin(t *testing.T) {
593	var numNodes int32 = 1
594	tests := []struct {
595		name                      string
596		numNodes                  int32
597		rejectFilter              bool
598		failScore                 bool
599		rejectPostFilter          bool
600		expectFilterNumCalled     int32
601		expectScoreNumCalled      int32
602		expectPostFilterNumCalled int
603	}{
604		{
605			name:                      "Filter passed and Score success",
606			numNodes:                  30,
607			rejectFilter:              false,
608			failScore:                 false,
609			rejectPostFilter:          false,
610			expectFilterNumCalled:     30,
611			expectScoreNumCalled:      30,
612			expectPostFilterNumCalled: 0,
613		},
614		{
615			name:                      "Filter failed and PostFilter passed",
616			numNodes:                  numNodes,
617			rejectFilter:              true,
618			failScore:                 false,
619			rejectPostFilter:          false,
620			expectFilterNumCalled:     numNodes * 2,
621			expectScoreNumCalled:      1,
622			expectPostFilterNumCalled: 1,
623		},
624		{
625			name:                      "Filter failed and PostFilter failed",
626			numNodes:                  numNodes,
627			rejectFilter:              true,
628			failScore:                 false,
629			rejectPostFilter:          true,
630			expectFilterNumCalled:     numNodes * 2,
631			expectScoreNumCalled:      1,
632			expectPostFilterNumCalled: 1,
633		},
634		{
635			name:                      "Score failed and PostFilter failed",
636			numNodes:                  numNodes,
637			rejectFilter:              true,
638			failScore:                 true,
639			rejectPostFilter:          true,
640			expectFilterNumCalled:     numNodes * 2,
641			expectScoreNumCalled:      1,
642			expectPostFilterNumCalled: 1,
643		},
644	}
645
646	for i, tt := range tests {
647		t.Run(tt.name, func(t *testing.T) {
648			// Create a plugin registry for testing. Register a combination of filter and postFilter plugin.
649			var (
650				filterPlugin     = &FilterPlugin{}
651				scorePlugin      = &ScorePlugin{}
652				postFilterPlugin = &PostFilterPlugin{}
653			)
654			filterPlugin.rejectFilter = tt.rejectFilter
655			scorePlugin.failScore = tt.failScore
656			postFilterPlugin.rejectPostFilter = tt.rejectPostFilter
657			registry := frameworkruntime.Registry{
658				filterPluginName:     newPlugin(filterPlugin),
659				scorePluginName:      newPlugin(scorePlugin),
660				postfilterPluginName: newPostFilterPlugin(postFilterPlugin),
661			}
662
663			// Setup plugins for testing.
664			cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
665				Profiles: []v1beta2.KubeSchedulerProfile{{
666					SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
667					Plugins: &v1beta2.Plugins{
668						Filter: v1beta2.PluginSet{
669							Enabled: []v1beta2.Plugin{
670								{Name: filterPluginName},
671							},
672						},
673						Score: v1beta2.PluginSet{
674							Enabled: []v1beta2.Plugin{
675								{Name: scorePluginName},
676							},
677							// disable default in-tree Score plugins
678							// to make it easy to control configured ScorePlugins failure
679							Disabled: []v1beta2.Plugin{
680								{Name: "*"},
681							},
682						},
683						PostFilter: v1beta2.PluginSet{
684							Enabled: []v1beta2.Plugin{
685								{Name: postfilterPluginName},
686							},
687							// Need to disable default in-tree PostFilter plugins, as they will
688							// call RunFilterPlugins and hence impact the "numFilterCalled".
689							Disabled: []v1beta2.Plugin{
690								{Name: "*"},
691							},
692						},
693					},
694				}}})
695
696			// Create the API server and the scheduler with the test plugin set.
697			testCtx := initTestSchedulerForFrameworkTest(
698				t,
699				testutils.InitTestAPIServer(t, fmt.Sprintf("postfilter%v-", i), nil),
700				int(tt.numNodes),
701				scheduler.WithProfiles(cfg.Profiles...),
702				scheduler.WithFrameworkOutOfTreeRegistry(registry),
703			)
704			defer testutils.CleanupTest(t, testCtx)
705
706			// Create a best effort pod.
707			pod, err := createPausePod(testCtx.ClientSet, initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
708			if err != nil {
709				t.Errorf("Error while creating a test pod: %v", err)
710			}
711
712			if tt.rejectFilter {
713				if err = wait.Poll(10*time.Millisecond, 10*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
714					t.Errorf("Didn't expect the pod to be scheduled.")
715				}
716
717				if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled < tt.expectFilterNumCalled {
718					t.Errorf("Expected the filter plugin to be called at least %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
719				}
720				if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled < tt.expectScoreNumCalled {
721					t.Errorf("Expected the score plugin to be called at least %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
722				}
723				if postFilterPlugin.numPostFilterCalled < tt.expectPostFilterNumCalled {
724					t.Errorf("Expected the postfilter plugin to be called at least %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled)
725				}
726			} else {
727				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
728					t.Errorf("Expected the pod to be scheduled. error: %v", err)
729				}
730				if numFilterCalled := atomic.LoadInt32(&filterPlugin.numFilterCalled); numFilterCalled != tt.expectFilterNumCalled {
731					t.Errorf("Expected the filter plugin to be called %v times, but got %v.", tt.expectFilterNumCalled, numFilterCalled)
732				}
733				if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled != tt.expectScoreNumCalled {
734					t.Errorf("Expected the score plugin to be called %v times, but got %v.", tt.expectScoreNumCalled, numScoreCalled)
735				}
736				if postFilterPlugin.numPostFilterCalled != tt.expectPostFilterNumCalled {
737					t.Errorf("Expected the postfilter plugin to be called %v times, but got %v.", tt.expectPostFilterNumCalled, postFilterPlugin.numPostFilterCalled)
738				}
739			}
740		})
741	}
742}
743
744// TestScorePlugin tests invocation of score plugins.
745func TestScorePlugin(t *testing.T) {
746	// Create a plugin registry for testing. Register only a score plugin.
747	scorePlugin := &ScorePlugin{}
748	registry := frameworkruntime.Registry{
749		scorePluginName: newPlugin(scorePlugin),
750	}
751
752	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
753		Profiles: []v1beta2.KubeSchedulerProfile{{
754			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
755			Plugins: &v1beta2.Plugins{
756				Score: v1beta2.PluginSet{
757					Enabled: []v1beta2.Plugin{
758						{Name: scorePluginName},
759					},
760				},
761			},
762		}},
763	})
764
765	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
766		scheduler.WithProfiles(cfg.Profiles...),
767		scheduler.WithFrameworkOutOfTreeRegistry(registry))
768	defer testutils.CleanupTest(t, testCtx)
769
770	tests := []struct {
771		name string
772		fail bool
773	}{
774		{
775			name: "fail score plugin",
776			fail: true,
777		},
778		{
779			name: "do not fail score plugin",
780			fail: false,
781		},
782	}
783
784	for _, test := range tests {
785		t.Run(test.name, func(t *testing.T) {
786			scorePlugin.failScore = test.fail
787			// Create a best effort pod.
788			pod, err := createPausePod(testCtx.ClientSet,
789				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
790			if err != nil {
791				t.Fatalf("Error while creating a test pod: %v", err)
792			}
793
794			if test.fail {
795				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
796					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
797				}
798			} else {
799				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
800					t.Errorf("Expected the pod to be scheduled. error: %v", err)
801				} else {
802					p, err := getPod(testCtx.ClientSet, pod.Name, pod.Namespace)
803					if err != nil {
804						t.Errorf("Failed to retrieve the pod. error: %v", err)
805					} else if p.Spec.NodeName != scorePlugin.highScoreNode {
806						t.Errorf("Expected the pod to be scheduled on node %q, got %q", scorePlugin.highScoreNode, p.Spec.NodeName)
807					}
808				}
809			}
810
811			if numScoreCalled := atomic.LoadInt32(&scorePlugin.numScoreCalled); numScoreCalled == 0 {
812				t.Errorf("Expected the score plugin to be called.")
813			}
814
815			scorePlugin.reset()
816			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
817		})
818	}
819}
820
821// TestNormalizeScorePlugin tests invocation of normalize score plugins.
822func TestNormalizeScorePlugin(t *testing.T) {
823	// Create a plugin registry for testing. Register only a normalize score plugin.
824	scoreWithNormalizePlugin := &ScoreWithNormalizePlugin{}
825	registry := frameworkruntime.Registry{
826		scoreWithNormalizePluginName: newPlugin(scoreWithNormalizePlugin),
827	}
828
829	// Setup initial score plugin for testing.
830	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
831		Profiles: []v1beta2.KubeSchedulerProfile{{
832			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
833			Plugins: &v1beta2.Plugins{
834				Score: v1beta2.PluginSet{
835					Enabled: []v1beta2.Plugin{
836						{Name: scoreWithNormalizePluginName},
837					},
838				},
839			},
840		}},
841	})
842
843	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "score-plugin", nil), 10,
844		scheduler.WithProfiles(cfg.Profiles...),
845		scheduler.WithFrameworkOutOfTreeRegistry(registry))
846
847	defer testutils.CleanupTest(t, testCtx)
848
849	// Create a best effort pod.
850	pod, err := createPausePod(testCtx.ClientSet,
851		initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
852	if err != nil {
853		t.Fatalf("Error while creating a test pod: %v", err)
854	}
855
856	if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
857		t.Errorf("Expected the pod to be scheduled. error: %v", err)
858	}
859
860	if scoreWithNormalizePlugin.numScoreCalled == 0 {
861		t.Errorf("Expected the score plugin to be called.")
862	}
863	if scoreWithNormalizePlugin.numNormalizeScoreCalled == 0 {
864		t.Error("Expected the normalize score plugin to be called")
865	}
866
867	scoreWithNormalizePlugin.reset()
868}
869
870// TestReservePlugin tests invocation of reserve plugins.
871func TestReservePluginReserve(t *testing.T) {
872	// Create a plugin registry for testing. Register only a reserve plugin.
873	reservePlugin := &ReservePlugin{}
874	registry := frameworkruntime.Registry{reservePluginName: newPlugin(reservePlugin)}
875
876	// Setup initial reserve plugin for testing.
877	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
878		Profiles: []v1beta2.KubeSchedulerProfile{{
879			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
880			Plugins: &v1beta2.Plugins{
881				Reserve: v1beta2.PluginSet{
882					Enabled: []v1beta2.Plugin{
883						{Name: reservePluginName},
884					},
885				},
886			},
887		}},
888	})
889
890	// Create the API server and the scheduler with the test plugin set.
891	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "reserve-plugin-reserve", nil), 2,
892		scheduler.WithProfiles(cfg.Profiles...),
893		scheduler.WithFrameworkOutOfTreeRegistry(registry))
894	defer testutils.CleanupTest(t, testCtx)
895
896	tests := []struct {
897		name string
898		fail bool
899	}{
900		{
901			name: "fail reserve plugin",
902			fail: true,
903		},
904		{
905			name: "do not fail reserve plugin",
906			fail: false,
907		},
908	}
909
910	for _, test := range tests {
911		t.Run(test.name, func(t *testing.T) {
912			reservePlugin.failReserve = test.fail
913			// Create a best effort pod.
914			pod, err := createPausePod(testCtx.ClientSet,
915				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
916			if err != nil {
917				t.Errorf("Error while creating a test pod: %v", err)
918			}
919
920			if test.fail {
921				if err = wait.Poll(10*time.Millisecond, 30*time.Second,
922					podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
923					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
924				}
925			} else {
926				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
927					t.Errorf("Expected the pod to be scheduled. error: %v", err)
928				}
929			}
930
931			if reservePlugin.numReserveCalled == 0 {
932				t.Errorf("Expected the reserve plugin to be called.")
933			}
934
935			reservePlugin.reset()
936			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
937		})
938	}
939}
940
941// TestPrebindPlugin tests invocation of prebind plugins.
942func TestPrebindPlugin(t *testing.T) {
943	// Create a plugin registry for testing. Register only a prebind plugin.
944	preBindPlugin := &PreBindPlugin{}
945	registry := frameworkruntime.Registry{preBindPluginName: newPlugin(preBindPlugin)}
946
947	// Setup initial prebind plugin for testing.
948	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
949		Profiles: []v1beta2.KubeSchedulerProfile{{
950			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
951			Plugins: &v1beta2.Plugins{
952				PreBind: v1beta2.PluginSet{
953					Enabled: []v1beta2.Plugin{
954						{Name: preBindPluginName},
955					},
956				},
957			},
958		}},
959	})
960
961	// Create the API server and the scheduler with the test plugin set.
962	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "prebind-plugin", nil), 2,
963		scheduler.WithProfiles(cfg.Profiles...),
964		scheduler.WithFrameworkOutOfTreeRegistry(registry))
965	defer testutils.CleanupTest(t, testCtx)
966
967	tests := []struct {
968		name   string
969		fail   bool
970		reject bool
971	}{
972		{
973			name:   "disable fail and reject flags",
974			fail:   false,
975			reject: false,
976		},
977		{
978			name:   "enable fail and disable reject flags",
979			fail:   true,
980			reject: false,
981		},
982		{
983			name:   "disable fail and enable reject flags",
984			fail:   false,
985			reject: true,
986		},
987		{
988			name:   "enable fail and reject flags",
989			fail:   true,
990			reject: true,
991		},
992	}
993
994	for _, test := range tests {
995		t.Run(test.name, func(t *testing.T) {
996			preBindPlugin.failPreBind = test.fail
997			preBindPlugin.rejectPreBind = test.reject
998			// Create a best effort pod.
999			pod, err := createPausePod(testCtx.ClientSet,
1000				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1001			if err != nil {
1002				t.Errorf("Error while creating a test pod: %v", err)
1003			}
1004
1005			if test.fail || test.reject {
1006				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1007					t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
1008				}
1009			} else if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1010				t.Errorf("Expected the pod to be scheduled. error: %v", err)
1011			}
1012
1013			if preBindPlugin.numPreBindCalled == 0 {
1014				t.Errorf("Expected the prebind plugin to be called.")
1015			}
1016
1017			preBindPlugin.reset()
1018			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1019		})
1020	}
1021}
1022
// // TestReservePluginUnreserve tests invocation of the Unreserve operation in
1024// // reserve plugins through failures in execution points such as pre-bind. Also
1025// // tests that the order of invocation of Unreserve operation is executed in the
1026// // reverse order of invocation of the Reserve operation.
1027// func TestReservePluginUnreserve(t *testing.T) {
1028// 	tests := []struct {
1029// 		name             string
1030// 		failReserve      bool
1031// 		failReserveIndex int
1032// 		failPreBind      bool
1033// 	}{
1034// 		{
1035// 			name:             "fail reserve",
1036// 			failReserve:      true,
1037// 			failReserveIndex: 1,
1038// 		},
1039// 		{
1040// 			name:        "fail preBind",
1041// 			failPreBind: true,
1042// 		},
1043// 		{
1044// 			name: "pass everything",
1045// 		},
1046// 	}
1047
1048// 	for _, test := range tests {
1049// 		t.Run(test.name, func(t *testing.T) {
1050// 			numReservePlugins := 3
1051// 			pluginInvokeEventChan := make(chan pluginInvokeEvent, numReservePlugins)
1052
1053// 			preBindPlugin := &PreBindPlugin{
1054// 				failPreBind: true,
1055// 			}
1056// 			var reservePlugins []*ReservePlugin
1057// 			for i := 0; i < numReservePlugins; i++ {
1058// 				reservePlugins = append(reservePlugins, &ReservePlugin{
1059// 					name:                  fmt.Sprintf("%s-%d", reservePluginName, i),
1060// 					pluginInvokeEventChan: pluginInvokeEventChan,
1061// 				})
1062// 			}
1063
1064// 			registry := frameworkruntime.Registry{
1065// 				// TODO(#92229): test more failure points that would trigger Unreserve in
1066// 				// reserve plugins than just one pre-bind plugin.
1067// 				preBindPluginName: newPlugin(preBindPlugin),
1068// 			}
1069// 			for _, pl := range reservePlugins {
1070// 				registry[pl.Name()] = newPlugin(pl)
1071// 			}
1072
1073// 			// Setup initial reserve and prebind plugin for testing.
1074// 			prof := schedulerconfig.KubeSchedulerProfile{
1075// 				SchedulerName: v1.DefaultSchedulerName,
1076// 				Plugins: &schedulerconfig.Plugins{
1077// 					Reserve: schedulerconfig.PluginSet{
1078// 						// filled by looping over reservePlugins
1079// 					},
1080// 					PreBind: schedulerconfig.PluginSet{
1081// 						Enabled: []schedulerconfig.Plugin{
1082// 							{
1083// 								Name: preBindPluginName,
1084// 							},
1085// 						},
1086// 					},
1087// 				},
1088// 			}
1089// 			for _, pl := range reservePlugins {
1090// 				prof.Plugins.Reserve.Enabled = append(prof.Plugins.Reserve.Enabled, schedulerconfig.Plugin{
1091// 					Name: pl.Name(),
1092// 				})
1093// 			}
1094
1095// 			// Create the master and the scheduler with the test plugin set.
1096// 			testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestMaster(t, "reserve-plugin-unreserve", nil), 2,
1097// 				scheduler.WithProfiles(prof),
1098// 				scheduler.WithFrameworkOutOfTreeRegistry(registry))
1099// 			defer testutils.CleanupTest(t, testCtx)
1100
1101// 			preBindPlugin.failPreBind = test.failPreBind
1102// 			if test.failReserve {
1103// 				reservePlugins[test.failReserveIndex].failReserve = true
1104// 			}
1105// 			// Create a best effort pod.
1106// 			pod, err := createPausePod(testCtx.ClientSet,
1107// 				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1108// 			if err != nil {
1109// 				t.Errorf("Error while creating a test pod: %v", err)
1110// 			}
1111
1112// 			if test.failPreBind || test.failReserve {
1113// 				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1114// 					t.Errorf("Expected a scheduling error, but didn't get it: %v", err)
1115// 				}
1116// 				for i := numReservePlugins - 1; i >= 0; i-- {
1117// 					select {
1118// 					case event := <-pluginInvokeEventChan:
1119// 						expectedPluginName := reservePlugins[i].Name()
1120// 						if expectedPluginName != event.pluginName {
1121// 							t.Errorf("event.pluginName = %s, want %s", event.pluginName, expectedPluginName)
1122// 						}
1123// 					case <-time.After(time.Second * 30):
1124// 						t.Errorf("pluginInvokeEventChan receive timed out")
1125// 					}
1126// 				}
1127// 			} else {
1128// 				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1129// 					t.Errorf("Expected the pod to be scheduled, got an error: %v", err)
1130// 				}
1131// 				for i, pl := range reservePlugins {
1132// 					if pl.numUnreserveCalled != 0 {
1133// 						t.Errorf("reservePlugins[%d].numUnreserveCalled = %d, want 0", i, pl.numUnreserveCalled)
1134// 					}
1135// 				}
1136// 			}
1137// 			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1138// 		})
1139// 	}
1140// }
1141
// pluginInvokeEvent is the element type of the pluginInvokeEventChan channels
// that tests attach to the test plugins. Tests receive these events and compare
// them, in order, against the invocations they expect.
type pluginInvokeEvent struct {
	// pluginName identifies the plugin the event is attributed to; tests
	// compare it against the plugin's Name().
	pluginName string
	// val is an integer payload carried with the event.
	// NOTE(review): presumably the plugin's invocation counter at the time the
	// event was sent (TestBindPlugin expects 1 everywhere) — confirm against
	// the plugins that send on the channel.
	val        int
}
1146
1147// TestBindPlugin tests invocation of bind plugins.
1148func TestBindPlugin(t *testing.T) {
1149	testContext := testutils.InitTestAPIServer(t, "bind-plugin", nil)
1150	bindPlugin1 := &BindPlugin{PluginName: "bind-plugin-1", client: testContext.ClientSet}
1151	bindPlugin2 := &BindPlugin{PluginName: "bind-plugin-2", client: testContext.ClientSet}
1152	reservePlugin := &ReservePlugin{name: "mock-reserve-plugin"}
1153	postBindPlugin := &PostBindPlugin{name: "mock-post-bind-plugin"}
1154	// Create a plugin registry for testing. Register reserve, bind, and
1155	// postBind plugins.
1156	registry := frameworkruntime.Registry{
1157		reservePlugin.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
1158			return reservePlugin, nil
1159		},
1160		bindPlugin1.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
1161			return bindPlugin1, nil
1162		},
1163		bindPlugin2.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
1164			return bindPlugin2, nil
1165		},
1166		postBindPlugin.Name(): func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
1167			return postBindPlugin, nil
1168		},
1169	}
1170
1171	// Setup initial unreserve and bind plugins for testing.
1172	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
1173		Profiles: []v1beta2.KubeSchedulerProfile{{
1174			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
1175			Plugins: &v1beta2.Plugins{
1176				Reserve: v1beta2.PluginSet{
1177					Enabled: []v1beta2.Plugin{{Name: reservePlugin.Name()}},
1178				},
1179				Bind: v1beta2.PluginSet{
1180					// Put DefaultBinder last.
1181					Enabled:  []v1beta2.Plugin{{Name: bindPlugin1.Name()}, {Name: bindPlugin2.Name()}, {Name: defaultbinder.Name}},
1182					Disabled: []v1beta2.Plugin{{Name: defaultbinder.Name}},
1183				},
1184				PostBind: v1beta2.PluginSet{
1185					Enabled: []v1beta2.Plugin{{Name: postBindPlugin.Name()}},
1186				},
1187			},
1188		}},
1189	})
1190
1191	// Create the scheduler with the test plugin set.
1192	testCtx := testutils.InitTestSchedulerWithOptions(t, testContext, nil,
1193		scheduler.WithProfiles(cfg.Profiles...),
1194		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1195	testutils.SyncInformerFactory(testCtx)
1196	go testCtx.Scheduler.Run(testCtx.Ctx)
1197	defer testutils.CleanupTest(t, testCtx)
1198
1199	// Add a few nodes.
1200	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), 2)
1201	if err != nil {
1202		t.Fatal(err)
1203	}
1204
1205	tests := []struct {
1206		name                   string
1207		bindPluginStatuses     []*framework.Status
1208		expectBoundByScheduler bool   // true means this test case expecting scheduler would bind pods
1209		expectBoundByPlugin    bool   // true means this test case expecting a plugin would bind pods
1210		expectBindPluginName   string // expecting plugin name to bind pods
1211		expectInvokeEvents     []pluginInvokeEvent
1212	}{
1213		{
1214			name:                   "bind plugins skipped to bind the pod and scheduler bond the pod",
1215			bindPluginStatuses:     []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Skip, "")},
1216			expectBoundByScheduler: true,
1217			expectInvokeEvents:     []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}},
1218		},
1219		{
1220			name:                 "bindplugin2 succeeded to bind the pod",
1221			bindPluginStatuses:   []*framework.Status{framework.NewStatus(framework.Skip, ""), framework.NewStatus(framework.Success, "")},
1222			expectBoundByPlugin:  true,
1223			expectBindPluginName: bindPlugin2.Name(),
1224			expectInvokeEvents:   []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: bindPlugin2.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}},
1225		},
1226		{
1227			name:                 "bindplugin1 succeeded to bind the pod",
1228			bindPluginStatuses:   []*framework.Status{framework.NewStatus(framework.Success, ""), framework.NewStatus(framework.Success, "")},
1229			expectBoundByPlugin:  true,
1230			expectBindPluginName: bindPlugin1.Name(),
1231			expectInvokeEvents:   []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: postBindPlugin.Name(), val: 1}},
1232		},
1233		{
1234			name:               "bind plugin fails to bind the pod",
1235			bindPluginStatuses: []*framework.Status{framework.NewStatus(framework.Error, "failed to bind"), framework.NewStatus(framework.Success, "")},
1236			expectInvokeEvents: []pluginInvokeEvent{{pluginName: bindPlugin1.Name(), val: 1}, {pluginName: reservePlugin.Name(), val: 1}},
1237		},
1238	}
1239
1240	var pluginInvokeEventChan chan pluginInvokeEvent
1241	for _, test := range tests {
1242		t.Run(test.name, func(t *testing.T) {
1243			pluginInvokeEventChan = make(chan pluginInvokeEvent, 10)
1244
1245			bindPlugin1.bindStatus = test.bindPluginStatuses[0]
1246			bindPlugin2.bindStatus = test.bindPluginStatuses[1]
1247
1248			bindPlugin1.pluginInvokeEventChan = pluginInvokeEventChan
1249			bindPlugin2.pluginInvokeEventChan = pluginInvokeEventChan
1250			reservePlugin.pluginInvokeEventChan = pluginInvokeEventChan
1251			postBindPlugin.pluginInvokeEventChan = pluginInvokeEventChan
1252
1253			// Create a best effort pod.
1254			pod, err := createPausePod(testCtx.ClientSet,
1255				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1256			if err != nil {
1257				t.Errorf("Error while creating a test pod: %v", err)
1258			}
1259
1260			if test.expectBoundByScheduler || test.expectBoundByPlugin {
1261				// bind plugins skipped to bind the pod
1262				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1263					t.Fatalf("Expected the pod to be scheduled. error: %v", err)
1264				}
1265				pod, err = testCtx.ClientSet.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
1266				if err != nil {
1267					t.Errorf("can't get pod: %v", err)
1268				}
1269				if test.expectBoundByScheduler {
1270					if pod.Annotations[bindPluginAnnotation] != "" {
1271						t.Errorf("Expected the pod to be bound by scheduler instead of by bindplugin %s", pod.Annotations[bindPluginAnnotation])
1272					}
1273					if bindPlugin1.numBindCalled != 1 || bindPlugin2.numBindCalled != 1 {
1274						t.Errorf("Expected each bind plugin to be called once, was called %d and %d times.", bindPlugin1.numBindCalled, bindPlugin2.numBindCalled)
1275					}
1276				} else {
1277					if pod.Annotations[bindPluginAnnotation] != test.expectBindPluginName {
1278						t.Errorf("Expected the pod to be bound by bindplugin %s instead of by bindplugin %s", test.expectBindPluginName, pod.Annotations[bindPluginAnnotation])
1279					}
1280					if bindPlugin1.numBindCalled != 1 {
1281						t.Errorf("Expected %s to be called once, was called %d times.", bindPlugin1.Name(), bindPlugin1.numBindCalled)
1282					}
1283					if test.expectBindPluginName == bindPlugin1.Name() && bindPlugin2.numBindCalled > 0 {
1284						// expect bindplugin1 succeeded to bind the pod and bindplugin2 should not be called.
1285						t.Errorf("Expected %s not to be called, was called %d times.", bindPlugin2.Name(), bindPlugin1.numBindCalled)
1286					}
1287				}
1288				if err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (done bool, err error) {
1289					return postBindPlugin.numPostBindCalled == 1, nil
1290				}); err != nil {
1291					t.Errorf("Expected the postbind plugin to be called once, was called %d times.", postBindPlugin.numPostBindCalled)
1292				}
1293				if reservePlugin.numUnreserveCalled != 0 {
1294					t.Errorf("Expected unreserve to not be called, was called %d times.", reservePlugin.numUnreserveCalled)
1295				}
1296			} else {
1297				// bind plugin fails to bind the pod
1298				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1299					t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
1300				}
1301				if postBindPlugin.numPostBindCalled > 0 {
1302					t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled)
1303				}
1304			}
1305			for j := range test.expectInvokeEvents {
1306				expectEvent := test.expectInvokeEvents[j]
1307				select {
1308				case event := <-pluginInvokeEventChan:
1309					if event.pluginName != expectEvent.pluginName {
1310						t.Errorf("Expect invoke event %d from plugin %s instead of %s", j, expectEvent.pluginName, event.pluginName)
1311					}
1312					if event.val != expectEvent.val {
1313						t.Errorf("Expect val of invoke event %d to be %d instead of %d", j, expectEvent.val, event.val)
1314					}
1315				case <-time.After(time.Second * 30):
1316					t.Errorf("Waiting for invoke event %d timeout.", j)
1317				}
1318			}
1319			postBindPlugin.reset()
1320			bindPlugin1.reset()
1321			bindPlugin2.reset()
1322			reservePlugin.reset()
1323			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1324		})
1325	}
1326}
1327
1328// TestPostBindPlugin tests invocation of postbind plugins.
1329func TestPostBindPlugin(t *testing.T) {
1330	tests := []struct {
1331		name        string
1332		preBindFail bool
1333	}{
1334		{
1335			name:        "plugin preBind fail",
1336			preBindFail: true,
1337		},
1338		{
1339			name:        "plugin preBind do not fail",
1340			preBindFail: false,
1341		},
1342	}
1343
1344	for _, test := range tests {
1345		t.Run(test.name, func(t *testing.T) {
1346			// Create a plugin registry for testing. Register a prebind and a postbind plugin.
1347			preBindPlugin := &PreBindPlugin{
1348				failPreBind: test.preBindFail,
1349			}
1350			postBindPlugin := &PostBindPlugin{
1351				name:                  postBindPluginName,
1352				pluginInvokeEventChan: make(chan pluginInvokeEvent, 1),
1353			}
1354			registry := frameworkruntime.Registry{
1355				preBindPluginName:  newPlugin(preBindPlugin),
1356				postBindPluginName: newPlugin(postBindPlugin),
1357			}
1358
1359			// Setup initial prebind and postbind plugin for testing.
1360			cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
1361				Profiles: []v1beta2.KubeSchedulerProfile{{
1362					SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
1363					Plugins: &v1beta2.Plugins{
1364						PreBind: v1beta2.PluginSet{
1365							Enabled: []v1beta2.Plugin{
1366								{
1367									Name: preBindPluginName,
1368								},
1369							},
1370						},
1371						PostBind: v1beta2.PluginSet{
1372							Enabled: []v1beta2.Plugin{
1373								{
1374									Name: postBindPluginName,
1375								},
1376							},
1377						},
1378					},
1379				}},
1380			})
1381
1382			// Create the API server and the scheduler with the test plugin set.
1383			testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "postbind-plugin", nil), 2,
1384				scheduler.WithProfiles(cfg.Profiles...),
1385				scheduler.WithFrameworkOutOfTreeRegistry(registry))
1386			defer testutils.CleanupTest(t, testCtx)
1387
1388			// Create a best effort pod.
1389			pod, err := createPausePod(testCtx.ClientSet,
1390				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1391			if err != nil {
1392				t.Errorf("Error while creating a test pod: %v", err)
1393			}
1394
1395			if test.preBindFail {
1396				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1397					t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
1398				}
1399				if postBindPlugin.numPostBindCalled > 0 {
1400					t.Errorf("Didn't expect the postbind plugin to be called %d times.", postBindPlugin.numPostBindCalled)
1401				}
1402			} else {
1403				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1404					t.Errorf("Expected the pod to be scheduled. error: %v", err)
1405				}
1406				select {
1407				case <-postBindPlugin.pluginInvokeEventChan:
1408				case <-time.After(time.Second * 15):
1409					t.Errorf("pluginInvokeEventChan timed out")
1410				}
1411				if postBindPlugin.numPostBindCalled == 0 {
1412					t.Errorf("Expected the postbind plugin to be called, was called %d times.", postBindPlugin.numPostBindCalled)
1413				}
1414			}
1415
1416			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1417		})
1418	}
1419}
1420
1421// TestPermitPlugin tests invocation of permit plugins.
1422func TestPermitPlugin(t *testing.T) {
1423	// Create a plugin registry for testing. Register only a permit plugin.
1424	perPlugin := &PermitPlugin{name: permitPluginName}
1425	registry, prof := initRegistryAndConfig(t, perPlugin)
1426
1427	// Create the API server and the scheduler with the test plugin set.
1428	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2,
1429		scheduler.WithProfiles(prof),
1430		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1431	defer testutils.CleanupTest(t, testCtx)
1432
1433	tests := []struct {
1434		name    string
1435		fail    bool
1436		reject  bool
1437		timeout bool
1438	}{
1439		{
1440			name:    "disable fail, reject and timeout flags",
1441			fail:    false,
1442			reject:  false,
1443			timeout: false,
1444		},
1445		{
1446			name:    "enable fail, disable reject and timeout flags",
1447			fail:    true,
1448			reject:  false,
1449			timeout: false,
1450		},
1451		{
1452			name:    "disable fail and timeout, enable reject flags",
1453			fail:    false,
1454			reject:  true,
1455			timeout: false,
1456		},
1457		{
1458			name:    "enable fail and reject, disable timeout flags",
1459			fail:    true,
1460			reject:  true,
1461			timeout: false,
1462		},
1463		{
1464			name:    "disable fail and reject, disable timeout flags",
1465			fail:    false,
1466			reject:  false,
1467			timeout: true,
1468		},
1469		{
1470			name:    "disable fail and reject, enable timeout flags",
1471			fail:    false,
1472			reject:  false,
1473			timeout: true,
1474		},
1475	}
1476
1477	for _, test := range tests {
1478		t.Run(test.name, func(t *testing.T) {
1479			perPlugin.failPermit = test.fail
1480			perPlugin.rejectPermit = test.reject
1481			perPlugin.timeoutPermit = test.timeout
1482			perPlugin.waitAndRejectPermit = false
1483			perPlugin.waitAndAllowPermit = false
1484
1485			// Create a best effort pod.
1486			pod, err := createPausePod(testCtx.ClientSet,
1487				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1488			if err != nil {
1489				t.Errorf("Error while creating a test pod: %v", err)
1490			}
1491			if test.fail {
1492				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podSchedulingError(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1493					t.Errorf("Expected a scheduling error, but didn't get it. error: %v", err)
1494				}
1495			} else {
1496				if test.reject || test.timeout {
1497					if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
1498						t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
1499					}
1500				} else {
1501					if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1502						t.Errorf("Expected the pod to be scheduled. error: %v", err)
1503					}
1504				}
1505			}
1506
1507			if perPlugin.numPermitCalled == 0 {
1508				t.Errorf("Expected the permit plugin to be called.")
1509			}
1510
1511			perPlugin.reset()
1512			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1513		})
1514	}
1515}
1516
1517// TestMultiplePermitPlugins tests multiple permit plugins returning wait for a same pod.
1518func TestMultiplePermitPlugins(t *testing.T) {
1519	// Create a plugin registry for testing.
1520	perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
1521	perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
1522	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
1523
1524	// Create the API server and the scheduler with the test plugin set.
1525	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "multi-permit-plugin", nil), 2,
1526		scheduler.WithProfiles(prof),
1527		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1528	defer testutils.CleanupTest(t, testCtx)
1529
1530	// Both permit plugins will return Wait for permitting
1531	perPlugin1.timeoutPermit = true
1532	perPlugin2.timeoutPermit = true
1533
1534	// Create a test pod.
1535	podName := "test-pod"
1536	pod, err := createPausePod(testCtx.ClientSet,
1537		initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
1538	if err != nil {
1539		t.Errorf("Error while creating a test pod: %v", err)
1540	}
1541
1542	var waitingPod framework.WaitingPod
1543	// Wait until the test pod is actually waiting.
1544	wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
1545		waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
1546		return waitingPod != nil, nil
1547	})
1548
1549	// Check the number of pending permits
1550	if l := len(waitingPod.GetPendingPlugins()); l != 2 {
1551		t.Errorf("Expected the number of pending plugins is 2, but got %d", l)
1552	}
1553
1554	perPlugin1.allowAllPods()
1555	// Check the number of pending permits
1556	if l := len(waitingPod.GetPendingPlugins()); l != 1 {
1557		t.Errorf("Expected the number of pending plugins is 1, but got %d", l)
1558	}
1559
1560	perPlugin2.allowAllPods()
1561	if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1562		t.Errorf("Expected the pod to be scheduled. error: %v", err)
1563	}
1564
1565	if perPlugin1.numPermitCalled == 0 || perPlugin2.numPermitCalled == 0 {
1566		t.Errorf("Expected the permit plugin to be called.")
1567	}
1568
1569	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1570}
1571
1572// TestPermitPluginsCancelled tests whether all permit plugins are cancelled when pod is rejected.
1573func TestPermitPluginsCancelled(t *testing.T) {
1574	// Create a plugin registry for testing.
1575	perPlugin1 := &PermitPlugin{name: "permit-plugin-1"}
1576	perPlugin2 := &PermitPlugin{name: "permit-plugin-2"}
1577	registry, prof := initRegistryAndConfig(t, perPlugin1, perPlugin2)
1578
1579	// Create the API server and the scheduler with the test plugin set.
1580	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugins", nil), 2,
1581		scheduler.WithProfiles(prof),
1582		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1583	defer testutils.CleanupTest(t, testCtx)
1584
1585	// Both permit plugins will return Wait for permitting
1586	perPlugin1.timeoutPermit = true
1587	perPlugin2.timeoutPermit = true
1588
1589	// Create a test pod.
1590	podName := "test-pod"
1591	pod, err := createPausePod(testCtx.ClientSet,
1592		initPausePod(&pausePodConfig{Name: podName, Namespace: testCtx.NS.Name}))
1593	if err != nil {
1594		t.Errorf("Error while creating a test pod: %v", err)
1595	}
1596
1597	var waitingPod framework.WaitingPod
1598	// Wait until the test pod is actually waiting.
1599	wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
1600		waitingPod = perPlugin1.fh.GetWaitingPod(pod.UID)
1601		return waitingPod != nil, nil
1602	})
1603
1604	perPlugin1.rejectAllPods()
1605	// Wait some time for the permit plugins to be cancelled
1606	err = wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
1607		return perPlugin1.cancelled && perPlugin2.cancelled, nil
1608	})
1609	if err != nil {
1610		t.Errorf("Expected all permit plugins to be cancelled")
1611	}
1612}
1613
1614// TestCoSchedulingWithPermitPlugin tests invocation of permit plugins.
1615func TestCoSchedulingWithPermitPlugin(t *testing.T) {
1616	// Create a plugin registry for testing. Register only a permit plugin.
1617	permitPlugin := &PermitPlugin{name: permitPluginName}
1618	registry, prof := initRegistryAndConfig(t, permitPlugin)
1619
1620	// Create the API server and the scheduler with the test plugin set.
1621	// TODO Make the subtests not share scheduler instances.
1622	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "permit-plugin", nil), 2,
1623		scheduler.WithProfiles(prof),
1624		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1625	defer testutils.CleanupTest(t, testCtx)
1626
1627	tests := []struct {
1628		name       string
1629		waitReject bool
1630		waitAllow  bool
1631	}{
1632		{
1633			name:       "having wait reject true and wait allow false",
1634			waitReject: true,
1635			waitAllow:  false,
1636		},
1637		{
1638			name:       "having wait reject false and wait allow true",
1639			waitReject: false,
1640			waitAllow:  true,
1641		},
1642	}
1643
1644	for _, test := range tests {
1645		t.Run(test.name, func(t *testing.T) {
1646			permitPlugin.failPermit = false
1647			permitPlugin.rejectPermit = false
1648			permitPlugin.timeoutPermit = false
1649			permitPlugin.waitAndRejectPermit = test.waitReject
1650			permitPlugin.waitAndAllowPermit = test.waitAllow
1651
1652			// Create two pods. First pod to enter Permit() will wait and a second one will either
1653			// reject or allow first one.
1654			podA, err := createPausePod(testCtx.ClientSet,
1655				initPausePod(&pausePodConfig{Name: "pod-a", Namespace: testCtx.NS.Name}))
1656			if err != nil {
1657				t.Errorf("Error while creating the first pod: %v", err)
1658			}
1659			podB, err := createPausePod(testCtx.ClientSet,
1660				initPausePod(&pausePodConfig{Name: "pod-b", Namespace: testCtx.NS.Name}))
1661			if err != nil {
1662				t.Errorf("Error while creating the second pod: %v", err)
1663			}
1664
1665			if test.waitReject {
1666				if err = waitForPodUnschedulable(testCtx.ClientSet, podA); err != nil {
1667					t.Errorf("Didn't expect the first pod to be scheduled. error: %v", err)
1668				}
1669				if err = waitForPodUnschedulable(testCtx.ClientSet, podB); err != nil {
1670					t.Errorf("Didn't expect the second pod to be scheduled. error: %v", err)
1671				}
1672				if !((permitPlugin.waitingPod == podA.Name && permitPlugin.rejectingPod == podB.Name) ||
1673					(permitPlugin.waitingPod == podB.Name && permitPlugin.rejectingPod == podA.Name)) {
1674					t.Errorf("Expect one pod to wait and another pod to reject instead %s waited and %s rejected.",
1675						permitPlugin.waitingPod, permitPlugin.rejectingPod)
1676				}
1677			} else {
1678				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podA); err != nil {
1679					t.Errorf("Expected the first pod to be scheduled. error: %v", err)
1680				}
1681				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, podB); err != nil {
1682					t.Errorf("Expected the second pod to be scheduled. error: %v", err)
1683				}
1684				if !((permitPlugin.waitingPod == podA.Name && permitPlugin.allowingPod == podB.Name) ||
1685					(permitPlugin.waitingPod == podB.Name && permitPlugin.allowingPod == podA.Name)) {
1686					t.Errorf("Expect one pod to wait and another pod to allow instead %s waited and %s allowed.",
1687						permitPlugin.waitingPod, permitPlugin.allowingPod)
1688				}
1689			}
1690
1691			if permitPlugin.numPermitCalled == 0 {
1692				t.Errorf("Expected the permit plugin to be called.")
1693			}
1694
1695			permitPlugin.reset()
1696			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{podA, podB})
1697		})
1698	}
1699}
1700
1701// TestFilterPlugin tests invocation of filter plugins.
1702func TestFilterPlugin(t *testing.T) {
1703	// Create a plugin registry for testing. Register only a filter plugin.
1704	filterPlugin := &FilterPlugin{}
1705	registry := frameworkruntime.Registry{filterPluginName: newPlugin(filterPlugin)}
1706
1707	// Setup initial filter plugin for testing.
1708	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
1709		Profiles: []v1beta2.KubeSchedulerProfile{{
1710			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
1711			Plugins: &v1beta2.Plugins{
1712				Filter: v1beta2.PluginSet{
1713					Enabled: []v1beta2.Plugin{
1714						{Name: filterPluginName},
1715					},
1716				},
1717			},
1718		}},
1719	})
1720
1721	// Create the API server and the scheduler with the test plugin set.
1722	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "filter-plugin", nil), 1,
1723		scheduler.WithProfiles(cfg.Profiles...),
1724		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1725	defer testutils.CleanupTest(t, testCtx)
1726
1727	tests := []struct {
1728		name string
1729		fail bool
1730	}{
1731		{
1732			name: "fail filter plugin",
1733			fail: true,
1734		},
1735		{
1736			name: "do not fail filter plugin",
1737			fail: false,
1738		},
1739	}
1740
1741	for _, test := range tests {
1742		t.Run(test.name, func(t *testing.T) {
1743			filterPlugin.failFilter = test.fail
1744			// Create a best effort pod.
1745			pod, err := createPausePod(testCtx.ClientSet,
1746				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1747			if err != nil {
1748				t.Errorf("Error while creating a test pod: %v", err)
1749			}
1750
1751			if test.fail {
1752				if err = wait.Poll(10*time.Millisecond, 30*time.Second, podUnschedulable(testCtx.ClientSet, pod.Namespace, pod.Name)); err != nil {
1753					t.Errorf("Didn't expect the pod to be scheduled.")
1754				}
1755				if filterPlugin.numFilterCalled < 1 {
1756					t.Errorf("Expected the filter plugin to be called at least 1 time, but got %v.", filterPlugin.numFilterCalled)
1757				}
1758			} else {
1759				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1760					t.Errorf("Expected the pod to be scheduled. error: %v", err)
1761				}
1762				if filterPlugin.numFilterCalled != 1 {
1763					t.Errorf("Expected the filter plugin to be called 1 time, but got %v.", filterPlugin.numFilterCalled)
1764				}
1765			}
1766
1767			filterPlugin.reset()
1768			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1769		})
1770	}
1771}
1772
1773// TestPreScorePlugin tests invocation of pre-score plugins.
1774func TestPreScorePlugin(t *testing.T) {
1775	// Create a plugin registry for testing. Register only a pre-score plugin.
1776	preScorePlugin := &PreScorePlugin{}
1777	registry := frameworkruntime.Registry{preScorePluginName: newPlugin(preScorePlugin)}
1778
1779	// Setup initial pre-score plugin for testing.
1780	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
1781		Profiles: []v1beta2.KubeSchedulerProfile{{
1782			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
1783			Plugins: &v1beta2.Plugins{
1784				PreScore: v1beta2.PluginSet{
1785					Enabled: []v1beta2.Plugin{
1786						{Name: preScorePluginName},
1787					},
1788				},
1789			},
1790		}},
1791	})
1792
1793	// Create the API server and the scheduler with the test plugin set.
1794	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "pre-score-plugin", nil), 2,
1795		scheduler.WithProfiles(cfg.Profiles...),
1796		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1797	defer testutils.CleanupTest(t, testCtx)
1798
1799	tests := []struct {
1800		name string
1801		fail bool
1802	}{
1803		{
1804			name: "fail preScore plugin",
1805			fail: true,
1806		},
1807		{
1808			name: "do not fail preScore plugin",
1809			fail: false,
1810		},
1811	}
1812
1813	for _, test := range tests {
1814		t.Run(test.name, func(t *testing.T) {
1815			preScorePlugin.failPreScore = test.fail
1816			// Create a best effort pod.
1817			pod, err := createPausePod(testCtx.ClientSet,
1818				initPausePod(&pausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
1819			if err != nil {
1820				t.Errorf("Error while creating a test pod: %v", err)
1821			}
1822
1823			if test.fail {
1824				if err = waitForPodUnschedulable(testCtx.ClientSet, pod); err != nil {
1825					t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
1826				}
1827			} else {
1828				if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, pod); err != nil {
1829					t.Errorf("Expected the pod to be scheduled. error: %v", err)
1830				}
1831			}
1832
1833			if preScorePlugin.numPreScoreCalled == 0 {
1834				t.Errorf("Expected the pre-score plugin to be called.")
1835			}
1836
1837			preScorePlugin.reset()
1838			testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{pod})
1839		})
1840	}
1841}
1842
1843// TestPreemptWithPermitPlugin tests preempt with permit plugins.
1844func TestPreemptWithPermitPlugin(t *testing.T) {
1845	// Create a plugin registry for testing. Register only a permit plugin.
1846	permitPlugin := &PermitPlugin{}
1847	registry, prof := initRegistryAndConfig(t, permitPlugin)
1848
1849	// Create the API server and the scheduler with the test plugin set.
1850	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "preempt-with-permit-plugin", nil), 0,
1851		scheduler.WithProfiles(prof),
1852		scheduler.WithFrameworkOutOfTreeRegistry(registry))
1853	defer testutils.CleanupTest(t, testCtx)
1854
1855	// Add one node.
1856	nodeRes := map[v1.ResourceName]string{
1857		v1.ResourcePods:   "32",
1858		v1.ResourceCPU:    "500m",
1859		v1.ResourceMemory: "500",
1860	}
1861	_, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode().Capacity(nodeRes), 1)
1862	if err != nil {
1863		t.Fatal(err)
1864	}
1865
1866	permitPlugin.failPermit = false
1867	permitPlugin.rejectPermit = false
1868	permitPlugin.timeoutPermit = false
1869	permitPlugin.waitAndRejectPermit = false
1870	permitPlugin.waitAndAllowPermit = true
1871	permitPlugin.waitingPod = "waiting-pod"
1872
1873	lowPriority, highPriority := int32(100), int32(300)
1874	resourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
1875		v1.ResourceCPU:    *resource.NewMilliQuantity(200, resource.DecimalSI),
1876		v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
1877	}
1878	preemptorResourceRequest := v1.ResourceRequirements{Requests: v1.ResourceList{
1879		v1.ResourceCPU:    *resource.NewMilliQuantity(400, resource.DecimalSI),
1880		v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI)},
1881	}
1882
1883	// First pod will go running.
1884	runningPod := initPausePod(&pausePodConfig{Name: "running-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
1885	runningPod.Spec.TerminationGracePeriodSeconds = new(int64)
1886	runningPod, err = createPausePod(testCtx.ClientSet, runningPod)
1887	if err != nil {
1888		t.Errorf("Error while creating the waiting pod: %v", err)
1889	}
1890	// Wait until the pod scheduled, then create a preemptor pod to preempt it.
1891	wait.Poll(100*time.Millisecond, 30*time.Second, podScheduled(testCtx.ClientSet, runningPod.Name, runningPod.Namespace))
1892
1893	// Second pod will go waiting.
1894	waitingPod := initPausePod(&pausePodConfig{Name: "waiting-pod", Namespace: testCtx.NS.Name, Priority: &lowPriority, Resources: &resourceRequest})
1895	waitingPod.Spec.TerminationGracePeriodSeconds = new(int64)
1896	waitingPod, err = createPausePod(testCtx.ClientSet, waitingPod)
1897	if err != nil {
1898		t.Errorf("Error while creating the waiting pod: %v", err)
1899	}
1900	// Wait until the waiting-pod is actually waiting, then create a preemptor pod to preempt it.
1901	wait.Poll(10*time.Millisecond, 30*time.Second, func() (bool, error) {
1902		w := false
1903		permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
1904		return w, nil
1905	})
1906
1907	// Create third pod which should preempt other pods.
1908	preemptorPod, err := createPausePod(testCtx.ClientSet,
1909		initPausePod(&pausePodConfig{Name: "preemptor-pod", Namespace: testCtx.NS.Name, Priority: &highPriority, Resources: &preemptorResourceRequest}))
1910	if err != nil {
1911		t.Errorf("Error while creating the preemptor pod: %v", err)
1912	}
1913
1914	// TODO(#96478): uncomment below once we find a way to trigger MoveAllToActiveOrBackoffQueue()
1915	// upon deletion event of unassigned waiting pods.
1916	// if err = testutils.WaitForPodToSchedule(testCtx.ClientSet, preemptorPod); err != nil {
1917	// 	t.Errorf("Expected the preemptor pod to be scheduled. error: %v", err)
1918	// }
1919
1920	if err := wait.Poll(200*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
1921		w := false
1922		permitPlugin.fh.IterateOverWaitingPods(func(wp framework.WaitingPod) { w = true })
1923		return !w, nil
1924	}); err != nil {
1925		t.Error("Expected the waiting pod to get preempted")
1926	}
1927	// Expect the waitingPod to be still present.
1928	if _, err := getPod(testCtx.ClientSet, waitingPod.Name, waitingPod.Namespace); err != nil {
1929		t.Error("Get waiting pod in waiting pod failed.")
1930	}
1931	// Expect the runningPod to be deleted physically.
1932	_, err = getPod(testCtx.ClientSet, runningPod.Name, runningPod.Namespace)
1933	if err != nil && !errors.IsNotFound(err) {
1934		t.Error("Get running pod failed.")
1935	}
1936	if err == nil {
1937		t.Error("Running pod still exist.")
1938	}
1939	if permitPlugin.numPermitCalled == 0 {
1940		t.Errorf("Expected the permit plugin to be called.")
1941	}
1942
1943	permitPlugin.reset()
1944	testutils.CleanupPods(testCtx.ClientSet, t, []*v1.Pod{waitingPod, runningPod, preemptorPod})
1945}
1946
// jobPluginName is the name JobPlugin reports from Name() and the key under
// which it is registered and enabled in TestActivatePods.
const jobPluginName = "job plugin"
1950
1951var _ framework.PreFilterPlugin = &JobPlugin{}
1952var _ framework.PostBindPlugin = &PostBindPlugin{}
1953
1954type JobPlugin struct {
1955	podLister     listersv1.PodLister
1956	podsActivated bool
1957}
1958
1959func (j *JobPlugin) Name() string {
1960	return jobPluginName
1961}
1962
1963func (j *JobPlugin) PreFilter(_ context.Context, _ *framework.CycleState, p *v1.Pod) *framework.Status {
1964	labelSelector := labels.SelectorFromSet(labels.Set{"driver": ""})
1965	driverPods, err := j.podLister.Pods(p.Namespace).List(labelSelector)
1966	if err != nil {
1967		return framework.AsStatus(err)
1968	}
1969	if len(driverPods) == 0 {
1970		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "unable to find driver pod")
1971	}
1972	return nil
1973}
1974
1975func (j *JobPlugin) PreFilterExtensions() framework.PreFilterExtensions {
1976	return nil
1977}
1978
1979func (j *JobPlugin) PostBind(_ context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) {
1980	if _, ok := p.Labels["driver"]; !ok {
1981		return
1982	}
1983
1984	// If it's a driver pod, move other executor pods proactively to accelerating the scheduling.
1985	labelSelector := labels.SelectorFromSet(labels.Set{"executor": ""})
1986	podsToActivate, err := j.podLister.Pods(p.Namespace).List(labelSelector)
1987	if err == nil && len(podsToActivate) != 0 {
1988		c, err := state.Read(framework.PodsToActivateKey)
1989		if err == nil {
1990			if s, ok := c.(*framework.PodsToActivate); ok {
1991				s.Lock()
1992				for _, pod := range podsToActivate {
1993					namespacedName := fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)
1994					s.Map[namespacedName] = pod
1995				}
1996				s.Unlock()
1997				j.podsActivated = true
1998			}
1999		}
2000	}
2001}
2002
2003// This test simulates a typical spark job workflow.
2004// - N executor pods are created, but kept pending due to missing the driver pod
2005// - when the driver pod gets created and scheduled, proactively move the executors to activeQ
2006//   and thus accelerate the entire job workflow.
2007func TestActivatePods(t *testing.T) {
2008	var jobPlugin *JobPlugin
2009	// Create a plugin registry for testing. Register a Job plugin.
2010	registry := frameworkruntime.Registry{jobPluginName: func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
2011		jobPlugin = &JobPlugin{podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister()}
2012		return jobPlugin, nil
2013	}}
2014
2015	// Setup initial filter plugin for testing.
2016	cfg := configtesting.V1beta2ToInternalWithDefaults(t, v1beta2.KubeSchedulerConfiguration{
2017		Profiles: []v1beta2.KubeSchedulerProfile{{
2018			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
2019			Plugins: &v1beta2.Plugins{
2020				PreFilter: v1beta2.PluginSet{
2021					Enabled: []v1beta2.Plugin{
2022						{Name: jobPluginName},
2023					},
2024				},
2025				PostBind: v1beta2.PluginSet{
2026					Enabled: []v1beta2.Plugin{
2027						{Name: jobPluginName},
2028					},
2029				},
2030			},
2031		}},
2032	})
2033
2034	// Create the API server and the scheduler with the test plugin set.
2035	testCtx := initTestSchedulerForFrameworkTest(t, testutils.InitTestAPIServer(t, "job-plugin", nil), 1,
2036		scheduler.WithProfiles(cfg.Profiles...),
2037		scheduler.WithFrameworkOutOfTreeRegistry(registry))
2038	defer testutils.CleanupTest(t, testCtx)
2039
2040	cs := testCtx.ClientSet
2041	ns := testCtx.NS.Name
2042	pause := imageutils.GetPauseImageName()
2043
2044	// Firstly create 2 executor pods.
2045	var pods []*v1.Pod
2046	for i := 1; i <= 2; i++ {
2047		name := fmt.Sprintf("executor-%v", i)
2048		executor := st.MakePod().Name(name).Namespace(ns).Label("executor", "").Container(pause).Obj()
2049		pods = append(pods, executor)
2050		if _, err := cs.CoreV1().Pods(executor.Namespace).Create(context.TODO(), executor, metav1.CreateOptions{}); err != nil {
2051			t.Fatalf("Failed to create pod %v: %v", executor.Name, err)
2052		}
2053	}
2054
2055	// Wait for the 2 executor pods to be unschedulable.
2056	for _, pod := range pods {
2057		if err := waitForPodUnschedulable(cs, pod); err != nil {
2058			t.Errorf("Failed to wait for Pod %v to be unschedulable: %v", pod.Name, err)
2059		}
2060	}
2061
2062	// Create a driver pod.
2063	driver := st.MakePod().Name("driver").Namespace(ns).Label("driver", "").Container(pause).Obj()
2064	pods = append(pods, driver)
2065	if _, err := cs.CoreV1().Pods(driver.Namespace).Create(context.TODO(), driver, metav1.CreateOptions{}); err != nil {
2066		t.Fatalf("Failed to create pod %v: %v", driver.Name, err)
2067	}
2068
2069	// Verify all pods to be scheduled.
2070	for _, pod := range pods {
2071		if err := waitForPodToScheduleWithTimeout(cs, pod, wait.ForeverTestTimeout); err != nil {
2072			t.Fatalf("Failed to wait for Pod %v to be schedulable: %v", pod.Name, err)
2073		}
2074	}
2075
2076	// Lastly verify the pods activation logic is really called.
2077	if jobPlugin.podsActivated == false {
2078		t.Errorf("JobPlugin's pods activation logic is not called")
2079	}
2080}
2081
2082func initTestSchedulerForFrameworkTest(t *testing.T, testCtx *testutils.TestContext, nodeCount int, opts ...scheduler.Option) *testutils.TestContext {
2083	testCtx = testutils.InitTestSchedulerWithOptions(t, testCtx, nil, opts...)
2084	testutils.SyncInformerFactory(testCtx)
2085	go testCtx.Scheduler.Run(testCtx.Ctx)
2086
2087	if nodeCount > 0 {
2088		if _, err := createAndWaitForNodesInCache(testCtx, "test-node", st.MakeNode(), nodeCount); err != nil {
2089			t.Fatal(err)
2090		}
2091	}
2092	return testCtx
2093}
2094
2095// initRegistryAndConfig returns registry and plugins config based on give plugins.
2096// TODO: refactor it to a more generic functions that accepts all kinds of Plugins as arguments
2097func initRegistryAndConfig(t *testing.T, pp ...*PermitPlugin) (frameworkruntime.Registry, schedulerconfig.KubeSchedulerProfile) {
2098	var registry frameworkruntime.Registry
2099	if len(pp) == 0 {
2100		return frameworkruntime.Registry{}, schedulerconfig.KubeSchedulerProfile{}
2101	}
2102
2103	versionedCfg := v1beta2.KubeSchedulerConfiguration{
2104		Profiles: []v1beta2.KubeSchedulerProfile{{
2105			SchedulerName: pointer.StringPtr(v1.DefaultSchedulerName),
2106			Plugins: &v1beta2.Plugins{
2107				Permit: v1beta2.PluginSet{},
2108			},
2109		}},
2110	}
2111	registry = frameworkruntime.Registry{}
2112	for _, p := range pp {
2113		registry.Register(p.Name(), newPermitPlugin(p))
2114		versionedCfg.Profiles[0].Plugins.Permit.Enabled = append(versionedCfg.Profiles[0].Plugins.Permit.Enabled, v1beta2.Plugin{Name: p.Name()})
2115	}
2116	cfg := configtesting.V1beta2ToInternalWithDefaults(t, versionedCfg)
2117	return registry, cfg.Profiles[0]
2118}
2119